Source code for vlcsim.simulator

"""Simulator Module for VLC Event-Driven Simulation.

This module provides the simulation engine for VLC (Visible Light Communication) systems.
It implements an event-driven discrete-event simulation (DES) with support for:

Classes:
    Event: Represents simulation events (arrivals, departures, pauses, resumes)
    Simulator: Main simulation engine managing event scheduling and execution

The Simulator module handles:
    - Event scheduling and management using Future Event List (FEL)
    - Random variable generation for arrivals, departures, and positions
    - Connection lifecycle tracking (arrive → allocate → transmit → depart)
    - Statistical data collection (blocking probability, AP utilization)
    - Scenario initialization and execution control

Example:
    Basic simulation setup::

        from vlcsim import Simulator, VLed, RF

        # Create simulator with 10x10x3m room
        sim = Simulator(x=10.0, y=10.0, z=3.0, nGrids=20, rho=0.8)

        # Add access points
        vled = VLed(x=5.0, y=5.0, z=3.0, nLedsX=2, nLedsY=2, ledPower=20, theta=60)
        sim.scenario.addVLed(vled)

        # Configure simulation parameters
        sim.lambdaS = 3.0  # Arrival rate (connections/second)
        sim.mu = 10.0      # Service rate (1/seconds)
        sim.goalConnections = 1000

        # Run simulation
        sim.init()
        sim.run()

        # Get results
        print(f"Blocking probability: {sim.get_Blocking_Probability()}")

Note:
    - Must call `init()` before `run()`
    - Uses exponential inter-arrival times (Poisson process)
    - Supports both VLC (VLed) and RF access points
"""

from .controller import *
from .scene import *
import numpy as np
import time
from enum import Enum
from typing import Optional, List, Any, Callable


