"""Simulator Module for VLC Event-Driven Simulation.
This module provides the simulation engine for VLC (Visible Light Communication) systems.
It implements an event-driven discrete-event simulation (DES).
Classes:
Event: Represents simulation events (arrivals, departures, pauses, resumes)
Simulator: Main simulation engine managing event scheduling and execution
The Simulator module handles:
- Event scheduling and management using Future Event List (FEL)
- Random variable generation for arrivals, departures, and positions
- Connection lifecycle tracking (arrive → allocate → transmit → depart)
- Statistical data collection (blocking probability, AP utilization)
- Scenario initialization and execution control
Example:
Basic simulation setup::
from vlcsim import Simulator, VLed, RF
# Create simulator with 10x10x3m room
sim = Simulator(x=10.0, y=10.0, z=3.0, nGrids=20, rho=0.8)
# Add access points
vled = VLed(x=5.0, y=5.0, z=3.0, nLedsX=2, nLedsY=2, ledPower=20, theta=60)
sim.scenario.addVLed(vled)
# Configure simulation parameters
sim.lambdaS = 3.0 # Arrival rate (connections/second)
sim.mu = 10.0 # Service rate (1/seconds)
sim.goalConnections = 1000
# Run simulation
sim.init()
sim.run()
# Get results
print(f"Blocking probability: {sim.get_Blocking_Probability()}")
Note:
- Must call `init()` before `run()`
- Uses exponential inter-arrival times (Poisson process)
- Supports both VLC (VLed) and RF access points
"""
from .controller import *
from .scene import *
import numpy as np
import time
from enum import Enum
from typing import Optional, List, Any, Callable
[docs]
class Event:
    """One entry of the simulator's Future Event List (FEL).

    An event carries the simulation time at which it fires, its kind, the
    identifier of the connection it refers to and, once attached, the
    connection object itself.

    Attributes:
        event (Enum): Enumeration of the five event kinds.

    Event Types:
        - ARRIVE: New connection arrives to scenario
        - DEPARTURE: Connection completes transmission and leaves
        - PAUSE: Connection pauses transmission (TDM frame/slice boundary)
        - RESUME: Connection resumes transmission after pause
        - NEXT_CONNECTION_TRY: Connection retries allocation after waiting

    Example::

        # Create arrival event
        event = Event(type=Event.event.ARRIVE, time=5.2, id_connection=42)
        event.connection = connection_object

        # Check event type
        if event.type == Event.event.ARRIVE:
            print(f"Connection {event.id_connection} arrived at {event.time}")
    """

    event = Enum("event", "ARRIVE DEPARTURE PAUSE RESUME NEXT_CONNECTION_TRY")
    """Event type enumeration.

    Five event types for simulation lifecycle:
        - ARRIVE: Connection arrives to scenario (triggers allocation)
        - DEPARTURE: Connection completes transmission and leaves
        - PAUSE: Connection temporarily pauses (TDM frame/slice boundary)
        - RESUME: Connection resumes transmission after pause
        - NEXT_CONNECTION_TRY: Connection retries allocation after random wait
    """

    def __init__(
        self, type: Optional[event] = None, time: float = -1, id_connection: int = -1
    ) -> None:
        """Create an event.

        Args:
            type: Kind of event; ARRIVE is used when None is given.
            time: Simulation time at which the event fires.
            id_connection: Identifier of the connection the event refers to.
        """
        # No connection object is attached until the `connection` setter is used.
        self.__connection: Optional["Connection"] = None
        self.__type: Event.event = Event.event.ARRIVE if type is None else type
        self.__id_connection: int = id_connection
        self.__time: float = time

    @property
    def type(self) -> event:
        """Kind of this event (a member of ``Event.event``)."""
        return self.__type

    @property
    def time(self) -> float:
        """Simulation time at which this event fires."""
        return self.__time

    @property
    def id_connection(self) -> int:
        """Identifier of the connection this event refers to."""
        return self.__id_connection

    @property
    def connection(self) -> Optional["Connection"]:
        """Connection object attached to this event, or None if none was set."""
        return self.__connection

    @connection.setter
    def connection(self, value: "Connection"):
        """Attach a connection object to this event.

        Args:
            value: Connection object to associate with this event.
        """
        self.__connection = value
