# Source code for vlcsim.controller.controller

"""Controller class for VLC resource allocation and connection management.

This module defines the Controller class, which manages connection allocation,
assignment, and lifecycle for VLC simulations. It provides customizable allocation
algorithms and TDM frame/slice management.
"""

from __future__ import annotations
from typing import Optional, Union, Callable, Any
from enum import Enum

# Import all necessary classes from scene and connection modules
from ..scene import AccessPoint, VLed, RF, Receiver, Scenario
from .connection import Connection


class Controller:
    """Controller for managing VLC connections and resource allocation.

    The Controller class manages all connection lifecycle operations including
    allocation, assignment, pausing, resuming, and termination. It maintains the
    scenario infrastructure, tracks active connections per access point, and
    executes allocation algorithms.

    The Controller implements a flexible allocation system where custom
    algorithms can be plugged in to handle different resource allocation
    strategies.

    Class Attributes:
        status (Enum): Allocation algorithm return status
            - ALLOCATED: Connection successfully allocated
            - NOT_ALLOCATED: Connection refused (no resources)
            - WAIT: Connection will wait for future allocation attempt
        nextStatus (Enum): Internal controller status for connection state
            transitions
            - PAUSE: Connection paused, will resume later
            - FINISH: Connection completed transmission
            - RESUME: Connection resuming after pause
            - IDLE: Access Point has no connections
            - RND_WAIT: Connection waiting random time before retry

    Example:
        Basic controller setup::

            # Create controller
            controller = Controller(x=10.0, y=10.0, z=3.0, nGrids=20, rho=0.8)

            # Add access points to scenario
            vled = VLed(x=5.0, y=5.0, z=3.0, nLedsX=2, nLedsY=2,
                        ledPower=20, theta=60)
            controller.scenario.addVLed(vled)

            # Set allocation algorithm
            controller.allocator = Controller.default_alloc

            # Initialize internal structures
            controller.init()

            # Create and assign connection
            receiver = Receiver(x=3.0, y=3.0, z=0.85, aDet=1e-4, ts=0.1,
                                index=1.5, fov=60.0)
            connection = Connection(id=0, receiver=receiver, time=0.0)
            connection.capacityRequired = 1e6  # 1 Mbps
            status, next_time, conn = controller.assignConnection(connection, 0.0)

    Note:
        - Must call `init()` before assigning any connections
        - Allocator function must be set before calling `assignConnection()`
    """

    MAX_ACTIVE_CONNECTIONS_PER_VLED = 5
    """Maximum number of active connections allowed per VLed."""

    MAX_ACTIVE_CONNECTIONS_PER_RF = 12
    """Maximum number of active connections allowed per RF access point."""

    status = Enum("status", "ALLOCATED NOT_ALLOCATED WAIT")
    """Allocation algorithm return status.

    Indicates the result of an allocation attempt:
        - ALLOCATED: Connection successfully allocated to an AP
        - NOT_ALLOCATED: Connection refused (insufficient resources)
        - WAIT: Connection will wait and retry allocation later
    """

    nextStatus = Enum("nextStatus", "PAUSE FINISH RESUME IDLE RND_WAIT")
    """Internal controller status for connection state transitions.

    Used within allocation routines to indicate next action:
        - PAUSE: Connection paused, will resume transmission later
        - FINISH: Connection completed its transmission goal
        - RESUME: Connection resuming transmission after pause
        - IDLE: Access Point has no active connections
        - RND_WAIT: Connection waiting random time before retry
    """

    def __init__(
        self, x: float, y: float, z: float, nGrids: int, rho: float
    ) -> None:
        """Initialize the Controller with scenario dimensions.

        Creates a Controller instance managing a rectangular room scenario with
        specified dimensions and discretization parameters.

        Args:
            x: Room length in meters (X-axis)
            y: Room width in meters (Y-axis)
            z: Room height in meters (Z-axis)
            nGrids: Number of grid divisions per meter for wall reflection
                calculations
            rho: Wall reflection coefficient (0.0 to 1.0)

        Example::

            # Create controller for 10x10x3m room
            controller = Controller(x=10.0, y=10.0, z=3.0, nGrids=20, rho=0.8)
        """
        self.__scenario: Scenario = Scenario(x, y, z, nGrids, rho)
        self.__allocator: Optional[
            Callable[
                [Receiver, Connection, Scenario, "Controller"],
                tuple[int, Connection],
            ]
        ] = None
        self.__allocationStatus: Optional[Any] = None
        self.__activeConnections: list[list[list[Union[bool, Connection]]]] = []
        self.__numberActiveConnections: list[int] = []
        # Per-AP index of the frame removed by unassignConnection(); rebuilt
        # by init(). Defined here so the attribute always exists.
        self.__activeConnection: list[int] = []

    @property
    def scenario(self) -> Scenario:
        """Get the scenario managed by this controller.

        Returns:
            Scenario: The simulation scenario containing all access points and
            room configuration
        """
        return self.__scenario

    @property
    def allocationStatus(self):
        """Get the status of the last allocation attempt.

        Returns:
            status: One of Controller.status enum values (ALLOCATED,
            NOT_ALLOCATED, or WAIT), or None before the first attempt
        """
        return self.__allocationStatus

    @property
    def allocator(self):
        """Get the allocation algorithm function.

        The allocator is a callable that determines how connections are
        assigned to access points. It must follow this signature:

        .. code-block:: python

            def alloc_function(
                receiver: Receiver,
                connection: Connection,
                scenario: Scenario,
                controller: Controller
            ) -> tuple[int, Connection]:
                # Allocation logic here
                return Controller.status.ALLOCATED, connection

        Returns:
            Callable or None: The current allocation function, or None if not
            set

        Example:
            Custom allocation algorithm::

                def my_alloc(receiver, connection, scenario, controller):
                    # Find best VLed by SNR
                    vleds = scenario.vleds
                    snrs = [scenario.snrVled(receiver, v) for v in vleds]
                    best_idx = snrs.index(max(snrs))

                    # Check if VLed has capacity
                    if controller.numberOfActiveConnections(vleds[best_idx]) < 5:
                        connection.AP = vleds[best_idx]
                        return Controller.status.ALLOCATED, connection
                    else:
                        return Controller.status.WAIT, connection

                controller.allocator = my_alloc
        """
        return self.__allocator

    @allocator.setter
    def allocator(
        self,
        allocator: Callable[
            [Receiver, Connection, Scenario, "Controller"], tuple[int, Connection]
        ],
    ):
        """Set the allocation algorithm.

        Args:
            allocator: Function implementing the allocation logic
        """
        self.__allocator = allocator

    def _ap_index(self, ap: Union[VLed, RF, AccessPoint]) -> int:
        """Return the controller index of *ap*, rejecting unknown APs.

        `APPosition` reports an unknown AP as -1; used directly as a list
        index that would silently address the last AP. This helper turns
        that case into an explicit error.

        Raises:
            ValueError: If `APPosition` does not find the AP
        """
        index = self.APPosition(ap)
        if index < 0:
            raise ValueError(
                "AccessPoint is not registered in the controller's scenario."
            )
        return index

    def assignConnection(self, connection: Connection, time: float):
        """Assign a connection to an access point using the allocation algorithm.

        Executes the allocation algorithm to determine if and where a
        connection should be assigned. If successful, allocates TDM frame/slice
        resources and schedules transmission times.

        Args:
            connection: Connection object to assign
            time: Current simulation time in seconds

        Returns:
            tuple: (nextStatus, next_time, connection)
                - nextStatus: RESUME when allocated, IDLE when not allocated,
                  RND_WAIT when the allocator asked to wait
                - next_time: Next event time for this connection
                - connection: The connection returned by the allocator

        Raises:
            ValueError: If allocator not set, AP invalid, or required fields
                missing
            Exception: If trying to assign a slice in the past, or if the
                allocator returns an unsupported status

        Example::

            controller.allocator = Controller.default_alloc
            status, next_time, conn = controller.assignConnection(connection, 0.0)

            if status == Controller.nextStatus.RESUME:
                print(f"Connection allocated, resume at {next_time}")
            elif status == Controller.nextStatus.RND_WAIT:
                print("Connection waiting, will retry")
        """
        if self.__allocator is None:
            raise ValueError(
                "Allocator function must be set before assigning connections."
            )
        self.__allocationStatus, connection = self.__allocator(
            connection.receiver, connection, self.__scenario, self
        )
        # NOTE(review): stamped on every attempt, including WAIT and
        # NOT_ALLOCATED outcomes - confirm that is intended.
        connection.receiver.timeFirstConnected = time

        if self.__allocationStatus == Controller.status.NOT_ALLOCATED:
            return Controller.nextStatus.IDLE, time, connection
        if self.__allocationStatus == Controller.status.WAIT:
            return Controller.nextStatus.RND_WAIT, time, connection
        if self.__allocationStatus != Controller.status.ALLOCATED:
            raise Exception("Return status of allocation algorithm not supported")

        ap = connection.AP
        if ap is None:
            raise ValueError(
                "Connection must have a valid AccessPoint (AP) after allocation."
            )
        # Validate everything before touching controller state, so a rejected
        # connection leaves the counters and frame tables unchanged.
        if connection.receiver.capacityFromAP is None or ap.sliceTime is None:
            raise ValueError(
                "Receiver must have capacityFromAP and AP must have sliceTime set."
            )
        if connection.capacityRequired is None:
            raise ValueError("Connection must have capacityRequired set.")
        index = self._ap_index(ap)

        actualSlice = connection.nextSliceInAPWhenArriving(ap) - 1
        # Start time of the frame that contains `time`.
        actualTime = (
            (time // (ap.slicesInFrame * ap.sliceTime))
            * ap.slicesInFrame
            * ap.sliceTime
        )
        # When the next slice is slice 0, frame numbers are shifted by one.
        auxPreviousTime = 1 if actualSlice == -1 else 0

        connection.receiver.goalTime = (
            connection.capacityRequired
            / connection.receiver.capacityFromAP
            * ap.sliceTime
        )
        connection.goalTime = connection.receiver.goalTime

        for fs in connection.frameSlice:
            if fs[0] <= 0 and fs[1] <= actualSlice:
                raise Exception("You are trying to assign a slice in the past...")

        self.__numberActiveConnections[index] += 1
        for fs in connection.frameSlice:
            self.assignSlice(index, fs[0], fs[1], connection)
            connection.insertTime(
                actualTime
                + ap.sliceTime * ap.slicesInFrame * (fs[0] + auxPreviousTime)
                + fs[1] * ap.sliceTime
            )
        time = connection.getNextTime()
        return Controller.nextStatus.RESUME, time, connection

    def pauseConnection(self, connection: Connection, time: float):
        """Pause an active connection temporarily.

        Suspends a connection's transmission, updating its active time and
        scheduling the next resume time. Drops the current frame when no
        occupied slices remain in it.

        Args:
            connection: Connection to pause
            time: Current simulation time in seconds (not used here)

        Returns:
            tuple: (Controller.nextStatus.RESUME, next_time, connection)
                - Status is always RESUME
                - next_time: Value of connection.getNextTime()
                - connection: The connection object

        Raises:
            ValueError: If connection has no assigned AP or the AP is unknown

        Note:
            Adds one slice duration to receiver.timeActive.
        """
        ap = connection.AP
        if ap is None:
            raise ValueError("Connection must have a valid AccessPoint (AP).")
        receiver = connection.receiver
        index = self._ap_index(ap)
        receiver.timeActive += ap.sliceTime
        timeNext = connection.getNextTime()
        nextSlice = connection.nextSliceInAPWhenArriving(ap)

        frames = self.__activeConnections[index]
        # Only drop the current frame if it exists and none of its remaining
        # slices is occupied; never pop from an empty frame list.
        if frames and not any(
            frames[0][i] is not False for i in range(nextSlice, ap.slicesInFrame)
        ):
            frames.pop(0)
        return Controller.nextStatus.RESUME, timeNext, connection

    def resumeConnection(self, connection: Connection, time: float):
        """Resume a paused connection.

        Checks if the connection will reach its goal time within the coming
        slice and decides whether to finish or pause again.

        Args:
            connection: Connection to resume
            time: Current simulation time in seconds

        Returns:
            tuple: (nextStatus, next_time, connection)
                - nextStatus: FINISH if goal reached within this slice, PAUSE
                  if more time is needed
                - next_time: Finish time, or the end of this slice
                - connection: The connection object

        Raises:
            ValueError: If connection has no AP or receiver has no goalTime

        Note:
            Compares receiver.goalTime with receiver.timeActive + sliceTime.
        """
        ap = connection.AP
        if ap is None:
            raise ValueError("Connection must have a valid AccessPoint (AP).")
        receiver = connection.receiver
        if receiver.goalTime is None:
            raise ValueError(
                "Receiver must have goalTime set before resuming connection."
            )
        if receiver.goalTime < receiver.timeActive + ap.sliceTime:
            return (
                Controller.nextStatus.FINISH,
                time + receiver.goalTime - receiver.timeActive,
                connection,
            )
        return Controller.nextStatus.PAUSE, time + ap.sliceTime, connection

    def unassignConnection(self, connection: Connection, time: float):
        """Unassign and finalize a connection.

        Updates receiver timing, cleans up frame allocations, and decrements
        the AP's active connection count.

        Args:
            connection: Connection to unassign
            time: Current simulation time in seconds

        Returns:
            tuple: (Controller.nextStatus.IDLE, time, None)
                - Status is always IDLE
                - time: Current simulation time
                - None: No connection returned

        Raises:
            ValueError: If connection has no assigned AP or the AP is unknown

        Note:
            Sets receiver.timeFinished and, when goalTime is set, forces
            receiver.timeActive = goalTime.
        """
        ap = connection.AP
        if ap is None:
            raise ValueError("Connection must have a valid AccessPoint (AP).")
        index = self._ap_index(ap)
        receiver = connection.receiver
        if receiver.goalTime is not None:
            receiver.timeActive = receiver.goalTime
        receiver.timeFinished = time
        nextSlice = connection.nextSliceInAPWhenArriving(ap)

        frames = self.__activeConnections[index]
        # NOTE(review): the frame is dropped when some remaining slice IS
        # occupied, the opposite of pauseConnection(). Behavior is kept as-is;
        # confirm which condition is intended.
        if frames and any(
            frames[0][i] is not False for i in range(nextSlice, ap.slicesInFrame)
        ):
            frames.pop(self.__activeConnection[index])
        self.__numberActiveConnections[index] -= 1
        return Controller.nextStatus.IDLE, time, None

    def init(self):
        """Initialize controller's internal structures for simulation.

        Must be invoked before starting the simulation. Initializes active
        connection lists and connection counters for each access point in the
        scenario. Performs validation on scenario and AP configuration.

        Raises:
            RuntimeError: If scenario, APs, or their configuration is invalid

        Note:
            - Creates one empty frame per AP with slicesInFrame free slices
            - Sorts APs by position before building the structures
            - Must be called after all APs have been added to the scenario

        Example::

            controller = Controller(10.0, 10.0, 3.0, 20, 0.8)
            vled = VLed(5.0, 5.0, 3.0, 2, 2, 20, 60)
            vled.slicesInFrame = 10
            controller.scenario.addVLed(vled)
            controller.init()  # Required before simulation
        """
        self.__activeConnections = []
        self.__numberActiveConnections = []
        scenario = getattr(self, "_Controller__scenario", None)
        if scenario is None:
            raise RuntimeError("The scenario is not configured in the controller.")
        vleds = getattr(scenario, "vleds", None)
        rfs = getattr(scenario, "rfs", None)
        if vleds is None or rfs is None:
            raise RuntimeError("The scenario must have 'vleds' and 'rfs' lists.")

        # Collect all APs with their positions
        all_aps: list[tuple[int, Union[VLed, RF]]] = [
            (scenario.vledsPositions[idx], vled) for idx, vled in enumerate(vleds)
        ]
        all_aps.extend(
            (scenario.rfsPositions[idx], rf) for idx, rf in enumerate(rfs)
        )
        # Sort APs by position to ensure correct order
        all_aps.sort(key=lambda item: item[0])

        # Validate each AP before creating its structures
        for pos, ap in all_aps:
            slices = getattr(ap, "slicesInFrame", None)
            if slices is None:
                raise RuntimeError(
                    f"The AP at position {pos} does not have 'slicesInFrame' defined."
                )
            if not isinstance(slices, int) or slices <= 0:
                raise RuntimeError(
                    f"'slicesInFrame' must be a positive integer for the AP at position {pos}."
                )
            self.__numberActiveConnections.append(0)
            self.__activeConnections.append([[False] * slices])
        self.__activeConnection = [0] * len(self.__activeConnections)

    def APPosition(self, ap: Union[VLed, RF, AccessPoint]) -> int:
        """Get the position index of an access point in the controller.

        Looks the AP up by identity in the scenario's VLed or RF list and
        returns the matching entry of vledsPositions / rfsPositions.

        Args:
            ap: Access point (VLed or RF)

        Returns:
            int: Position index of the AP in controller structures

        Note:
            Returns -1 if AP is not found (should not occur in normal
            operation).
        """
        if isinstance(ap, VLed):
            for idx, vled in enumerate(getattr(self.__scenario, "vleds", [])):
                if vled is ap:
                    return self.__scenario.vledsPositions[idx]
        elif isinstance(ap, RF):
            for idx, rf in enumerate(getattr(self.__scenario, "rfs", [])):
                if rf is ap:
                    return self.__scenario.rfsPositions[idx]
        return -1  # Should not reach here

    @property
    def activeConnections(self):
        """Get the active connections data structure.

        Returns a nested list structure where each outer element corresponds
        to an AP, and contains frames, which contain slices. Each slice is
        either False (empty) or a Connection object.

        Returns:
            list[list[list[Union[bool, Connection]]]]: Three-level structure:
                - Level 1: List of APs (length = number of APs)
                - Level 2: List of frames per AP (grows dynamically)
                - Level 3: List of slices per frame (length = slicesInFrame)

        Example::

            # Access slice 5 of frame 2 on AP 0
            if controller.activeConnections[0][2][5]:
                connection = controller.activeConnections[0][2][5]
                print(f"Slice occupied by connection {connection.id}")
        """
        return self.__activeConnections

    def numberOfActiveConnections(self, ap: Union[VLed, RF, AccessPoint]) -> int:
        """Get the number of active connections on a specific AP.

        Args:
            ap: Access point to query (VLed or RF)

        Returns:
            int: Current number of active connections on this AP

        Raises:
            ValueError: If the AP is not known to the controller

        Example::

            active = controller.numberOfActiveConnections(vled)
            if active < 5:
                # AP has capacity for more connections
                pass
        """
        return self.__numberActiveConnections[self._ap_index(ap)]

    def assignSlice(
        self, apIndex: int, frame: int, slice: int, connection: Connection
    ):
        """Assign a connection to a specific frame and slice on an AP.

        Allocates a TDM slice for the given connection. Creates new frames if
        necessary.

        Args:
            apIndex: Index of the AP in controller structures
            frame: Frame number to assign
            slice: Slice number within the frame
            connection: Connection to assign

        Raises:
            ValueError: If the specified frame/slice is already occupied

        Note:
            Automatically creates empty frames, sized by
            connection.AP.slicesInFrame, up to the specified frame number.
        """
        frames = self.__activeConnections[apIndex]
        while len(frames) < frame + 1:
            frames.append([False] * connection.AP.slicesInFrame)
        if frames[frame][slice] is not False:
            raise ValueError(
                f"The connection in frame: {frame}, slice: {slice} is already used."
            )
        frames[frame][slice] = connection

    def framesState(self, ap: AccessPoint):
        """Get the frame/slice allocation state for an access point.

        Returns the complete frame and slice structure showing which
        connections occupy which time slots.

        Args:
            ap: Access point to query

        Returns:
            list[list[Union[bool, Connection]]]: Frame/slice structure
                - Outer list: frames
                - Inner list: slices (False if empty, Connection if occupied)

        Raises:
            ValueError: If the AP is not known to the controller

        Example::

            frames = controller.framesState(vled)
            for frame_idx, frame in enumerate(frames):
                for slice_idx, slot in enumerate(frame):
                    if slot:  # Not False
                        print(f"Frame {frame_idx}, slice {slice_idx}: "
                              f"Connection {slot.id}")
        """
        return self.__activeConnections[self._ap_index(ap)]

    @staticmethod
    def _assign_frame_slices(
        connection: Connection,
        controller: "Controller",
        numberOfSlices: int,
    ) -> None:
        """Pick frame/slice positions for *connection*, one slice per frame.

        Takes the first free slice of the current frame (from the next
        arriving slice onwards), then the first free slice of each later
        existing frame, and finally slice 0 of brand-new frames, until
        `numberOfSlices` positions have been recorded through
        `connection.assignFrameSlice`.

        Args:
            connection: Connection receiving the positions; its AP must be set
            controller: Controller whose `framesState` is consulted
            numberOfSlices: Number of slices the connection needs
        """
        ap = connection.AP
        frames = controller.framesState(ap)
        slicesInFrame = ap.slicesInFrame
        assigned = 0
        lastFrame = 0

        # Current frame: first free slice that has not started yet.
        for slot in range(connection.nextSliceInAPWhenArriving(ap), slicesInFrame):
            if len(frames) == 0 or frames[0][slot] is False:
                connection.assignFrameSlice(0, slot)
                assigned += 1
                break

        # Later existing frames: stop as soon as the request is satisfied so
        # no extra slices are taken.
        for frameIndex in range(1, len(frames)):
            if assigned >= numberOfSlices:
                break
            free = next(
                (s for s in range(slicesInFrame) if frames[frameIndex][s] is False),
                None,
            )
            if free is not None:
                connection.assignFrameSlice(frameIndex, free)
                assigned += 1
                lastFrame = frameIndex

        # New frames: start past every existing frame, because the existing
        # frames skipped above had no free slice.
        frameIndex = max(lastFrame + 1, len(frames))
        while assigned < numberOfSlices:
            connection.assignFrameSlice(frameIndex, 0)
            frameIndex += 1
            assigned += 1

    @staticmethod
    def _select_vled_candidate(
        receiver: "Receiver",
        connection: Connection,
        scenario: Scenario,
        controller: "Controller",
        vled_snr: list[float],
    ) -> Optional[tuple[VLed, float, float, int]]:
        """Return the eligible VLed with the highest SNR, or None.

        A VLed is skipped when it already holds
        MAX_ACTIVE_CONNECTIONS_PER_VLED connections or when the connection
        would need that many slices or more.

        Returns:
            Optional tuple (vled, snr, capacity, numberOfSlices)
        """
        best: Optional[tuple[VLed, float, float, int]] = None
        limit = Controller.MAX_ACTIVE_CONNECTIONS_PER_VLED
        for pos, vled in enumerate(scenario.vleds):
            if controller.numberOfActiveConnections(vled) >= limit:
                continue
            capacity = scenario.capacityVled(receiver, vled)
            numberOfSlices = connection.numberOfSlicesNeeded(
                connection.capacityRequired,
                capacity,
            )
            # NOTE(review): compares a slice count against the per-VLed
            # connection cap - confirm this is the intended limit.
            if numberOfSlices >= limit:
                continue
            if best is None or vled_snr[pos] > best[1]:
                best = (vled, vled_snr[pos], capacity, numberOfSlices)
        return best

    @staticmethod
    def _select_rf_candidate(
        receiver: "Receiver",
        connection: Connection,
        scenario: Scenario,
        controller: "Controller",
        rf_snr: list[float],
    ) -> Optional[tuple[RF, float, float, int]]:
        """Return the eligible RF access point with the highest SNR, or None.

        An RF AP is skipped when it already holds
        MAX_ACTIVE_CONNECTIONS_PER_RF connections.

        Returns:
            Optional tuple (rf, snr, capacity, numberOfSlices)
        """
        best: Optional[tuple[RF, float, float, int]] = None
        for pos, rf in enumerate(scenario.rfs):
            if (
                controller.numberOfActiveConnections(rf)
                >= Controller.MAX_ACTIVE_CONNECTIONS_PER_RF
            ):
                continue
            capacity = scenario.capacityRf(receiver, rf)
            numberOfSlices = connection.numberOfSlicesNeeded(
                connection.capacityRequired,
                capacity,
            )
            if best is None or rf_snr[pos] > best[1]:
                best = (rf, rf_snr[pos], capacity, numberOfSlices)
        return best

    @staticmethod
    def default_alloc(
        receiver: "Receiver",
        connection: Connection,
        scenario: Scenario,
        controller: "Controller",
    ) -> tuple[Any, Connection]:
        """Default allocation algorithm for connections.

        Implements a hybrid VLC/RF allocation strategy:
            1. Try to allocate to the best available VLed
            2. If no VLed is available, fall back to the best available RF
            3. Wait only when both VLed and RF access points are unavailable
            4. Assign available frame/slice positions

        Args:
            receiver: Receiver requesting connection
            connection: Connection object to allocate
            scenario: Simulation scenario
            controller: Controller managing allocation

        Returns:
            tuple: (Controller.status.ALLOCATED, connection) when an AP is
            assigned, otherwise (Controller.status.WAIT, connection)

        Raises:
            ValueError: If the connection lacks a capacity requirement

        Note:
            - Prefers available VLed APs over RF fallback
            - Limits VLed connections to 5 per AP
            - Limits RF connections to 12 per AP
            - Automatically finds free frame/slice positions

        Example:
            This is the default allocator, but you can use it as a template::

                controller.allocator = Controller.default_alloc
                # Or create custom allocator with similar signature
        """
        if connection.capacityRequired is None:
            raise ValueError("Connection must have capacityRequired set.")

        vled_snr: list[float] = [
            scenario.snrVled(receiver, vled) for vled in scenario.vleds
        ]
        rf_snr: list[float] = [scenario.snrRf(receiver, rf) for rf in scenario.rfs]

        candidate = Controller._select_vled_candidate(
            receiver, connection, scenario, controller, vled_snr
        )
        if candidate is None:
            candidate = Controller._select_rf_candidate(
                receiver, connection, scenario, controller, rf_snr
            )
        if candidate is None:
            return Controller.status.WAIT, connection

        ap, snr, capacity, numberOfSlices = candidate
        connection.AP = ap
        connection.receiver.capacityFromAP = capacity
        connection.snr = snr
        Controller._assign_frame_slices(connection, controller, numberOfSlices)
        return Controller.status.ALLOCATED, connection