Overview
The simulation engine is implemented in the DiscreteEventSimulation class located at backend/src/simulation/domain/simulation.py. It orchestrates all simulation activity using a discrete-event simulation (DES) approach.
Core Components
Event Queue
Implemented using Python’s heapq min-heap:
```python
import heapq

self.event_queue: List[SimulationEvent] = []

# Schedule an event
heapq.heappush(self.event_queue, event)

# Get next event
current_event = heapq.heappop(self.event_queue)
```
Why heapq?

- O(log n) insertion
- O(log n) extraction of the minimum
- Automatically maintains chronological order
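One subtlety the snippet above glosses over: heapq compares queue items directly, so SimulationEvent must be orderable by time. A minimal sketch of how that can be done (the field names here are assumptions, not the engine's actual definition), using `dataclass(order=True)`:

```python
import heapq
from dataclasses import dataclass, field

# Sketch: order=True generates comparison methods from the compared
# fields; excluding event_type means events are ordered by time alone.
@dataclass(order=True)
class SimulationEvent:
    time: float
    event_type: str = field(compare=False, default="ARRIVAL")

queue: list = []
for t in (5.0, 1.0, 3.0):
    heapq.heappush(queue, SimulationEvent(t))

popped = []
while queue:
    popped.append(heapq.heappop(queue).time)
print(popped)  # → [1.0, 3.0, 5.0]
```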
Simulation Clock
The clock advances by jumping to the time of the next event (not continuous):
```python
self.clock = current_event.time
```
This is discrete time advancement, not real-time. A simulation of 8 hours might complete in seconds.
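This jump behavior can be illustrated in isolation (an illustrative sketch with invented event times, not the engine's API):

```python
import heapq

# The clock jumps straight to each event's timestamp instead of
# ticking through the intervals in between.
pending = [(7.2, "SERVICE_END"), (0.5, "ARRIVAL"), (3.1, "ARRIVAL")]
heapq.heapify(pending)

clock = 0.0
clock_history = []
while pending:
    event_time, _event_type = heapq.heappop(pending)
    clock = event_time  # jump, no intermediate ticks
    clock_history.append(clock)

print(clock_history)  # → [0.5, 3.1, 7.2]
```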
System State
```python
self.tellers: Dict[str, Teller] = {}
self.waiting_queue: List[Customer] = []
```
State is modified only during event processing.
Initialization
The initialize() method prepares the simulation:
```python
def initialize(self) -> None:
    self.status = SimulationStatus.IDLE
    self.clock = 0.0
    self.waiting_queue.clear()
    self.event_queue.clear()

    # Create tellers
    for i in range(self.config.num_tellers):
        t_id = f"T-{i + 1}"
        self.tellers[t_id] = Teller(id=t_id)

    # Schedule first arrival
    first_arrival_time = self.generator.get_next_arrival_interval()
    if first_arrival_time <= self.config.max_simulation_time:
        self.schedule_event(
            SimulationEvent(first_arrival_time, EventType.ARRIVAL)
        )
```
Key Steps:

1. Reset the clock to 0
2. Clear all queues
3. Initialize tellers with IDLE status
4. Schedule the first customer arrival
Main Simulation Loop
The run() method is the heart of the engine:
```python
def run(self) -> None:
    self.status = SimulationStatus.RUNNING
    while self.event_queue and self.status == SimulationStatus.RUNNING:
        # Extract next event
        current_event = heapq.heappop(self.event_queue)

        # Check termination
        if current_event.time > self.config.max_simulation_time:
            break

        # Advance clock
        self.clock = current_event.time

        # Process event
        self.process_next_event(current_event)

    # Finalize only if the loop ended naturally; a pause must not
    # overwrite the PAUSED status, or resume() would stop working
    if self.status == SimulationStatus.RUNNING:
        self.status = SimulationStatus.FINISHED
```
Loop Characteristics:

- Continues while events exist AND status is RUNNING
- Terminates at max_simulation_time
- Can be interrupted by pausing or stopping the simulation
Event Processing
Event dispatcher routes to appropriate handlers:
```python
def process_next_event(self, event: SimulationEvent) -> None:
    if event.event_type == EventType.ARRIVAL:
        self.handle_arrival()
    elif event.event_type == EventType.SERVICE_START:
        self.handle_service_start(event.teller_id, event.customer)
    elif event.event_type == EventType.SERVICE_END:
        self.handle_service_end(event.teller_id)
```
ARRIVAL Event
When a customer arrives:
```python
def handle_arrival(self) -> None:
    # 1. Generate customer attributes
    priority, txn_type = self.generator.get_next_customer_attributes()
    service_time = max(0.1, self.generator.get_service_time())

    # 2. Create customer entity
    customer = Customer(
        id=str(uuid.uuid4())[:8],
        arrival_time=self.clock,
        service_time=service_time,
        priority=priority,
        transaction_type=txn_type,
    )

    # 3. Add to queue
    self.waiting_queue.append(customer)
    self.waiting_queue.sort(key=lambda c: (c.priority, c.arrival_time))

    # 4. Schedule NEXT arrival
    next_interval = self.generator.get_next_arrival_interval()
    next_time = self.clock + next_interval
    if next_time <= self.config.max_simulation_time:
        self.schedule_event(SimulationEvent(next_time, EventType.ARRIVAL))

    # 5. Try to assign to a free teller
    self._assign_free_teller()
```
Key Points:

- Generates customer attributes stochastically
- Maintains priority queue ordering
- Chains arrivals (each arrival schedules the next)
- Immediately attempts service if a teller is available
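The sort key from step 3 can be seen in isolation. A minimal sketch, with Customer reduced to the fields the key uses and assuming the lower-value-equals-higher-priority convention:

```python
from dataclasses import dataclass

# Reduced Customer sketch: only the fields the sort key needs.
# Assumed convention: a LOWER priority value is served first.
@dataclass
class Customer:
    id: str
    priority: int
    arrival_time: float

waiting = [
    Customer("c1", priority=2, arrival_time=0.5),
    Customer("c2", priority=1, arrival_time=1.2),
    Customer("c3", priority=1, arrival_time=0.9),
]

# Same key as handle_arrival(): priority first, then FIFO by arrival
waiting.sort(key=lambda c: (c.priority, c.arrival_time))
print([c.id for c in waiting])  # → ['c3', 'c2', 'c1']
```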
SERVICE_START Event
When a customer begins service:
```python
def handle_service_start(self, teller_id: str, customer: Customer) -> None:
    teller = self.tellers.get(teller_id)
    if teller:
        # Update teller state
        teller.start_service(customer, self.clock)

        # Schedule service end
        end_time = self.clock + customer.service_time
        self.schedule_event(
            SimulationEvent(end_time, EventType.SERVICE_END, teller_id=teller_id)
        )
```
Atomicity: Starting a service and scheduling its end happen together, so a busy teller always has a pending SERVICE_END event.
SERVICE_END Event
When service completes:
```python
def handle_service_end(self, teller_id: str) -> None:
    teller = self.tellers.get(teller_id)
    if teller:
        # Complete service
        served_customer = teller.end_service()

        # Try to serve the next customer
        self._assign_free_teller()
```
Cascading: Ending one service may immediately start another.
Resource Allocation
The _assign_free_teller() method implements the allocation policy:
```python
def _assign_free_teller(self) -> None:
    if not self.waiting_queue:
        return
    for t_id, teller in self.tellers.items():
        if teller.status == TellerStatus.IDLE:
            # Pop the highest-priority customer
            next_customer = self.waiting_queue.pop(0)

            # Schedule an immediate service start
            self.schedule_event(
                SimulationEvent(
                    self.clock,
                    EventType.SERVICE_START,
                    customer=next_customer,
                    teller_id=t_id,
                )
            )
            return  # Assign one customer at a time
```
Policy:

1. Check if customers are waiting
2. Find the first IDLE teller
3. Remove the highest-priority customer from the queue
4. Schedule SERVICE_START at the current time
5. Stop (only one assignment per call)
Calling _assign_free_teller() after every arrival and service completion ensures customers are served as soon as a teller is free.
Event Timeline Example
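The original timeline diagram is not reproduced here; the following hypothetical trace (one teller T-1, two customers, invented times) illustrates how the events above interleave:

| Clock | Event | Effect |
| --- | --- | --- |
| 0.5 | ARRIVAL | c1 enqueued; next arrival scheduled at 2.0; T-1 is IDLE, so SERVICE_START is scheduled at 0.5 |
| 0.5 | SERVICE_START | T-1 begins serving c1; SERVICE_END scheduled at 3.5 (service time 3.0) |
| 2.0 | ARRIVAL | c2 enqueued; no idle teller, so c2 waits |
| 3.5 | SERVICE_END | T-1 finishes c1; _assign_free_teller() schedules SERVICE_START for c2 at 3.5 |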
Time Complexity
| Operation | Complexity | Notes |
| --- | --- | --- |
| Schedule event | O(log E) | E = events in queue |
| Extract next event | O(log E) | |
| Arrival handling | O(Q log Q) | Q = queue size (due to sort) |
| Service start | O(1) | |
| Service end | O(1) | |
| Assign teller | O(T) | T = number of tellers |
The waiting_queue.sort() on every arrival is O(Q log Q). For large queues, consider using a heap-based priority queue.
Simulation Lifecycle
Control Methods
Pause
```python
def pause(self) -> None:
    if self.status == SimulationStatus.RUNNING:
        self.status = SimulationStatus.PAUSED
```
Pausing stops the event loop but preserves state.
Resume
```python
def resume(self) -> None:
    if self.status == SimulationStatus.PAUSED:
        self.run()
```
Stop
```python
def stop(self) -> None:
    self.status = SimulationStatus.FINISHED
    self.event_queue.clear()
```
Stopping clears remaining events and finalizes the simulation.
Memory Usage
- Event queue grows with arrival rate
- Waiting queue bounded by max_queue_capacity
- Total memory: O(E + Q + T)
Execution Speed
- Simulating 8 hours typically takes under 1 second
- Dominated by event processing, not real-time delays
- Scales to thousands of customers
Optimization Opportunities
- Replace the list sort with a heap-based waiting queue:

```python
heapq.heappush(self.waiting_queue, (customer.priority, customer.arrival_time, customer))
```

  Note that if two customers share the same priority and arrival time, the tuple comparison falls through to the Customer objects themselves; adding a monotonic counter as a tie-breaker avoids this.

- Pre-allocate a teller list instead of a dict for faster indexed lookups
- Batch metrics collection instead of per-customer updates
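The heap-based replacement can be sketched end to end. This is a sketch, not the engine's implementation; the monotonic counter is an addition not in the original snippet, included so Customer objects are never compared directly on ties:

```python
import heapq
import itertools
from dataclasses import dataclass

@dataclass
class Customer:
    id: str
    priority: int
    arrival_time: float

_tie_breaker = itertools.count()
waiting_queue: list = []

def enqueue(c: Customer) -> None:
    # O(log Q) insert, replacing the O(Q log Q) sort on every arrival
    heapq.heappush(
        waiting_queue, (c.priority, c.arrival_time, next(_tie_breaker), c)
    )

def dequeue() -> Customer:
    # O(log Q) removal of the highest-priority (lowest-tuple) customer
    return heapq.heappop(waiting_queue)[-1]

enqueue(Customer("c1", priority=2, arrival_time=0.5))
enqueue(Customer("c2", priority=1, arrival_time=1.2))
enqueue(Customer("c3", priority=1, arrival_time=0.9))

order = [dequeue().id for _ in range(3)]
print(order)  # → ['c3', 'c2', 'c1']
```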
Next Steps
- Customer Module: how customers are generated and managed
- Queue Module: priority queue implementation details
- Teller Module: teller state machine and operations
- Metrics Module: how performance data is collected