[docs]
class Simulator:
    """Event-driven discrete-event simulator for VLC systems.

    The Simulator class implements a complete discrete-event simulation (DES)
    engine for VLC communication systems. It manages the event scheduling,
    random variable generation, connection lifecycle, and statistical data
    collection.

    The simulation follows these key phases:
        1. **Initialization**: Configure scenario, APs, and simulation parameters
        2. **Event Scheduling**: Maintain chronologically ordered Future Event List (FEL)
        3. **Event Processing**: Execute event routines (arrive, pause, resume, depart)
        4. **Statistics Collection**: Track blocking probability, AP utilization, delays

    Key Features:
        - Poisson arrival process (exponential inter-arrival times)
        - Exponential service times
        - Uniform spatial distribution of receivers
        - Customizable allocation algorithms
        - Support for both VLC (VLed) and RF access points

    Example:
        Complete simulation workflow::

            # Create simulator
            sim = Simulator(x=10.0, y=10.0, z=3.0, nGrids=20, rho=0.8)

            # Add access points
            vled1 = VLed(2.5, 2.5, 3.0, 2, 2, 20, 60)
            vled2 = VLed(7.5, 7.5, 3.0, 2, 2, 20, 60)
            sim.scenario.addVLed(vled1)
            sim.scenario.addVLed(vled2)

            # Configure parameters
            sim.lambdaS = 3.0  # 3 arrivals/second
            sim.mu = 10.0  # mean service time = 0.1 seconds
            sim.goalConnections = 10000
            sim.lower_capacity_required = 1e6  # 1 Mbps
            sim.upper_capacity_required = 10e6  # 10 Mbps

            # Run simulation
            sim.init()
            sim.run()

            # Get results
            blocking = sim.get_Blocking_Probability()
            print(f"Blocking Probability: {blocking}")

    Note:
        - Must call `init()` before `run()`
        - Seeds control reproducibility of random variables
        - Default values are set automatically if not specified
    """
[docs]
def __init__(self, x: float, y: float, z: float, nGrids: int, rho: float) -> None:
    """Build a simulator around a new Controller.

    All five arguments are forwarded unchanged to ``Controller``. Every
    internal field is then created with a neutral value and
    ``default_values()`` is called to install the default parameters.

    Args:
        x: Room length in meters (X-axis dimension)
        y: Room width in meters (Y-axis dimension)
        z: Room height in meters (Z-axis dimension, ceiling height)
        nGrids: Number of grid divisions per meter for wall reflection calculations
        rho: Wall reflection coefficient (0.0 to 1.0)

    Example::

        sim = Simulator(x=10.0, y=10.0, z=3.0, nGrids=20, rho=0.8)
    """
    # The controller must exist first: default_values() sets its allocator.
    self.__controller: Controller = Controller(x, y, z, nGrids, rho)

    # Future Event List and per-event bookkeeping.
    self.__events: List[Event] = []
    self.__current_event: Optional[Event] = None
    self.__rtn_allocation: Optional[Any] = None
    self.__initReady: Optional[bool] = None

    # Simulated clock and measured wall-clock duration.
    self.__clock: float = 0.0
    self.__time_duration: Optional[float] = None

    # Traffic parameters.
    self.__lambdaS: Optional[float] = None
    self.__mu: Optional[float] = None
    self.__lower_random_wait: Optional[float] = None
    self.__upper_random_wait: Optional[float] = None
    self.__lower_capacity_required: Optional[float] = None
    self.__upper_capacity_required: Optional[float] = None

    # Seeds of the independent random streams.
    self.__seedArrive: Optional[int] = None
    self.__seedDeparture: Optional[int] = None
    self.__seedX: Optional[int] = None
    self.__seedY: Optional[int] = None
    self.__seedZ: Optional[int] = None
    self.__seedRandomWait: Optional[int] = None
    self.__seedCapacityRequired: Optional[int] = None

    # Random generators; they are created by init().
    self.__arrival_variable: Any = None
    self.__departure_variable: Any = None
    self.__x_variable: Any = None
    self.__y_variable: Any = None
    self.__z_variable: Any = None
    self.__random_wait_variable: Any = None
    self.__capacity_required_variable: Any = None

    # Counters and per-access-point statistics.
    self.__numberOfConnections: int = 0
    self.__goalConnections: int = 10000
    self.__allocatedConnections: int = 0
    self.__users_by_vlc: List[int] = []
    self.__users_by_rf: List[int] = []

    self.default_values()