[docs] class Event: """Represents a simulation event in the discrete-event simulation. The Event class encapsulates all information about a simulation event including timing, type, and associated connection. Events are stored in a Future Event List (FEL) and processed chronologically by the Simulator. Attributes: event (Enum): Event type enumeration with 5 possible values Event Types: - ARRIVE: New connection arrives to scenario - DEPARTURE: Connection completes transmission and leaves - PAUSE: Connection pauses transmission (TDM frame/slice boundary) - RESUME: Connection resumes transmission after pause - NEXT_CONNECTION_TRY: Connection retries allocation after waiting Example:: # Create arrival event event = Event(type=Event.event.ARRIVE, time=5.2, id_connection=42) event.connection = connection_object # Check event type if event.type == Event.event.ARRIVE: print(f"Connection {event.id_connection} arrived at {event.time}") """ event = Enum("event", "ARRIVE DEPARTURE PAUSE RESUME NEXT_CONNECTION_TRY") """Event type enumeration.Event type enumeration. · Five event types for simulation lifecycle: - ARRIVE: Connection arrives to scenario (triggers allocation) - DEPARTURE: Connection completes transmission and leaves - PAUSE: Connection temporarily pauses (TDM frame/slice boundary) - RESUME: Connection resumes transmission after pause - NEXT_CONNECTION_TRY: Connection retries allocation after random wait """
[docs] def __init__( self, type: Optional[event] = None, time: float = -1, id_connection: int = -1 ) -> None: self.__time: float = time self.__id_connection: int = id_connection if type is None: self.__type: Event.event = Event.event.ARRIVE else: self.__type = type self.__connection: Optional[Connection] = None
@property def type(self) -> event: """Get the event type. Returns: event: One of the five event types (ARRIVE, DEPARTURE, PAUSE, RESUME, NEXT_CONNECTION_TRY) """ return self.__type @property def time(self) -> float: """Get the simulation time when this event occurs. Returns: float: Event occurrence time in seconds """ return self.__time @property def id_connection(self) -> int: """Get the connection ID associated with this event. Returns: int: Unique connection identifier """ return self.__id_connection @property def connection(self) -> Optional[Connection]: """Get the Connection object associated with this event. Returns: Connection or None: Connection object if set, None otherwise """ return self.__connection @connection.setter def connection(self, value: Connection): """Set the Connection object for this event. Args: value: Connection object to associate with this event """ self.__connection = value
[docs] class Simulator: """Event-driven discrete-event simulator for VLC systems. The Simulator class implements a complete discrete-event simulation (DES) engine for VLC communication systems. It manages the event scheduling, random variable generation, connection lifecycle, and statistical data collection. The simulation follows these key phases: 1. **Initialization**: Configure scenario, APs, and simulation parameters 2. **Event Scheduling**: Maintain chronologically ordered Future Event List (FEL) 3. **Event Processing**: Execute event routines (arrive, pause, resume, depart) 4. **Statistics Collection**: Track blocking probability, AP utilization, delays Key Features: - Poisson arrival process (exponential inter-arrival times) - Exponential service times - Uniform spatial distribution of receivers - Customizable allocation algorithms - Support for both VLC (VLed) and RF access points Example: Complete simulation workflow:: # Create simulator sim = Simulator(x=10.0, y=10.0, z=3.0, nGrids=20, rho=0.8) # Add access points vled1 = VLed(2.5, 2.5, 3.0, 2, 2, 20, 60) vled2 = VLed(7.5, 7.5, 3.0, 2, 2, 20, 60) sim.scenario.addVLed(vled1) sim.scenario.addVLed(vled2) # Configure parameters sim.lambdaS = 3.0 # 3 arrivals/second sim.mu = 10.0 # mean service time = 0.1 seconds sim.goalConnections = 10000 sim.lower_capacity_required = 1e6 # 1 Mbps sim.upper_capacity_required = 10e6 # 10 Mbps # Run simulation sim.init() sim.run() # Get results blocking = sim.get_Blocking_Probability() print(f"Blocking Probability: {blocking}") Note: - Must call `init()` before `run()` - Seeds control reproducibility of random variables - Default values are set automatically if not specified """
[docs] def __init__(self, x: float, y: float, z: float, nGrids: int, rho: float) -> None: """Initialize a Simulator instance. Creates a new simulator with the specified scenario dimensions and wall reflection parameters. Initializes all internal state variables and sets default simulation parameters. Args: x: Room length in meters (X-axis dimension) y: Room width in meters (Y-axis dimension) z: Room height in meters (Z-axis dimension, ceiling height) nGrids: Number of grid divisions per meter for wall reflection calculations rho: Wall reflection coefficient (0.0 to 1.0, where 0=no reflection, 1=perfect reflection) Example:: # Create simulator for 10x10x3m room with 0.8 reflection sim = Simulator(x=10.0, y=10.0, z=3.0, nGrids=20, rho=0.8) """ self.__controller: Controller = Controller(x, y, z, nGrids, rho) self.__events: List[Event] = [] self.__current_event: Optional[Event] = None self.__initReady: Optional[bool] = None self.__lambdaS: Optional[float] = None self.__mu: Optional[float] = None self.__seedArrive: Optional[int] = None self.__seedDeparture: Optional[int] = None self.__seedX: Optional[int] = None self.__seedY: Optional[int] = None self.__seedZ: Optional[int] = None self.__seedRandomWait: Optional[int] = None self.__seedCapacityRequired: Optional[int] = None self.__numberOfConnections: int = 0 self.__goalConnections: int = 10000 self.__arrival_variable: Any = None self.__departure_variable: Any = None self.__x_variable: Any = None self.__y_variable: Any = None self.__z_variable: Any = None self.__random_wait_variable: Any = None self.__capacity_required_variable: Any = None self.__rtn_allocation: Optional[Any] = None self.__allocatedConnections: int = 0 # time self.__clock: float = 0.0 self.__time_duration: Optional[float] = None self.__upper_random_wait: Optional[float] = None self.__lower_random_wait: Optional[float] = None self.__upper_capacity_required: Optional[float] = None self.__lower_capacity_required: Optional[float] = None self.__users_by_vlc: List[int] = [] self.__users_by_rf: List[int] = [] self.default_values()
[docs] def default_values(self): """Set default simulation parameters. Initializes all simulation parameters to sensible default values. These can be overridden before calling `init()` if different values are needed. Default Values: - lambdaS = 3 (arrival rate: 3 connections/second) - mu = 10 (service rate: mean time = 0.1 seconds) - seedArrive = 12345 (arrival process seed) - seedDeparture = 1234 (departure process seed) - seedX = 1235 (X position seed) - seedY = 1245 (Y position seed) - seedZ = 1345 (Z position seed) - seedRandomWait = 1345 (random wait seed) - seedCapacityRequired = 1345 (capacity requirement seed) - goalConnections = 10000 (simulation stopping criterion) - lower_random_wait = 5 (minimum wait time in seconds) - upper_random_wait = 15 (maximum wait time in seconds) - lower_capacity_required = 1e5 (minimum capacity: 100 Kbps) - upper_capacity_required = 5e5 (maximum capacity: 500 Kbps) - allocator = Controller.default_alloc (default allocation algorithm) Note: This method is called automatically by `__init__()`. """ self.__initReady = False self.__lambdaS = 3 self.__mu = 10 self.__seedArrive = 12345 self.__seedDeparture = 1234 self.__seedX = 1235 self.__seedY = 1245 self.__seedZ = 1345 self.__seedRandomWait = 1345 self.__seedCapacityRequired = 1345 self.__numberOfConnections = 0 self.__goalConnections: int = 10000 self.__lower_random_wait = 5 self.__upper_random_wait = 15 self.__lower_capacity_required = 1e5 self.__upper_capacity_required = 5e5 self.__allocatedConnections = 0 self.__controller.allocator = Controller.default_alloc
@property def upper_capacity_required(self) -> Optional[float]: """Get the upper bound for required connection capacity. Returns: float or None: Maximum capacity requirement in bps, or None if not set """ return self.__upper_capacity_required @upper_capacity_required.setter def upper_capacity_required(self, value: float) -> None: """Set the upper bound for required connection capacity. Args: value: Maximum capacity requirement in bits per second (bps) """ self.__upper_capacity_required = value @property def lower_capacity_required(self) -> Optional[float]: """Get the lower bound for required connection capacity. Returns: float or None: Minimum capacity requirement in bps, or None if not set """ return self.__lower_capacity_required @lower_capacity_required.setter def lower_capacity_required(self, value: float) -> None: """Set the lower bound for required connection capacity. Args: value: Minimum capacity requirement in bits per second (bps) """ self.__lower_capacity_required = value @property def seedCapacityRequired(self) -> Optional[int]: """Get the random seed for capacity requirement generation. Returns: int or None: Random number generator seed for capacity distribution """ return self.__seedCapacityRequired @seedCapacityRequired.setter def seedCapacityRequired(self, value: int): """Set the random seed for capacity requirement generation. Args: value: Random number generator seed (integer) """ self.__seedCapacityRequired = value @property def upper_random_wait(self) -> Optional[float]: """Get the upper bound for random wait time. When a connection cannot be allocated, it waits a random time uniformly distributed between [lower_random_wait, upper_random_wait] before retrying. Returns: float or None: Maximum wait time in seconds, or None if not set """ return self.__upper_random_wait @upper_random_wait.setter def upper_random_wait(self, value: float) -> None: """Set the upper bound for random wait time. Args: value: Maximum wait time in seconds """ self.__upper_random_wait = value @property def lower_random_wait(self) -> Optional[float]: """Get the lower bound for random wait time. When a connection cannot be allocated, it waits a random time uniformly distributed between [lower_random_wait, upper_random_wait] before retrying. Returns: float or None: Minimum wait time in seconds, or None if not set """ return self.__lower_random_wait @lower_random_wait.setter def lower_random_wait(self, value: float) -> None: """Set the lower bound for random wait time. Args: value: Minimum wait time in seconds """ self.__lower_random_wait = value @property def lambdaS(self) -> Optional[float]: """Get the Poisson arrival rate parameter. Lambda (λ) controls the arrival rate of connections. With exponential inter-arrival times, the mean time between arrivals is 1/λ seconds. Returns: float or None: Arrival rate in connections per second, or None if not set Example:: sim.lambdaS = 3.0 # Average of 3 arrivals per second # Mean inter-arrival time = 1/3 = 0.333 seconds """ return self.__lambdaS @lambdaS.setter def lambdaS(self, lambdaS: float) -> None: """Set the Poisson arrival rate parameter. Args: lambdaS: Arrival rate in connections per second (must be positive) Raises: Warning: If called after `init()` has been invoked """ if self.__initReady: print( "You can not set mu parameter AFTER calling init simulator " "method." ) return self.__lambdaS = lambdaS @property def mu(self) -> Optional[float]: """Get the exponential service rate parameter. Mu (μ) controls the service time distribution. With exponential service times, the mean service time is 1/μ seconds. Returns: float or None: Service rate in 1/seconds, or None if not set Example:: sim.mu = 10.0 # Mean service time = 1/10 = 0.1 seconds """ return self.__mu @mu.setter def mu(self, mu: float) -> None: """Set the exponential service rate parameter. Args: mu: Service rate in 1/seconds (must be positive) Raises: Warning: If called after `init()` has been invoked """ if self.__initReady: print( "You can not set mu parameter AFTER calling init simulator " "method." ) return self.__mu = mu @property def seedX(self) -> Optional[int]: """Get the random seed for X-coordinate generation. Returns: int or None: Random number generator seed for receiver X positions """ return self.__seedX @seedX.setter def seedX(self, seedX: int) -> None: """Set the random seed for X-coordinate generation. Args: seedX: Random number generator seed (integer) Raises: Warning: If called after `init()` has been invoked """ if self.__initReady: print( "You can not set mu parameter AFTER calling init simulator " "method." ) return self.__seedX = seedX @property def seedY(self) -> Optional[int]: """Get the random seed for Y-coordinate generation. Returns: int or None: Random number generator seed for receiver Y positions """ return self.__seedY @seedY.setter def seedY(self, seedY: int) -> None: """Set the random seed for Y-coordinate generation. Args: seedY: Random number generator seed (integer) Raises: Warning: If called after `init()` has been invoked """ if self.__initReady: print( "You can not set mu parameter AFTER calling init simulator " "method." ) return self.__seedY = seedY @property def seedZ(self) -> Optional[int]: """Get the random seed for Z-coordinate generation. Returns: int or None: Random number generator seed for receiver Z positions (height) """ return self.__seedZ @seedZ.setter def seedZ(self, seedZ: int) -> None: """Set the random seed for Z-coordinate generation. Args: seedZ: Random number generator seed (integer) Raises: Warning: If called after `init()` has been invoked """ if self.__initReady: print( "You can not set mu parameter AFTER calling init simulator " "method." ) return self.__seedZ = seedZ @property def seedRandomWait(self) -> Optional[int]: """Get the random seed for random wait time generation. Returns: int or None: Random number generator seed for wait time distribution """ return self.__seedRandomWait @seedRandomWait.setter def seedRandomWait(self, seedRandomWait: int) -> None: """Set the random seed for random wait time generation. Args: seedRandomWait: Random number generator seed (integer) Raises: Warning: If called after `init()` has been invoked """ if self.__initReady: print( "You can not set mu parameter AFTER calling init simulator " "method." ) return self.__seedRandomWait = seedRandomWait @property def goalConnections(self) -> Optional[int]: """Get the target number of connections for simulation termination. The simulation continues until this many connections have been generated. This serves as the stopping criterion for the simulation. Returns: int or None: Target connection count, or None if not set """ return self.__goalConnections @goalConnections.setter def goalConnections(self, goalConnections: int) -> None: """Set the target number of connections for simulation termination. Args: goalConnections: Target connection count (must be positive integer) Raises: Warning: If called after `init()` has been invoked """ if self.__initReady: print( "You can not set mu parameter AFTER calling init simulator " "method." ) return self.__goalConnections = goalConnections
[docs] def print_initial_info(self): """Print scenario configuration and simulation header. Displays initial scenario information including dimensions, number and positions of VLeds and RFs, and prepares the formatted table header for simulation event logging. Output includes: - Scenario dimensions (X x Y x Z) - Number of VLeds with ID and position table - Number of RFs with ID and position table - Event logging table header Note: This method is called automatically by `run()` before starting simulation. """ print( "Scenario:\t %s x %s x %s" % ( self.__controller.scenario.length, self.__controller.scenario.width, self.__controller.scenario.height, ) ) print(f"Number of VLeds: {self.__controller.scenario.numberOfVLeds}") print("Positions:") print(" ID | X | Y | Z |") print("-" * 47) for vled in self.__controller.scenario.vleds: print( "{:>10} | {:8.4f} | {:8.4f} | {:8.4f} |".format( vled.ID, vled.x, vled.y, vled.z ) ) print() print(f"Number of RFs: {self.__controller.scenario.numberOfRFs}") print("Positions:") print(" ID | X | Y | Z |") print("-" * 47) for rf in self.__controller.scenario.rfs: print( "{:>10} | {:8.4f} | {:8.4f} | {:8.4f} |".format( rf.ID, rf.x, rf.y, rf.z ) ) print() print("=" * 146) print("| Time ", end="") print("| Event ", end="") print("| Receiver ", end="") print("| X ", end="") print("| Y ", end="") print("| Z ", end="") print("| Access Point (A.C.) ", end="") print("| Goal time ", end="") print("| Elapsed time ", end="") print("| SNR ", end="") print("| Req. Cap. |") print("=" * 146)
[docs] def print_row(self, event: Event) -> None: """Print a formatted row for each simulation event. Displays detailed information about the current event in tabular format, including event type, connection details, receiver position, assigned AP, timing information, SNR, and capacity requirements. Args: event: The Event object to print information for Output Columns: - Time: Simulation clock time - Event: Event type (ARRIVE, PAUSE, RESUME, DEPARTURE, RETRYING) - Receiver: Connection ID - X, Y, Z: Receiver position coordinates - Access Point: Assigned AP (VLed/RF) with active connection count - Goal time: Target transmission time - Elapsed time: Actual time spent transmitting - SNR: Signal-to-Noise Ratio - Req. Cap.: Required capacity in bps Note: Called automatically by the event loop for each processed event. """ text = "" text = "{:9.4f}".format(event.time) + "|" if event.type == Event.event.ARRIVE: text += " ARRIVE |" elif event.type == Event.event.RESUME: text += " RESUME |" elif event.type == Event.event.PAUSE: text += " PAUSE |" elif event.type == Event.event.DEPARTURE: text += " DEPARTURE |" elif event.type == Event.event.NEXT_CONNECTION_TRY: text += " RETRYING |" text += "{:>10}".format(event.id_connection) + " |" if event.connection: text += "{:8.4f}".format(event.connection.receiver.x) + " |" text += "{:8.4f}".format(event.connection.receiver.y) + " |" text += "{:8.4f}".format(event.connection.receiver.z) + " |" else: text += " N/A | N/A | N/A |" if event.connection and isinstance(event.connection.AP, VLed): text += ( " VLed: {:>5}".format(event.connection.AP.ID) + " ({:^5})".format( self.__controller.numberOfActiveConnections(event.connection.AP) ) + " |" ) elif event.connection and isinstance(event.connection.AP, RF): text += ( " RF: {:>5}".format(event.connection.AP.ID) + " ({:^5})".format( self.__controller.numberOfActiveConnections(event.connection.AP) ) + " |" ) else: text += " NOT_SELECTED |" if event.connection and event.connection.allocated: text += "{:10.4f}".format(event.connection.receiver.goalTime) + " |" else: text += " NOT_ALLOC |" if event.connection: text += "{:12.4f}".format(event.connection.receiver.timeActive) + " |" else: text += " N/A |" # if event.connection != None: # print( # event.time, # event.type, # event.id_connection, # event.connection.receiver.goalTime, # event.connection.receiver.timeActive, # ) # else: # print( # event.time, # event.type, # event.id_connection, # ) if event.connection: text += "{:10.2e}".format(event.connection.snr) + " |" text += "{:10.2e}".format(event.connection.capacityRequired) + " |" else: text += " N/A | N/A |" print(text)
[docs] def event_routine(self): """Process the next scheduled event from the Future Event List. This is the core simulation routine that selects and executes the appropriate handler for each event type. It updates the simulation clock, generates new events, and manages the event list chronologically. Event Handling: - **ARRIVE**: Generate next arrival, create connection, attempt allocation - **NEXT_CONNECTION_TRY**: Retry allocation after random wait - **PAUSE**: Temporarily pause connection transmission - **RESUME**: Resume connection transmission or transition to departure - **DEPARTURE**: Complete connection and deallocate resources The method maintains the Future Event List (FEL) in chronological order and ensures proper state transitions for all connections. Returns: Any: Allocation routine return value (typically None) Note: - Called repeatedly by `run()` until simulation termination - Updates `self.__clock` to current event time - Manages connection lifecycle state transitions """ self.__current_event = self.__events[0] self.__rtn_allocation = None self.__clock = self.__current_event.time # print() # print(self.__controller.activeConnections[0]) # print(self.__controller.activeConnections[1]) # print(self.__controller.activeConnections[2]) # print(self.__controller.activeConnections[3]) # print(self.__controller.activeConnections[4]) if self.__current_event.type == Event.event.ARRIVE: next_event_time = self.__clock + self.__arrival_variable.exponential( self.__lambdaS ) for pos in range(len(self.__events) - 1, -1, -1): if self.__events[pos].time < next_event_time: self.__events.insert( pos + 1, Event( Event.event.ARRIVE, next_event_time, self.__numberOfConnections, ), ) self.__numberOfConnections += 1 break self.__x = self.__x_variable.uniform( low=self.__controller.scenario.start_x, high=self.__controller.scenario.end_x, ) self.__y = self.__y_variable.uniform( low=self.__controller.scenario.start_y, high=self.__controller.scenario.end_y, ) self.__z = self.__z_variable.uniform( low=0, high=self.__controller.scenario.height, ) receiver = Receiver(self.__x, self.__y, self.__z, 1e-4, 1.0, 1.5, 70.0) connection = Connection( self.__current_event.id_connection, receiver, self.__clock ) connection.capacityRequired = self.__capacity_required_variable.uniform( self.__lower_capacity_required, self.__upper_capacity_required ) # connection.receiver.goalTime = connection.goalTime next_status, time, connection = self.__controller.assignConnection( connection, self.__clock ) if ( next_status == Controller.nextStatus.RESUME or next_status == Controller.nextStatus.PAUSE ): if type(connection.AP) == VLed: # Find the index of this VLed in the scenario for idx, vled in enumerate(self.__controller.scenario.vleds): if vled is connection.AP: self.__users_by_vlc[idx] += 1 break elif type(connection.AP) == RF: # Find the index of this RF in the scenario for idx, rf in enumerate(self.__controller.scenario.rfs): if rf is connection.AP: self.__users_by_rf[idx] += 1 break self.__current_event.connection = connection if next_status == Controller.nextStatus.RESUME: for pos in range(len(self.__events) - 1, -1, -1): if self.__events[pos].time <= time: e = Event( Event.event.RESUME, time, connection.id, ) e.connection = connection self.__events.insert(pos + 1, e) break self.__allocatedConnections += 1 elif next_status == Controller.nextStatus.RND_WAIT: self.__current_event.connection = connection next_event_time = self.__clock + self.__random_wait_variable.uniform( low=self.__lower_random_wait, high=self.upper_random_wait, ) next_event = Event( Event.event.NEXT_CONNECTION_TRY, next_event_time, connection.id, ) next_event.connection = connection connection.allocated = False for pos in range(len(self.__events) - 1, -1, -1): if self.__events[pos].time < next_event_time: self.__events.insert(pos + 1, next_event) break elif self.__current_event.type == Event.event.NEXT_CONNECTION_TRY: if not self.__current_event.connection: return next_status, time, connection = self.__controller.assignConnection( self.__current_event.connection, self.__clock ) if ( next_status == Controller.nextStatus.RESUME or next_status == Controller.nextStatus.PAUSE ): connection.receiver.goalTime = self.__departure_variable.exponential( self.__mu ) connection.allocated = True if next_status == Controller.nextStatus.RESUME: for pos in range(len(self.__events) - 1, -1, -1): if self.__events[pos].time <= time: e = Event( Event.event.RESUME, time, connection.id, ) e.connection = connection self.__events.insert(pos + 1, e) break self.__allocatedConnections += 1 elif next_status == Controller.nextStatus.RND_WAIT: next_event_time = self.__clock + self.__random_wait_variable.uniform( low=self.__lower_random_wait, high=self.upper_random_wait, ) next_event = Event( Event.event.NEXT_CONNECTION_TRY, next_event_time, connection.id, ) next_event.connection = connection for pos in range(len(self.__events) - 1, -1, -1): if self.__events[pos].time < next_event_time: self.__events.insert(pos + 1, next_event) break elif self.__current_event.type == Event.event.PAUSE: if not self.__current_event.connection: return next_status, time, connection = self.__controller.pauseConnection( self.__current_event.connection, self.__clock ) for pos in range(len(self.__events) - 1, -1, -1): if self.__events[pos].time <= time: e = Event( Event.event.RESUME, time, connection.id, ) e.connection = connection self.__events.insert(pos + 1, e) break elif self.__current_event.type == Event.event.RESUME: if not self.__current_event.connection: return next_status, time, connection = self.__controller.resumeConnection( self.__current_event.connection, self.__clock ) for pos in range(len(self.__events) - 1, -1, -1): if self.__events[pos].time <= time: e = None if next_status == Controller.nextStatus.PAUSE: e = Event( Event.event.PAUSE, time, connection.id, ) else: e = Event( Event.event.DEPARTURE, time, connection.id, ) e.connection = connection self.__events.insert(pos + 1, e) break elif self.__current_event.type == Event.event.DEPARTURE: if not self.__current_event.connection: return next_status, time, connection = self.__controller.unassignConnection( self.__current_event.connection, self.__clock ) # if next_status == Controller.nextStatus.RESUME: # for pos in range(len(self.__events) - 1, -1, -1): # if self.__events[pos].time <= time: # e = Event( # Event.event.RESUME, # time, # connection.id, # ) # e.connection = connection # self.__events.insert(pos + 1, e) # break self.__events.pop(0) return self.__rtn_allocation
[docs] def init(self): """Initialize the simulation engine and prepare for execution. This method must be called after all configuration parameters have been set and before calling `run()`. It initializes: Initialization Steps: 1. Set initReady flag to prevent further parameter changes 2. Initialize random number generators with configured seeds 3. Create first arrival event 4. Initialize controller and AP structures 5. Set up connection tracking lists Raises: ValueError: If scenario has no VLeds or RFs configured Example:: sim = Simulator(10.0, 10.0, 3.0, 20, 0.8) sim.scenario.addVLed(vled1) sim.lambdaS = 3.0 sim.mu = 10.0 sim.init() # Initialize before running sim.run() Note: After calling `init()`, simulation parameters cannot be modified. """ self.__initReady = True self.__clock = 0 # Reset state variables to allow reinitialization self.__events = [] self.__numberOfConnections = 0 self.__allocatedConnections = 0 self.__users_by_vlc = [] self.__users_by_rf = [] self.__current_event = None self.__time_duration = None # Initialize random number generators self.__arrival_variable = np.random.default_rng(self.__seedArrive) self.__departure_variable = np.random.default_rng(self.__seedDeparture) self.__x_variable = np.random.default_rng(self.__seedX) self.__y_variable = np.random.default_rng(self.__seedY) self.__z_variable = np.random.default_rng(self.__seedZ) self.__random_wait_variable = np.random.default_rng(self.__seedZ) self.__capacity_required_variable = np.random.default_rng(self.__seedZ) # Start execution time measurement self.__start_time = time.perf_counter() # Create first arrival event self.__events.append( Event( Event.event.ARRIVE, self.__arrival_variable.exponential(self.__lambdaS), self.__numberOfConnections, ) ) self.__numberOfConnections += 1 # Initialize controller self.__controller.init() if ( self.__controller.scenario.numberOfVLeds == 0 and self.__controller.scenario.numberOfRFs == 0 ): raise ValueError("The scenario does not have any Vleds or RFs") # Initialize user counters for each AP for _ in range(self.__controller.scenario.numberOfVLeds): self.__users_by_vlc.append(0) for _ in range(self.__controller.scenario.numberOfRFs): self.__users_by_rf.append(0) return
[docs] def run(self): """Execute the simulation until the goal connection count is reached. Runs the main simulation loop, processing events chronologically from the Future Event List (FEL) until the number of generated connections reaches or exceeds `goalConnections`. The simulation workflow: 1. Print initial scenario information and table header 2. Process events sequentially from FEL 3. Print each event's details 4. Continue until goalConnections reached 5. Display aggregated metrics Output: - Prints formatted table of all simulation events - Shows connection arrivals, allocations, transmissions, departures - Displays final aggregated metrics (AP utilization) Raises: RuntimeError: If `init()` has not been called first Example:: sim.init() sim.run() # Executes until goalConnections reached Note: Simulation time advances based on event timestamps, not real time. """ self.print_initial_info() while self.__numberOfConnections <= self.__goalConnections: # for i in range(self.__goalConnections): self.event_routine() if self.__current_event: self.print_row(self.__current_event) # Calculate execution time end_time = time.perf_counter() self.__time_duration = end_time - self.__start_time self.aggregated_metrics()
[docs] def time_duration(self) -> float: """Get the total execution time of the simulation. Returns: float: Execution time in seconds (0.0 if not available) Note: This measures the real wall-clock time taken to run the simulation, not the simulated time. The measurement starts when init() is called and ends when run() completes. """ return self.__time_duration if self.__time_duration is not None else 0.0
[docs] def get_Blocking_Probability(self) -> float: """Calculate the connection blocking probability. Blocking probability is the fraction of connections that were NOT successfully allocated. It represents the percentage of connection requests that were denied due to insufficient resources. Returns: float: Blocking probability (0.0 to 1.0, rounded to 2 decimals) Formula:: Blocking Probability = 1 - (allocated_connections / total_connections) Example:: sim.run() blocking = sim.get_Blocking_Probability() print(f"Blocking rate: {blocking * 100:.1f}%") # e.g., "Blocking rate: 5.2%" """ blocking = round( 1 - self.__allocatedConnections / self.__numberOfConnections, 2 ) return blocking
[docs] def set_allocation_algorithm( self, alloc_alg: Callable[ [Receiver, Connection, Any, Controller], tuple[Any, Connection] ], ) -> None: """Set a custom connection allocation algorithm. Replaces the default allocation algorithm with a user-defined function. The custom allocator must follow the required signature and return format. Args: alloc_alg: Allocation algorithm function with signature:: def custom_alloc( receiver: Receiver, connection: Connection, scenario: Scenario, controller: Controller ) -> tuple[Controller.status, Connection]: # Allocation logic here return Controller.status.ALLOCATED, connection Example:: def my_allocator(receiver, connection, scenario, controller): # Custom allocation logic vleds = scenario.vleds best_vled = max(vleds, key=lambda v: scenario.snrVled(receiver, v)) connection.AP = best_vled return Controller.status.ALLOCATED, connection sim.set_allocation_algorithm(my_allocator) Note: Must be called before `init()` for the algorithm to take effect. """ self.__controller.allocator = alloc_alg
@property def scenario(self): """Get the simulation scenario object. Provides access to the underlying Scenario object which contains all physical infrastructure (VLeds, RFs, room dimensions, etc.). Returns: Scenario: The simulation scenario with all access points and configuration Example:: # Add VLeds to scenario vled = VLed(5.0, 5.0, 3.0, 2, 2, 20, 60) sim.scenario.addVLed(vled) # Query scenario properties print(f"Room: {sim.scenario.length} x {sim.scenario.width} m") """ return self.__controller.scenario
[docs] def aggregated_metrics(self): """Display aggregated simulation metrics. Prints the total number of connections served by each access point (both VLeds and RFs) throughout the simulation. Provides insight into AP utilization and load distribution. Output: - List of each VLed with total connections served - List of each RF with total connections served Example Output:: Number of users connected to each VLed VLed 0: 342 VLed 1: 289 Number of users connected to each RF RF 0: 156 Note: Called automatically at the end of `run()`. """ print("Number of users connected to each VLed") for i in range(self.__controller.scenario.numberOfVLeds): vled_id = self.__controller.scenario.vleds[i].ID if vled_id is not None: print(f"VLed {vled_id}: {self.__users_by_vlc[i]}") else: print(f"VLed {i}: {self.__users_by_vlc[i]}") print("Number of users connected to each RF") for i in range(self.__controller.scenario.numberOfRFs): rf_id = self.__controller.scenario.rfs[i].ID if rf_id is not None: print(f"RF {rf_id}: {self.__users_by_rf[i]}") else: print(f"RF {i}: {self.__users_by_rf[i]}")