"""Controller class for VLC resource allocation and connection management.
This module defines the Controller class, which manages connection allocation,
assignment, and lifecycle for VLC simulations. It provides customizable allocation
algorithms and TDM frame/slice management.
"""
from __future__ import annotations
from typing import Optional, Union, Callable, Any
from enum import Enum
# Import all necessary classes from scene and connection modules
from ..scene import AccessPoint, VLed, RF, Receiver, Scenario
from .connection import Connection
[docs]
class Controller:
"""Controller for managing VLC connections and resource allocation.
The Controller class manages all connection lifecycle operations including allocation,
assignment, pausing, resuming, and termination. It maintains the scenario infrastructure,
tracks active connections per access point, and executes allocation algorithms.
The Controller implements a flexible allocation system where custom algorithms can be
plugged in to handle different resource allocation strategies.
Class Attributes:
status (Enum): Allocation algorithm return status
- ALLOCATED: Connection successfully allocated
- NOT_ALLOCATED: Connection refused (no resources)
- WAIT: Connection will wait for future allocation attempt
nextStatus (Enum): Internal controller status for connection state transitions
- PAUSE: Connection paused, will resume later
- FINISH: Connection completed transmission
- RESUME: Connection resuming after pause
- IDLE: Access Point has no connections
- RND_WAIT: Connection waiting random time before retry
Example:
Basic controller setup::
# Create controller
controller = Controller(x=10.0, y=10.0, z=3.0, nGrids=20, rho=0.8)
# Add access points to scenario
vled = VLed(x=5.0, y=5.0, z=3.0, nLedsX=2, nLedsY=2,
ledPower=20, theta=60)
controller.scenario.addVLed(vled)
# Set allocation algorithm
controller.allocator = Controller.default_alloc
# Initialize internal structures
controller.init()
# Create and assign connection
receiver = Receiver(x=3.0, y=3.0, z=0.85, aDet=1e-4,
ts=0.1, index=1.5, fov=60.0)
connection = Connection(id=0, receiver=receiver, time=0.0)
connection.capacityRequired = 1e6 # 1 Mbps
status, next_time, conn = controller.assignConnection(connection, 0.0)
Note:
- Must call `init()` before assigning any connections
- Allocator function must be set before calling `assignConnection()`
"""
MAX_ACTIVE_CONNECTIONS_PER_VLED = 5
"""Maximum number of active connections allowed per VLed."""
MAX_ACTIVE_CONNECTIONS_PER_RF = 12
"""Maximum number of active connections allowed per RF access point."""
status = Enum("status", "ALLOCATED NOT_ALLOCATED WAIT")
"""Allocation algorithm return status.
Indicates the result of an allocation attempt:
- ALLOCATED: Connection successfully allocated to an AP
- NOT_ALLOCATED: Connection refused (insufficient resources)
- WAIT: Connection will wait and retry allocation later
"""
nextStatus = Enum("nextStatus", "PAUSE FINISH RESUME IDLE RND_WAIT")
"""Internal controller status for connection state transitions.
Used within allocation routines to indicate next action:
- PAUSE: Connection paused, will resume transmission later
- FINISH: Connection completed its transmission goal
- RESUME: Connection resuming transmission after pause
- IDLE: Access Point has no active connections
- RND_WAIT: Connection waiting random time before retry
"""
[docs]
def __init__(self, x: float, y: float, z: float, nGrids: int, rho: float) -> None:
"""Initialize the Controller with scenario dimensions.
Creates a Controller instance managing a rectangular room scenario with
specified dimensions and discretization parameters.
Args:
x: Room length in meters (X-axis)
y: Room width in meters (Y-axis)
z: Room height in meters (Z-axis)
nGrids: Number of grid divisions per meter for wall reflection calculations
rho: Wall reflection coefficient (0.0 to 1.0)
Example::
# Create controller for 10x10x3m room
controller = Controller(x=10.0, y=10.0, z=3.0, nGrids=20, rho=0.8)
"""
self.__scenario: Scenario = Scenario(x, y, z, nGrids, rho)
self.__allocator: Optional[
Callable[
[Receiver, Connection, Scenario, "Controller"], tuple[int, Connection]
]
] = None
self.__allocationStatus: Optional[int] = None
self.__activeConnections: list[list[list[Union[bool, Connection]]]] = []
self.__numberActiveConnections: list[int] = []
# self.__activeConnections = [[]] * len(self.__scenario.vleds)
@property
def scenario(self) -> Scenario:
"""Get the scenario managed by this controller.
Returns:
Scenario: The simulation scenario containing all access points and room configuration
"""
return self.__scenario
@property
def allocationStatus(self):
"""Get the status of the last allocation attempt.
Returns:
status: One of Controller.status enum values (ALLOCATED, NOT_ALLOCATED, or WAIT)
"""
return self.__allocationStatus
@property
def allocator(self):
"""Get the allocation algorithm function.
The allocator is a callable that determines how connections are assigned to
access points. It must follow this signature:
.. code-block:: python
def alloc_function(
receiver: Receiver,
connection: Connection,
scenario: Scenario,
controller: Controller
) -> tuple[int, Connection]:
# Allocation logic here
return Controller.status.ALLOCATED, connection
Returns:
Callable or None: The current allocation function, or None if not set
Example:
Custom allocation algorithm::
def my_alloc(receiver, connection, scenario, controller):
# Find best VLed by SNR
vleds = scenario.vleds
snrs = [scenario.snrVled(receiver, v) for v in vleds]
best_idx = snrs.index(max(snrs))
# Check if VLed has capacity
if controller.numberOfActiveConnections(vleds[best_idx]) < 5:
connection.AP = vleds[best_idx]
return Controller.status.ALLOCATED, connection
else:
return Controller.status.WAIT, connection
controller.allocator = my_alloc
"""
return self.__allocator
@allocator.setter
def allocator(
self,
allocator: Callable[
[Receiver, Connection, Scenario, "Controller"], tuple[int, Connection]
],
):
"""Set the allocation algorithm.
Args:
allocator: Function implementing the allocation logic
"""
self.__allocator = allocator
[docs]
def assignConnection(self, connection: Connection, time: float):
"""Assign a connection to an access point using the allocation algorithm.
Executes the allocation algorithm to determine if and where a connection should
be assigned. If successful, allocates TDM frame/slice resources and schedules
transmission times.
Args:
connection: Connection object to assign
time: Current simulation time in seconds
Returns:
tuple: (nextStatus, next_time, connection)
- nextStatus: One of Controller.nextStatus enum values
- next_time: Next event time for this connection
- connection: Modified connection object (or None if not allocated)
Raises:
ValueError: If allocator not set, AP invalid, or required fields missing
Exception: If trying to assign a slice in the past
Example::
controller.allocator = Controller.default_alloc
status, next_time, conn = controller.assignConnection(connection, 0.0)
if status == Controller.nextStatus.RESUME:
print(f"Connection allocated, resume at {next_time}")
elif status == Controller.nextStatus.RND_WAIT:
print("Connection waiting, will retry")
"""
if self.__allocator is None:
raise ValueError(
"Allocator function must be set before assigning connections."
)
self.__allocationStatus, connection = self.__allocator(
connection.receiver, connection, self.__scenario, self
)
connection.receiver.timeFirstConnected = time
if self.__allocationStatus == Controller.status.ALLOCATED:
if connection.AP is None:
raise ValueError(
"Connection must have a valid AccessPoint (AP) after allocation."
)
index = self.APPosition(connection.AP)
self.__numberActiveConnections[index] += 1
actualSlice = connection.nextSliceInAPWhenArriving(connection.AP) - 1
actualTime = (
(time // (connection.AP.slicesInFrame * connection.AP.sliceTime))
* connection.AP.slicesInFrame
* connection.AP.sliceTime
)
auxPreviousTime = 0
if actualSlice == -1:
auxPreviousTime = 1
if (
connection.receiver.capacityFromAP is None
or connection.AP.sliceTime is None
):
raise ValueError(
"Receiver must have capacityFromAP and AP must have sliceTime set."
)
if connection.capacityRequired is None:
raise ValueError("Connection must have capacityRequired set.")
connection.receiver.goalTime = (
connection.capacityRequired
/ connection.receiver.capacityFromAP
* connection.AP.sliceTime
)
connection.goalTime = connection.receiver.goalTime
for fs in connection.frameSlice:
if fs[0] <= 0 and fs[1] <= actualSlice:
raise Exception("You are trying to assign a slice in the past...")
self.assignSlice(index, fs[0], fs[1], connection)
connection.insertTime(
actualTime
+ connection.AP.sliceTime
* connection.AP.slicesInFrame
* (fs[0] + auxPreviousTime)
+ fs[1] * connection.AP.sliceTime
)
time = connection.getNextTime()
return Controller.nextStatus.RESUME, time, connection
elif self.__allocationStatus == Controller.status.NOT_ALLOCATED:
return Controller.nextStatus.IDLE, time, connection
elif self.__allocationStatus == Controller.status.WAIT:
return Controller.nextStatus.RND_WAIT, time, connection
else:
raise Exception("Return status of allocation algorithm not supported")
[docs]
def pauseConnection(self, connection: Connection, time: float):
"""Pause an active connection temporarily.
Suspends a connection's transmission, updating its active time and scheduling
the next resume time. Manages frame cleanup if no more slices remain in current frame.
Args:
connection: Connection to pause
time: Current simulation time in seconds
Returns:
tuple: (Controller.nextStatus.RESUME, next_time, connection)
- Status is always RESUME
- next_time: Time when connection will resume
- connection: The connection object
Raises:
ValueError: If connection has no assigned AP
Note:
Updates receiver.timeActive with the slice duration.
"""
if connection.AP is None:
raise ValueError("Connection must have a valid AccessPoint (AP).")
receiver = connection.receiver
index = self.APPosition(connection.AP)
receiver.timeActive += connection.AP.sliceTime
timeNext = connection.getNextTime()
nextSlice = connection.nextSliceInAPWhenArriving(connection.AP)
flag = False
for i in range(nextSlice, connection.AP.slicesInFrame):
if len(self.__activeConnections[index]) == 0:
flag = True
break
if self.__activeConnections[index][0][i] != False:
flag = True
break
if not flag:
self.__activeConnections[index].pop(0)
return Controller.nextStatus.RESUME, timeNext, connection
[docs]
def resumeConnection(self, connection: Connection, time: float):
"""Resume a paused connection.
Checks if the connection has completed its goal time or needs another slice.
Determines whether to finish or pause again based on remaining transmission time.
Args:
connection: Connection to resume
time: Current simulation time in seconds
Returns:
tuple: (nextStatus, next_time, connection)
- nextStatus: FINISH if goal reached, PAUSE if needs more time
- next_time: Time for next event (finish time or next pause)
- connection: The connection object
Raises:
ValueError: If connection has no AP or receiver has no goalTime
Note:
Compares receiver.goalTime with receiver.timeActive + sliceTime.
"""
if connection.AP is None:
raise ValueError("Connection must have a valid AccessPoint (AP).")
receiver = connection.receiver
if receiver.goalTime is None:
raise ValueError(
"Receiver must have goalTime set before resuming connection."
)
if receiver.goalTime < receiver.timeActive + connection.AP.sliceTime:
return (
Controller.nextStatus.FINISH,
time + receiver.goalTime - receiver.timeActive,
connection,
)
else:
return (
Controller.nextStatus.PAUSE,
time + connection.AP.sliceTime,
connection,
)
[docs]
def unassignConnection(self, connection: Connection, time: float):
"""Unassign and finalize a connection.
Removes a connection from its AP, updates receiver timing, cleans up frame
allocations, and decrements the AP's active connection count.
Args:
connection: Connection to unassign
time: Current simulation time in seconds
Returns:
tuple: (Controller.nextStatus.IDLE, time, None)
- Status is always IDLE
- time: Current simulation time
- None: No connection returned
Raises:
ValueError: If connection has no assigned AP
Note:
Sets receiver.timeFinished and ensures receiver.timeActive = goalTime.
"""
if connection.AP is None:
raise ValueError("Connection must have a valid AccessPoint (AP).")
index = self.APPosition(connection.AP)
receiver = connection.receiver
if receiver.goalTime is not None:
receiver.timeActive = receiver.goalTime
receiver.timeFinished = time
nextSlice = connection.nextSliceInAPWhenArriving(connection.AP)
flag = False
for i in range(nextSlice, connection.AP.slicesInFrame):
if len(self.__activeConnections[index]) == 0:
break
if self.__activeConnections[index][0][i] != False:
flag = True
if flag:
self.__activeConnections[index].pop(self.__activeConnection[index])
self.__numberActiveConnections[index] -= 1
return Controller.nextStatus.IDLE, time, None
[docs]
def init(self):
"""Initialize controller's internal structures for simulation.
Must be invoked before starting the simulation. Initializes active connection
lists and connection counters for each access point in the scenario.
Performs validation on scenario and AP configuration.
Raises:
RuntimeError: If scenario, APs, or their configuration is invalid
Note:
- Creates frame/slice structure for each AP based on slicesInFrame
- Sorts APs by position to ensure correct indexing
- Must be called after all APs have been added to the scenario
Example::
controller = Controller(10.0, 10.0, 3.0, 20, 0.8)
vled = VLed(5.0, 5.0, 3.0, 2, 2, 20, 60)
vled.slicesInFrame = 10
controller.scenario.addVLed(vled)
controller.init() # Required before simulation
"""
self.__activeConnections = []
self.__numberActiveConnections = []
if not hasattr(self, "_Controller__scenario") or self.__scenario is None:
raise RuntimeError("The scenario is not configured in the controller.")
vleds = getattr(self.__scenario, "vleds", None)
rfs = getattr(self.__scenario, "rfs", None)
if vleds is None or rfs is None:
raise RuntimeError("The scenario must have 'vleds' and 'rfs' lists.")
# Collect all APs with their positions
all_aps: list[tuple[int, Union[VLed, RF]]] = []
for idx, vled in enumerate(vleds):
pos = self.__scenario.vledsPositions[idx]
all_aps.append((pos, vled))
for idx, rf in enumerate(rfs):
pos = self.__scenario.rfsPositions[idx]
all_aps.append((pos, rf))
# Sort APs by position to ensure correct order
all_aps.sort(key=lambda x: x[0])
# Initialize structures for each AP in order
for pos, ap in all_aps:
self.__numberActiveConnections.append(0)
self.__activeConnections.append([])
if not hasattr(ap, "slicesInFrame") or ap.slicesInFrame is None:
raise RuntimeError(
f"The AP at position {pos} does not have 'slicesInFrame' defined."
)
if not isinstance(ap.slicesInFrame, int) or ap.slicesInFrame <= 0:
raise RuntimeError(
f"'slicesInFrame' must be a positive integer for the AP at position {pos}."
)
self.__activeConnections[-1].append([])
for _ in range(ap.slicesInFrame):
self.__activeConnections[-1][-1].append(False)
self.__activeConnection = [0] * len(self.__activeConnections)
[docs]
def APPosition(self, ap: Union[VLed, RF, AccessPoint]) -> int:
"""Get the position index of an access point in the controller.
Returns the internal index used to track an AP in the controller's data structures.
Works for both VLed and RF access points.
Args:
ap: Access point (VLed or RF)
Returns:
int: Position index of the AP in controller structures
Note:
Returns -1 if AP is not found (should not occur in normal operation).
"""
if isinstance(ap, VLed):
vleds = getattr(self.__scenario, "vleds", [])
for idx, vled in enumerate(vleds):
if vled is ap:
return self.__scenario.vledsPositions[idx]
elif isinstance(ap, RF):
rfs = getattr(self.__scenario, "rfs", [])
for idx, rf in enumerate(rfs):
if rf is ap:
return self.__scenario.rfsPositions[idx]
return -1 # Should not reach here
@property
def activeConnections(self):
"""Get the active connections data structure.
Returns a nested list structure where each outer element corresponds to an AP,
and contains frames, which contain slices. Each slice is either False (empty)
or a Connection object.
Returns:
list[list[list[Union[bool, Connection]]]]: Three-level structure:
- Level 1: List of APs (length = number of APs)
- Level 2: List of frames per AP (grows dynamically)
- Level 3: List of slices per frame (length = slicesInFrame)
Example::
# Access slice 5 of frame 2 on AP 0
if controller.activeConnections[0][2][5]:
connection = controller.activeConnections[0][2][5]
print(f"Slice occupied by connection {connection.id}")
"""
return self.__activeConnections
[docs]
def numberOfActiveConnections(self, ap: Union[VLed, RF, AccessPoint]) -> int:
"""Get the number of active connections on a specific AP.
Args:
ap: Access point to query (VLed or RF)
Returns:
int: Current number of active connections on this AP
Example::
active = controller.numberOfActiveConnections(vled)
if active < 5:
# AP has capacity for more connections
pass
"""
return self.__numberActiveConnections[self.APPosition(ap)]
[docs]
def assignSlice(self, apIndex: int, frame: int, slice: int, connection: Connection):
"""Assign a connection to a specific frame and slice on an AP.
Allocates a TDM slice for the given connection. Creates new frames if necessary.
Args:
apIndex: Index of the AP in controller structures
frame: Frame number to assign
slice: Slice number within the frame
connection: Connection to assign
Raises:
ValueError: If the specified frame/slice is already occupied
Note:
Automatically creates frames as needed to reach the specified frame number.
"""
numberOfFrames = len(self.__activeConnections[apIndex])
if numberOfFrames < frame + 1:
for _ in range(numberOfFrames, frame + 1):
self.__activeConnections[apIndex].append([])
for __ in range(connection.AP.slicesInFrame):
self.__activeConnections[apIndex][-1].append(False)
if self.__activeConnections[apIndex][frame][slice] != False:
raise ValueError(
f"The connection in frame: {frame}, slice: {slice} is already used."
)
else:
self.__activeConnections[apIndex][frame][slice] = connection
[docs]
def framesState(self, ap: AccessPoint):
"""Get the frame/slice allocation state for an access point.
Returns the complete frame and slice structure showing which connections
occupy which time slots.
Args:
ap: Access point to query
Returns:
list[list[Union[bool, Connection]]]: Frame/slice structure
- Outer list: frames
- Inner list: slices (False if empty, Connection if occupied)
Example::
frames = controller.framesState(vled)
for frame_idx, frame in enumerate(frames):
for slice_idx, slot in enumerate(frame):
if slot: # Not False
print(f"Frame {frame_idx}, slice {slice_idx}: "
f"Connection {slot.id}")
"""
return self.__activeConnections[self.APPosition(ap)]
@staticmethod
def _assign_frame_slices(
connection: Connection,
controller: "Controller",
numberOfSlices: int,
) -> None:
actualSlice = connection.nextSliceInAPWhenArriving(connection.AP)
aux = 0
auxFrame = 0
# Actual frame
for slice in range(actualSlice, connection.AP.slicesInFrame):
if (
len(controller.framesState(connection.AP)) == 0
or controller.framesState(connection.AP)[0][slice] == False
):
connection.assignFrameSlice(0, slice)
aux += 1
break
# Next frames
for frameIndex in range(1, len(controller.framesState(connection.AP))):
for slice in range(connection.AP.slicesInFrame):
if controller.framesState(connection.AP)[frameIndex][slice] == False:
connection.assignFrameSlice(frameIndex, slice)
aux += 1
auxFrame = frameIndex
break
if aux == numberOfSlices:
break
frameIndex = auxFrame + 1
while aux < numberOfSlices:
connection.assignFrameSlice(frameIndex, 0)
frameIndex += 1
aux += 1
@staticmethod
def _select_vled_candidate(
receiver: "Receiver",
connection: Connection,
scenario: Scenario,
controller: "Controller",
vled_snr: list[float],
) -> Optional[tuple[VLed, float, float, int]]:
best_candidate: Optional[tuple[VLed, float, float, int]] = None
for vled_pos, vled in enumerate(scenario.vleds):
if (
controller.numberOfActiveConnections(vled)
>= Controller.MAX_ACTIVE_CONNECTIONS_PER_VLED
):
continue
capacity = scenario.capacityVled(receiver, vled)
numberOfSlices = connection.numberOfSlicesNeeded(
connection.capacityRequired,
capacity,
)
if numberOfSlices >= Controller.MAX_ACTIVE_CONNECTIONS_PER_VLED:
continue
candidate = (vled, vled_snr[vled_pos], capacity, numberOfSlices)
if best_candidate is None or candidate[1] > best_candidate[1]:
best_candidate = candidate
return best_candidate
@staticmethod
def _select_rf_candidate(
receiver: "Receiver",
connection: Connection,
scenario: Scenario,
controller: "Controller",
rf_snr: list[float],
) -> Optional[tuple[RF, float, float, int]]:
best_candidate: Optional[tuple[RF, float, float, int]] = None
for rf_pos, rf in enumerate(scenario.rfs):
if (
controller.numberOfActiveConnections(rf)
>= Controller.MAX_ACTIVE_CONNECTIONS_PER_RF
):
continue
capacity = scenario.capacityRf(receiver, rf)
numberOfSlices = connection.numberOfSlicesNeeded(
connection.capacityRequired,
capacity,
)
candidate = (rf, rf_snr[rf_pos], capacity, numberOfSlices)
if best_candidate is None or candidate[1] > best_candidate[1]:
best_candidate = candidate
return best_candidate
[docs]
@staticmethod
def default_alloc(
receiver: "Receiver",
connection: Connection,
scenario: Scenario,
controller: "Controller",
) -> tuple[Any, Connection]:
"""Default allocation algorithm for connections.
Implements a hybrid VLC/RF allocation strategy:
1. Try to allocate to the best available VLed
2. If no VLed is available, fall back to the best available RF
3. Wait only when both VLed and RF access points are unavailable
4. Assign available frame/slice positions
Args:
receiver: Receiver requesting connection
connection: Connection object to allocate
scenario: Simulation scenario
controller: Controller managing allocation
Returns:
tuple: (Controller.status.ALLOCATED, connection) when an AP is assigned,
otherwise (Controller.status.WAIT, connection)
Raises:
ValueError: If the connection lacks a capacity requirement
Note:
- Prefers available VLed APs over RF fallback
- Limits VLed connections to 5 per AP
- Limits RF connections to 12 per AP
- Automatically finds free frame/slice positions
Example:
This is the default allocator, but you can use it as a template::
controller.allocator = Controller.default_alloc
# Or create custom allocator with similar signature
"""
if connection.capacityRequired is None:
raise ValueError("Connection must have capacityRequired set.")
vleds = scenario.vleds
rfs = scenario.rfs
vled_snr: list[float] = []
rf_snr: list[float] = []
for vled in vleds:
vled_snr.append(scenario.snrVled(receiver, vled))
for rf in rfs:
rf_snr.append(scenario.snrRf(receiver, rf))
candidate = Controller._select_vled_candidate(
receiver,
connection,
scenario,
controller,
vled_snr,
)
if candidate is None:
candidate = Controller._select_rf_candidate(
receiver,
connection,
scenario,
controller,
rf_snr,
)
if candidate is None:
return Controller.status.WAIT, connection
ap, snr, capacity, numberOfSlices = candidate
connection.AP = ap
connection.receiver.capacityFromAP = capacity
connection.snr = snr
Controller._assign_frame_slices(connection, controller, numberOfSlices)
return Controller.status.ALLOCATED, connection