[docs]
def default_values(self):
    """Install the default simulation parameters.

    Values written by this method (all may be overridden before ``init()``):
        - lambdaS = 3, mu = 10
        - seedArrive = 12345, seedDeparture = 1234
        - seedX = 1235, seedY = 1245, seedZ = 1345
        - seedRandomWait = 1345, seedCapacityRequired = 1345
        - goalConnections = 10000
        - lower_random_wait = 5, upper_random_wait = 15
        - lower_capacity_required = 1e5, upper_capacity_required = 5e5
        - controller allocator = Controller.default_alloc

    The connection counters are reset to zero and the "init done" flag is
    cleared.

    Note:
        This method is called automatically by `__init__()`.
    """
    # State flags and counters.
    self.__initReady = False
    self.__numberOfConnections = 0
    self.__allocatedConnections = 0
    self.__goalConnections: int = 10000

    # Traffic parameters.
    self.__lambdaS = 3
    self.__mu = 10
    self.__lower_random_wait = 5
    self.__upper_random_wait = 15
    self.__lower_capacity_required = 1e5
    self.__upper_capacity_required = 5e5

    # Seeds of the random streams.
    self.__seedArrive = 12345
    self.__seedDeparture = 1234
    self.__seedX = 1235
    self.__seedY = 1245
    self.__seedZ = 1345
    self.__seedRandomWait = 1345
    self.__seedCapacityRequired = 1345

    # Allocation algorithm used by the controller.
    self.__controller.allocator = Controller.default_alloc
@property
def upper_capacity_required(self) -> Optional[float]:
    """Upper bound of the capacity a new connection may request.

    Returns:
        float or None: Maximum capacity requirement in bps, or None if not set
    """
    ceiling = self.__upper_capacity_required
    return ceiling


@upper_capacity_required.setter
def upper_capacity_required(self, value: float) -> None:
    """Store a new upper bound for the requested capacity.

    Args:
        value: Maximum capacity requirement in bits per second (bps)
    """
    self.__upper_capacity_required = value
@property
def lower_capacity_required(self) -> Optional[float]:
    """Lower bound of the capacity a new connection may request.

    Returns:
        float or None: Minimum capacity requirement in bps, or None if not set
    """
    floor = self.__lower_capacity_required
    return floor


@lower_capacity_required.setter
def lower_capacity_required(self, value: float) -> None:
    """Store a new lower bound for the requested capacity.

    Args:
        value: Minimum capacity requirement in bits per second (bps)
    """
    self.__lower_capacity_required = value
@property
def seedCapacityRequired(self) -> Optional[int]:
    """Seed configured for the capacity-requirement random stream.

    Returns:
        int or None: The stored seed, or None if not set
    """
    seed = self.__seedCapacityRequired
    return seed


@seedCapacityRequired.setter
def seedCapacityRequired(self, value: int):
    """Store the seed for the capacity-requirement random stream.

    Args:
        value: Random number generator seed (integer)
    """
    self.__seedCapacityRequired = value
@property
def upper_random_wait(self) -> Optional[float]:
    """Upper bound of the retry wait time.

    A connection that cannot be allocated retries after a wait drawn
    uniformly between ``lower_random_wait`` and ``upper_random_wait``.

    Returns:
        float or None: Maximum wait time in seconds, or None if not set
    """
    longest = self.__upper_random_wait
    return longest


@upper_random_wait.setter
def upper_random_wait(self, value: float) -> None:
    """Store a new upper bound for the retry wait time.

    Args:
        value: Maximum wait time in seconds
    """
    self.__upper_random_wait = value
@property
def lower_random_wait(self) -> Optional[float]:
    """Lower bound of the retry wait time.

    A connection that cannot be allocated retries after a wait drawn
    uniformly between ``lower_random_wait`` and ``upper_random_wait``.

    Returns:
        float or None: Minimum wait time in seconds, or None if not set
    """
    shortest = self.__lower_random_wait
    return shortest


@lower_random_wait.setter
def lower_random_wait(self, value: float) -> None:
    """Store a new lower bound for the retry wait time.

    Args:
        value: Minimum wait time in seconds
    """
    self.__lower_random_wait = value
@property
def lambdaS(self) -> Optional[float]:
    """Arrival-rate parameter (lambda) of the arrival process.

    Intended as the rate of a Poisson arrival process, i.e. a mean
    inter-arrival time of 1/lambda seconds.

    Returns:
        float or None: Arrival rate in connections per second, or None if not set

    Example::

        sim.lambdaS = 3.0  # Average of 3 arrivals per second
    """
    return self.__lambdaS


@lambdaS.setter
def lambdaS(self, lambdaS: float) -> None:
    """Set the arrival-rate parameter.

    The value is only accepted before `init()`; afterwards a notice is
    printed and the stored value is left unchanged (nothing is raised).

    Args:
        lambdaS: Arrival rate in connections per second (must be positive)
    """
    if self.__initReady:
        # The notice names the parameter that was actually rejected.
        print("You can not set lambdaS parameter AFTER calling init simulator method.")
        return
    self.__lambdaS = lambdaS
