Source code for vlcsim.controller.connection

"""Connection class for VLC data transmission management.

This module defines the Connection class, which represents a data transmission
connection between a receiver device and an access point in VLC simulations.
"""

from __future__ import annotations

import bisect
import math
import sys
from typing import Optional

# Import AccessPoint and Receiver from scene module
from ..scene import AccessPoint, Receiver


class Connection:
    """Represents a data transmission connection in VLC simulation.

    Connection objects encapsulate all information about a data transmission
    session between a receiver device and an access point (AP). Each
    connection tracks its allocation status, assigned time slices, capacity
    requirements, and timing information.

    The Connection class manages:

    - Unique identification and association with receiver and AP
    - Frame/slice allocation in TDM scheduling
    - Capacity requirements and SNR tracking
    - Connection timing (arrival, goal time, completion)
    - Allocation status (allocated, waiting, refused)

    Attributes:
        All attributes are private and accessed via properties.

    Example:
        Creating a connection::

            receiver = Receiver(x=5.0, y=5.0, z=0.85, aDet=1e-4, ts=0.1,
                                index=1.5, fov=60.0)
            connection = Connection(id=0, receiver=receiver, time=0.0)
            connection.capacityRequired = 1e6  # 1 Mbps

            # Assign to access point
            connection.AP = vled
            connection.assignFrameSlice(frame=0, slice=3)
    """

    def __init__(self, id: int, receiver: "Receiver", time: float) -> None:
        """Initialize a Connection object.

        Creates a new connection with specified ID, receiver, and arrival
        time. The connection is initially marked as allocated but not yet
        assigned to an AP.

        Args:
            id: Connection unique identifier (non-negative integer)
            receiver: Receiver device for this connection
            time: Connection arrival time in seconds (non-negative)

        Raises:
            ValueError: If id or time is negative

        Example::

            receiver = Receiver(5.0, 5.0, 0.85, 1e-4, 0.1, 1.5, 60.0)
            conn = Connection(id=1, receiver=receiver, time=10.5)
        """
        if id < 0:
            raise ValueError("id must be a non-negative integer.")
        if time < 0:
            raise ValueError("time must be a non-negative float.")
        self.__id: int = id
        self.__ap: Optional[AccessPoint] = None
        self.__receiver: Receiver = receiver
        self.__allocated: bool = True
        self.__frameSlice: list[list[int]] = []
        # Pending transmission start times, kept in ascending order by
        # insertTime() and consumed from the front by getNextTime().
        self.__timesAssigned: list[float] = []
        self.__goalTime: Optional[float] = None
        self.__time: float = float(time)
        self.__capacityRequired: Optional[float] = None
        self.__snr: float = sys.float_info.min

    @property
    def snr(self) -> float:
        """Get the Signal-to-Noise Ratio for this connection.

        Returns:
            float: SNR value; initialised to sys.float_info.min until set
        """
        return self.__snr

    @snr.setter
    def snr(self, value: float):
        """Set the Signal-to-Noise Ratio.

        Args:
            value: SNR value to set (stored as float)
        """
        self.__snr = float(value)

    @property
    def capacityRequired(self) -> Optional[float]:
        """Get the capacity required for this connection.

        Returns:
            float or None: Required capacity in bits per second (bps), or
            None if not set
        """
        return self.__capacityRequired

    @capacityRequired.setter
    def capacityRequired(self, value: Optional[float]):
        """Set the capacity required for this connection.

        Args:
            value: Required capacity in bps, or None

        Raises:
            ValueError: If value is negative
        """
        if value is not None and value < 0:
            raise ValueError("capacityRequired must be a non-negative float or None.")
        self.__capacityRequired = float(value) if value is not None else None

    @property
    def time(self) -> float:
        """Get the arrival time of this connection.

        Returns:
            float: Time in seconds when this connection arrived in the
            simulation
        """
        return self.__time

    # The setter function is deliberately named ``_time``: the decorator
    # result (getter + setter) is therefore bound to ``_time``, while the
    # public ``time`` property keeps no setter and stays read-only.
    @time.setter
    def _time(self, value: float):
        """Set the arrival time (private setter).

        Args:
            value: Arrival time in seconds

        Raises:
            ValueError: If value is negative
        """
        if value < 0:
            raise ValueError("time must be a non-negative float.")
        self.__time = float(value)

    @property
    def goalTime(self) -> Optional[float]:
        """Get the goal time for this connection.

        The goal time represents the total effective time that the
        connection will use the channel, calculated based on required and
        available capacity.

        Returns:
            float or None: Effective transmission time in seconds, or None
            if not calculated
        """
        return self.__goalTime

    @goalTime.setter
    def goalTime(self, value: Optional[float]):
        """Set the goal time for this connection.

        Args:
            value: Goal time in seconds, or None

        Raises:
            ValueError: If value is negative
        """
        if value is not None and value < 0:
            raise ValueError("goalTime must be a non-negative float or None.")
        self.__goalTime = float(value) if value is not None else None

    @property
    def frameSlice(self) -> list[list[int]]:
        """Get the list of frame/slice assignments for this connection.

        Returns:
            list[list[int]]: List of [frame, slice] pairs that this
            connection will use in TDM scheduling

        Example::

            >>> connection.frameSlice
            [[0, 3], [0, 7], [1, 2]]
        """
        return self.__frameSlice

    @frameSlice.setter
    def frameSlice(self, value: list[list[int]]):
        """Set the frame/slice assignments.

        Args:
            value: List of [frame, slice] pairs
        """
        self.__frameSlice = value

    def getNextTime(self) -> float:
        """Get the next transmission start time for this connection.

        Retrieves and removes the earliest scheduled transmission time from
        the queue.

        Returns:
            float: Next transmission start time in seconds

        Raises:
            IndexError: If no assigned times remain
        """
        if not self.__timesAssigned:
            raise IndexError("No assigned times left for this connection.")
        return self.__timesAssigned.pop(0)

    def insertTime(self, time: float) -> None:
        """Insert a transmission time into the scheduled times queue.

        Keeps the queue in ascending order by placing the new time at its
        sorted position. Each call adds exactly one entry.

        Args:
            time: Transmission start time in seconds to schedule

        Note:
            A time equal to existing entries is placed after them, i.e.
            immediately before the first strictly greater entry.
        """
        # A hand-written scan here previously inserted the value and then
        # also appended it, duplicating every non-maximal time. insort
        # performs the single sorted insertion that was intended.
        bisect.insort(self.__timesAssigned, time)

    def assignFrameSlice(self, frame: int, slice: int):
        """Assign a specific frame and slice to this connection.

        Adds a [frame, slice] pair to this connection's TDM allocation.

        Args:
            frame: Frame number to assign
            slice: Slice number within the frame

        Example::

            connection.assignFrameSlice(frame=0, slice=3)
            connection.assignFrameSlice(frame=1, slice=0)
        """
        self.__frameSlice.append([frame, slice])

    def numberOfSlicesNeeded(
        self, capacityRequired: float, capacityFromAP: float
    ) -> int:
        """Calculate the number of TDM slices needed for this connection.

        Determines how many time slices are required based on the capacity
        demand and the capacity provided by the access point.

        Args:
            capacityRequired: Required data capacity in bps
            capacityFromAP: Available capacity from AP in bps

        Returns:
            int: Number of slices needed (rounded up)

        Raises:
            ValueError: If capacityFromAP is zero or negative

        Example::

            slices = connection.numberOfSlicesNeeded(
                capacityRequired=1e6,   # 1 Mbps
                capacityFromAP=250e3    # 250 kbps per slice
            )
            # Returns 4 slices
        """
        if capacityFromAP <= 0:
            raise ValueError("capacityFromAP must be a positive number.")
        return math.ceil(capacityRequired / capacityFromAP)

    def nextSliceInAPWhenArriving(self, ap: "AccessPoint") -> int:
        """Determine the next available slice position when connection arrives.

        Calculates which slice position will be active when this connection
        arrives, based on the arrival time and AP's frame structure.

        Args:
            ap: Access Point to check

        Returns:
            int: Slice position (0 to ap.slicesInFrame - 1)

        Raises:
            ValueError: If AP doesn't have valid sliceTime or slicesInFrame

        Note:
            Result is computed as:
            ceil(arrival_time / sliceTime) mod slicesInFrame
        """
        if ap.sliceTime is None or ap.slicesInFrame is None:
            raise ValueError("AccessPoint must have valid sliceTime and slicesInFrame.")
        if ap.sliceTime <= 0 or ap.slicesInFrame <= 0:
            raise ValueError("sliceTime and slicesInFrame must be positive numbers.")
        return math.ceil(self.__time / ap.sliceTime) % ap.slicesInFrame

    @property
    def receiver(self) -> "Receiver":
        """Get the receiver associated with this connection.

        Returns:
            Receiver: The receiver device for this connection
        """
        return self.__receiver

    @receiver.setter
    def receiver(self, value: "Receiver"):
        """Set the receiver for this connection.

        Args:
            value: Receiver device to associate
        """
        self.__receiver = value

    @property
    def id(self) -> int:
        """Get the unique identifier of this connection.

        Returns:
            int: Connection ID
        """
        return self.__id

    @property
    def AP(self) -> "Optional[AccessPoint]":
        """Get the Access Point assigned to this connection.

        Returns:
            AccessPoint or None: Access Point used by this connection, or
            None if not assigned
        """
        return self.__ap

    @AP.setter
    def AP(self, ap: "Optional[AccessPoint]"):
        """Set the Access Point for this connection.

        Args:
            ap: Access Point to assign, or None to unassign
        """
        self.__ap = ap

    @property
    def allocated(self) -> bool:
        """Check if this connection has been allocated.

        Returns:
            bool: True if connection was allocated, False otherwise
        """
        return self.__allocated

    @allocated.setter
    def allocated(self, value: bool):
        """Set the allocation status of this connection.

        Args:
            value: Allocation status
        """
        self.__allocated = value