@property
def mu(self) -> Optional[float]:
    """Service-rate parameter (mu) of the service-time distribution.

    Intended as the rate of an exponential service time, i.e. a mean
    service time of 1/mu seconds.

    Returns:
        float or None: Service rate in 1/seconds, or None if not set

    Example::

        sim.mu = 10.0  # Mean service time = 1/10 = 0.1 seconds
    """
    rate = self.__mu
    return rate


@mu.setter
def mu(self, mu: float) -> None:
    """Set the service-rate parameter.

    The value is only accepted before `init()`; afterwards a notice is
    printed and the stored value is left unchanged (nothing is raised).

    Args:
        mu: Service rate in 1/seconds (must be positive)
    """
    if not self.__initReady:
        self.__mu = mu
        return
    print("You can not set mu parameter AFTER calling init simulator method.")
@property
def seedX(self) -> Optional[int]:
    """Seed configured for the receiver X-coordinate random stream.

    Returns:
        int or None: The stored seed, or None if not set
    """
    return self.__seedX


@seedX.setter
def seedX(self, seedX: int) -> None:
    """Set the seed of the X-coordinate random stream.

    The value is only accepted before `init()`; afterwards a notice is
    printed and the stored value is left unchanged (nothing is raised).

    Args:
        seedX: Random number generator seed (integer)
    """
    if self.__initReady:
        # The notice names the parameter that was actually rejected.
        print("You can not set seedX parameter AFTER calling init simulator method.")
        return
    self.__seedX = seedX
@property
def seedY(self) -> Optional[int]:
    """Seed configured for the receiver Y-coordinate random stream.

    Returns:
        int or None: The stored seed, or None if not set
    """
    return self.__seedY


@seedY.setter
def seedY(self, seedY: int) -> None:
    """Set the seed of the Y-coordinate random stream.

    The value is only accepted before `init()`; afterwards a notice is
    printed and the stored value is left unchanged (nothing is raised).

    Args:
        seedY: Random number generator seed (integer)
    """
    if self.__initReady:
        # The notice names the parameter that was actually rejected.
        print("You can not set seedY parameter AFTER calling init simulator method.")
        return
    self.__seedY = seedY
@property
def seedZ(self) -> Optional[int]:
    """Seed configured for the receiver Z-coordinate (height) random stream.

    Returns:
        int or None: The stored seed, or None if not set
    """
    return self.__seedZ


@seedZ.setter
def seedZ(self, seedZ: int) -> None:
    """Set the seed of the Z-coordinate random stream.

    The value is only accepted before `init()`; afterwards a notice is
    printed and the stored value is left unchanged (nothing is raised).

    Args:
        seedZ: Random number generator seed (integer)
    """
    if self.__initReady:
        # The notice names the parameter that was actually rejected.
        print("You can not set seedZ parameter AFTER calling init simulator method.")
        return
    self.__seedZ = seedZ
@property
def seedRandomWait(self) -> Optional[int]:
    """Seed configured for the retry wait-time random stream.

    Returns:
        int or None: The stored seed, or None if not set
    """
    return self.__seedRandomWait


@seedRandomWait.setter
def seedRandomWait(self, seedRandomWait: int) -> None:
    """Set the seed of the retry wait-time random stream.

    The value is only accepted before `init()`; afterwards a notice is
    printed and the stored value is left unchanged (nothing is raised).

    Args:
        seedRandomWait: Random number generator seed (integer)
    """
    if self.__initReady:
        # The notice names the parameter that was actually rejected.
        print(
            "You can not set seedRandomWait parameter AFTER calling init "
            "simulator method."
        )
        return
    self.__seedRandomWait = seedRandomWait
@property
def goalConnections(self) -> Optional[int]:
    """Target number of generated connections that ends the simulation.

    `run()` keeps processing events while the number of generated
    connections does not exceed this value.

    Returns:
        int or None: Target connection count, or None if not set
    """
    return self.__goalConnections


@goalConnections.setter
def goalConnections(self, goalConnections: int) -> None:
    """Set the target number of connections.

    The value is only accepted before `init()`; afterwards a notice is
    printed and the stored value is left unchanged (nothing is raised).

    Args:
        goalConnections: Target connection count (must be positive integer)
    """
    if self.__initReady:
        # The notice names the parameter that was actually rejected.
        print(
            "You can not set goalConnections parameter AFTER calling init "
            "simulator method."
        )
        return
    self.__goalConnections = goalConnections
[docs]
def print_initial_info(self):
    """Print the scenario summary and the header of the event table.

    Output, in order:
        - Scenario dimensions (length x width x height)
        - Number of VLeds followed by an ID/position table
        - Number of RFs followed by an ID/position table
        - The header row of the event log, framed by two rules

    Note:
        `run()` calls this method before processing events.
    """
    scenario = self.__controller.scenario
    position_row = "{:>10} | {:8.4f} | {:8.4f} | {:8.4f} |"

    def position_table(access_points):
        # One row per access point, closed by an empty line.
        print("Positions:")
        print(" ID | X | Y | Z |")
        print("-" * 47)
        for ap in access_points:
            print(position_row.format(ap.ID, ap.x, ap.y, ap.z))
        print()

    print(
        "Scenario:\t %s x %s x %s"
        % (scenario.length, scenario.width, scenario.height)
    )

    print(f"Number of VLeds: {scenario.numberOfVLeds}")
    position_table(scenario.vleds)

    print(f"Number of RFs: {scenario.numberOfRFs}")
    position_table(scenario.rfs)

    header_cells = (
        "| Time ",
        "| Event ",
        "| Receiver ",
        "| X ",
        "| Y ",
        "| Z ",
        "| Access Point (A.C.) ",
        "| Goal time ",
        "| Elapsed time ",
        "| SNR ",
        "| Req. Cap. |",
    )
    rule = "=" * 146
    print(rule)
    print("".join(header_cells))
    print(rule)
[docs]
def print_row(self, event: Event) -> None:
    """Print one row of the event log for ``event``.

    Args:
        event: The Event object to print information for

    Output Columns:
        - Time: ``event.time``
        - Event: ARRIVE, RESUME, PAUSE, DEPARTURE or RETRYING
        - Receiver: Connection ID
        - X, Y, Z: Receiver position, or N/A without a connection
        - Access Point: VLed/RF ID with its number of active connections,
          or NOT_SELECTED
        - Goal time: Receiver goal time when the connection is allocated,
          otherwise NOT_ALLOC
        - Elapsed time: Receiver active time
        - SNR and Req. Cap.: Taken from the connection
    """
    kind_cell = {
        Event.event.ARRIVE: " ARRIVE |",
        Event.event.RESUME: " RESUME |",
        Event.event.PAUSE: " PAUSE |",
        Event.event.DEPARTURE: " DEPARTURE |",
        Event.event.NEXT_CONNECTION_TRY: " RETRYING |",
    }
    conn = event.connection

    # The row is collected cell by cell and emitted with a single print.
    cells = ["{:9.4f}".format(event.time) + "|"]
    # Any other event kind contributes no cell at all.
    cells.append(kind_cell.get(event.type, ""))
    cells.append("{:>10}".format(event.id_connection) + " |")

    if conn:
        receiver = conn.receiver
        for coordinate in (receiver.x, receiver.y, receiver.z):
            cells.append("{:8.4f}".format(coordinate) + " |")
    else:
        cells.append(" N/A | N/A | N/A |")

    # Access point column: label, ID and number of active connections.
    access_point = conn.AP if conn else None
    if isinstance(access_point, VLed):
        ap_label = " VLed: {:>5}"
    elif isinstance(access_point, RF):
        ap_label = " RF: {:>5}"
    else:
        ap_label = None
    if ap_label is None:
        cells.append(" NOT_SELECTED |")
    else:
        cells.append(
            ap_label.format(access_point.ID)
            + " ({:^5})".format(
                self.__controller.numberOfActiveConnections(access_point)
            )
            + " |"
        )

    if conn and conn.allocated:
        cells.append("{:10.4f}".format(conn.receiver.goalTime) + " |")
    else:
        cells.append(" NOT_ALLOC |")

    if conn:
        cells.append("{:12.4f}".format(conn.receiver.timeActive) + " |")
    else:
        cells.append(" N/A |")

    if conn:
        cells.append("{:10.2e}".format(conn.snr) + " |")
        cells.append("{:10.2e}".format(conn.capacityRequired) + " |")
    else:
        cells.append(" N/A | N/A |")

    print("".join(cells))
[docs]
def event_routine(self):
    """Process the event at the head of the Future Event List (FEL).

    The head event is removed from the FEL, the simulation clock is set to
    its time, and the handler for its kind runs:

        - **ARRIVE**: schedule the next arrival, create a receiver at a
          uniformly drawn position and a connection with a uniformly drawn
          required capacity, then ask the controller to assign it.
        - **NEXT_CONNECTION_TRY**: ask the controller to assign the waiting
          connection again.
        - **PAUSE**: pause the connection and schedule the RESUME event the
          controller returns.
        - **RESUME**: resume the connection and schedule either a PAUSE or a
          DEPARTURE event, depending on the controller's answer.
        - **DEPARTURE**: release the connection.

    An assignment answered with RESUME schedules a RESUME event; one
    answered with RND_WAIT schedules a retry after a uniformly drawn wait.
    Events other than ARRIVE that carry no connection are discarded.

    Inter-arrival and service times are drawn with mean ``1/lambdaS`` and
    ``1/mu``: numpy's ``exponential`` takes the mean (scale), not the rate.

    Returns:
        Any: The allocation return value, which is currently always None.

    Raises:
        IndexError: If the FEL is empty.
    """

    def schedule(new_event, after_ties=True):
        """Insert ``new_event`` into the FEL, keeping it ordered by time.

        With ``after_ties`` the event is placed behind queued events that
        share its timestamp, otherwise in front of them.
        """
        events = self.__events
        for pos in range(len(events) - 1, -1, -1):
            queued = events[pos].time
            if queued < new_event.time or (after_ties and queued == new_event.time):
                events.insert(pos + 1, new_event)
                return
        # No earlier event is queued: the new event becomes the next one
        # instead of being silently lost.
        events.insert(0, new_event)

    def schedule_for(kind, when, conn, after_ties=True):
        """Schedule an event of ``kind`` at ``when`` bound to ``conn``."""
        follow_up = Event(kind, when, conn.id)
        follow_up.connection = conn
        schedule(follow_up, after_ties)

    def schedule_retry(conn):
        """Schedule a new allocation attempt after a uniform random wait."""
        wait = self.__random_wait_variable.uniform(
            low=self.__lower_random_wait,
            high=self.__upper_random_wait,
        )
        schedule_for(
            Event.event.NEXT_CONNECTION_TRY,
            self.__clock + wait,
            conn,
            after_ties=False,
        )

    def count_user(access_point):
        """Add one user to the counter of the access point just assigned."""
        scenario = self.__controller.scenario
        if isinstance(access_point, VLed):
            candidates, counters = scenario.vleds, self.__users_by_vlc
        elif isinstance(access_point, RF):
            candidates, counters = scenario.rfs, self.__users_by_rf
        else:
            return
        # Counters are indexed by position in the scenario list, not by ID.
        for idx, candidate in enumerate(candidates):
            if candidate is access_point:
                counters[idx] += 1
                break

    # The head event is removed up front so that every exit path consumes
    # it; returning early without removing it made run() loop forever.
    current = self.__events.pop(0)
    self.__current_event = current
    self.__rtn_allocation = None
    self.__clock = current.time

    controller = self.__controller
    status = Controller.nextStatus
    kind = current.type

    if kind == Event.event.ARRIVE:
        # Next arrival: exponential() expects the mean gap, i.e. 1 / rate.
        gap = self.__arrival_variable.exponential(1.0 / self.__lambdaS)
        schedule(
            Event(
                Event.event.ARRIVE,
                self.__clock + gap,
                self.__numberOfConnections,
            ),
            after_ties=False,
        )
        self.__numberOfConnections += 1

        scenario = controller.scenario
        self.__x = self.__x_variable.uniform(
            low=scenario.start_x,
            high=scenario.end_x,
        )
        self.__y = self.__y_variable.uniform(
            low=scenario.start_y,
            high=scenario.end_y,
        )
        self.__z = self.__z_variable.uniform(
            low=0,
            high=scenario.height,
        )
        # NOTE(review): the four trailing Receiver arguments are hard-coded;
        # confirm their meaning against the Receiver constructor.
        receiver = Receiver(self.__x, self.__y, self.__z, 1e-4, 1.0, 1.5, 70.0)
        connection = Connection(current.id_connection, receiver, self.__clock)
        connection.capacityRequired = self.__capacity_required_variable.uniform(
            self.__lower_capacity_required, self.__upper_capacity_required
        )

        next_status, when, connection = controller.assignConnection(
            connection, self.__clock
        )
        if next_status in (status.RESUME, status.PAUSE):
            count_user(connection.AP)
            current.connection = connection
            if next_status == status.RESUME:
                schedule_for(Event.event.RESUME, when, connection)
            # NOTE(review): counted for RESUME and PAUSE alike; the original
            # nesting of this statement was lost, confirm upstream.
            self.__allocatedConnections += 1
        elif next_status == status.RND_WAIT:
            current.connection = connection
            connection.allocated = False
            schedule_retry(connection)

    elif not current.connection:
        # Nothing to act on; the event has already been consumed above.
        pass

    elif kind == Event.event.NEXT_CONNECTION_TRY:
        next_status, when, connection = controller.assignConnection(
            current.connection, self.__clock
        )
        if next_status in (status.RESUME, status.PAUSE):
            # Service time: exponential() expects the mean, i.e. 1 / rate.
            connection.receiver.goalTime = self.__departure_variable.exponential(
                1.0 / self.__mu
            )
            connection.allocated = True
            # NOTE(review): unlike ARRIVE, a successful retry does not update
            # the per-access-point user counters; confirm this is intended.
            if next_status == status.RESUME:
                schedule_for(Event.event.RESUME, when, connection)
            self.__allocatedConnections += 1
        elif next_status == status.RND_WAIT:
            schedule_retry(connection)

    elif kind == Event.event.PAUSE:
        next_status, when, connection = controller.pauseConnection(
            current.connection, self.__clock
        )
        schedule_for(Event.event.RESUME, when, connection)

    elif kind == Event.event.RESUME:
        next_status, when, connection = controller.resumeConnection(
            current.connection, self.__clock
        )
        follow_up_kind = (
            Event.event.PAUSE
            if next_status == status.PAUSE
            else Event.event.DEPARTURE
        )
        schedule_for(follow_up_kind, when, connection)

    elif kind == Event.event.DEPARTURE:
        controller.unassignConnection(current.connection, self.__clock)

    return self.__rtn_allocation
[docs]
def init(self):
    """Initialize the simulation engine and prepare for execution.

    Must be called after the parameters have been configured and before
    `run()`. It may be called again to start a fresh simulation.

    Initialization Steps:
        1. Check that ``lambdaS`` and ``mu`` are positive
        2. Reset the clock, the event list, the counters and the statistics
        3. Create one random generator per stream from the configured seeds
        4. Start the wall-clock measurement
        5. Initialize the controller and check that the scenario has at
           least one access point
        6. Schedule the first arrival event
        7. Create one user counter per access point
        8. Lock the parameter setters

    The setters are only locked once everything succeeded, so a failed
    call can be corrected and repeated.

    Raises:
        ValueError: If ``lambdaS`` or ``mu`` is not positive, or if the
            scenario has no VLeds and no RFs configured

    Example::

        sim = Simulator(10.0, 10.0, 3.0, 20, 0.8)
        sim.scenario.addVLed(vled1)
        sim.lambdaS = 3.0
        sim.mu = 10.0
        sim.init()  # Initialize before running
        sim.run()
    """
    # Both rates are inverted into mean times, so they must be positive.
    if self.__lambdaS is None or self.__lambdaS <= 0:
        raise ValueError("lambdaS must be a positive arrival rate")
    if self.__mu is None or self.__mu <= 0:
        raise ValueError("mu must be a positive service rate")

    # Parameters stay editable until initialization has fully succeeded.
    self.__initReady = False
    self.__clock = 0

    # Reset state variables to allow reinitialization.
    self.__events = []
    self.__numberOfConnections = 0
    self.__allocatedConnections = 0
    self.__users_by_vlc = []
    self.__users_by_rf = []
    self.__current_event = None
    self.__time_duration = None

    # One independent generator per random stream, each with its own seed.
    new_stream = np.random.default_rng
    self.__arrival_variable = new_stream(self.__seedArrive)
    self.__departure_variable = new_stream(self.__seedDeparture)
    self.__x_variable = new_stream(self.__seedX)
    self.__y_variable = new_stream(self.__seedY)
    self.__z_variable = new_stream(self.__seedZ)
    self.__random_wait_variable = new_stream(self.__seedRandomWait)
    self.__capacity_required_variable = new_stream(self.__seedCapacityRequired)

    # Start execution time measurement.
    self.__start_time = time.perf_counter()

    self.__controller.init()
    scenario = self.__controller.scenario
    if scenario.numberOfVLeds == 0 and scenario.numberOfRFs == 0:
        raise ValueError("The scenario does not have any Vleds or RFs")

    # First arrival: exponential() expects the mean gap, i.e. 1 / rate.
    self.__events.append(
        Event(
            Event.event.ARRIVE,
            self.__arrival_variable.exponential(1.0 / self.__lambdaS),
            self.__numberOfConnections,
        )
    )
    self.__numberOfConnections += 1

    # One user counter per access point, in scenario order.
    self.__users_by_vlc = [0] * scenario.numberOfVLeds
    self.__users_by_rf = [0] * scenario.numberOfRFs

    self.__initReady = True
    return
[docs]
def run(self):
    """Execute the simulation until the goal connection count is reached.

    The simulation workflow:
        1. Print initial scenario information and table header
        2. Process the head event of the Future Event List
        3. Print a row for the processed event
        4. Repeat while the number of generated connections does not exceed
           `goalConnections` and events remain
        5. Store the elapsed wall-clock time and print aggregated metrics

    Raises:
        RuntimeError: If `init()` has not been called first

    Example::

        sim.init()
        sim.run()  # Executes until goalConnections reached

    Note:
        Simulation time advances based on event timestamps, not real time.
    """
    # Without init() there are no generators and no first event to process.
    if not self.__initReady:
        raise RuntimeError("init() must be called before run()")

    self.print_initial_info()

    # An empty event list also ends the loop instead of raising IndexError.
    while self.__events and self.__numberOfConnections <= self.__goalConnections:
        self.event_routine()
        if self.__current_event:
            self.print_row(self.__current_event)

    # Wall-clock time elapsed since init() started the measurement.
    self.__time_duration = time.perf_counter() - self.__start_time
    self.aggregated_metrics()
[docs]
def time_duration(self) -> float:
    """Return the measured execution time of the simulation.

    Returns:
        float: Execution time in seconds (0.0 if not available)

    Note:
        This is real wall-clock time, not simulated time. The measurement
        starts in init() and is stored when run() completes.
    """
    elapsed = self.__time_duration
    if elapsed is None:
        return 0.0
    return elapsed
[docs]
def get_Blocking_Probability(self) -> float:
    """Calculate the connection blocking probability.

    The blocking probability is the fraction of generated connections that
    were not counted as allocated.

    Returns:
        float: Blocking probability rounded to 2 decimals; 0.0 when no
        connection has been generated yet

    Formula::

        Blocking Probability = 1 - (allocated_connections / total_connections)

    Example::

        sim.run()
        blocking = sim.get_Blocking_Probability()
        print(f"Blocking rate: {blocking * 100:.1f}%")
    """
    total = self.__numberOfConnections
    # Nothing generated means nothing blocked; this also avoids dividing by zero.
    if not total:
        return 0.0
    return round(1 - self.__allocatedConnections / total, 2)
[docs]
def set_allocation_algorithm(
    self,
    alloc_alg: Callable[
        [Receiver, Connection, Any, Controller], tuple[Any, Connection]
    ],
) -> None:
    """Install a custom connection allocation algorithm.

    The callable is stored as the controller's allocator, replacing the one
    installed before.

    Args:
        alloc_alg: Allocation algorithm function with signature::

            def custom_alloc(
                receiver: Receiver,
                connection: Connection,
                scenario: Scenario,
                controller: Controller
            ) -> tuple[Controller.status, Connection]:
                # Allocation logic here
                return Controller.status.ALLOCATED, connection

    Example::

        def my_allocator(receiver, connection, scenario, controller):
            # Custom allocation logic
            vleds = scenario.vleds
            best_vled = max(vleds, key=lambda v: scenario.snrVled(receiver, v))
            connection.AP = best_vled
            return Controller.status.ALLOCATED, connection

        sim.set_allocation_algorithm(my_allocator)

    Note:
        Must be called before `init()` for the algorithm to take effect.
    """
    controller = self.__controller
    controller.allocator = alloc_alg
@property
def scenario(self):
    """Scenario object owned by the simulator's controller.

    Returns:
        Scenario: The simulation scenario with all access points and configuration

    Example::

        # Add VLeds to scenario
        vled = VLed(5.0, 5.0, 3.0, 2, 2, 20, 60)
        sim.scenario.addVLed(vled)

        # Query scenario properties
        print(f"Room: {sim.scenario.length} x {sim.scenario.width} m")
    """
    controller = self.__controller
    return controller.scenario
[docs]
def aggregated_metrics(self):
    """Print the user counter of every access point.

    For each VLed and then each RF, one line shows the access point's ID
    (or its list position when the ID is None) and its user counter.

    Example Output::

        Number of users connected to each VLed
        VLed 0: 342
        VLed 1: 289
        Number of users connected to each RF
        RF 0: 156

    Note:
        Called automatically at the end of `run()`.
    """
    scenario = self.__controller.scenario

    print("Number of users connected to each VLed")
    for slot in range(scenario.numberOfVLeds):
        ident = scenario.vleds[slot].ID
        # The list position is shown when the access point has no ID.
        shown = slot if ident is None else ident
        print(f"VLed {shown}: {self.__users_by_vlc[slot]}")

    print("Number of users connected to each RF")
    for slot in range(scenario.numberOfRFs):
        ident = scenario.rfs[slot].ID
        shown = slot if ident is None else ident
        print(f"RF {shown}: {self.__users_by_rf[slot]}")