Overview
The Database class provides an async context manager wrapping an aiosqlite connection. It manages persistent storage for all C2 server data including sessions, tasks, results, and nonce replay protection.
Source: server/storage.py
Database Schema
sessions Table
CREATE TABLE IF NOT EXISTS sessions (
session_id TEXT PRIMARY KEY ,
hostname TEXT NOT NULL ,
username TEXT NOT NULL ,
os TEXT NOT NULL ,
agent_ver TEXT NOT NULL ,
first_seen REAL NOT NULL ,
last_seen REAL NOT NULL ,
jitter_pct INTEGER NOT NULL ,
active INTEGER NOT NULL DEFAULT 1
)
Stores agent session metadata.
tasks Table
CREATE TABLE IF NOT EXISTS tasks (
task_id TEXT PRIMARY KEY ,
session_id TEXT NOT NULL ,
command TEXT NOT NULL ,
args TEXT NOT NULL ,
timeout_s INTEGER NOT NULL ,
queued_at REAL NOT NULL ,
status TEXT NOT NULL
)
Stores all tasks queued for agents. args stored as JSON string.
results Table
CREATE TABLE IF NOT EXISTS results (
result_id TEXT PRIMARY KEY ,
task_id TEXT NOT NULL ,
stdout TEXT NOT NULL ,
stderr TEXT NOT NULL ,
exit_code INTEGER NOT NULL ,
duration_ms INTEGER NOT NULL ,
received_at REAL NOT NULL
)
Stores task execution results.
nonces Table
CREATE TABLE IF NOT EXISTS nonces (
nonce TEXT PRIMARY KEY ,
received_at REAL NOT NULL
)
Stores nonces for replay attack detection. Entries older than 24 hours are pruned.
Database Class
Constructor
def __init__ ( self , db_path : str = DB_PATH )
Initializes database connection (connection established on __aenter__).
db_path
str
default: "logs/c2_server.db"
Path to SQLite database file. Use :memory: for in-memory testing.
Example:
from server.storage import Database
# Production database
db = Database() # Uses logs/c2_server.db
# Test database
test_db = Database( ':memory:' )
# Custom path
custom_db = Database( '/var/lib/c2/data.db' )
Context Manager
async def __aenter__ ( self )
async def __aexit__ ( self , exc_type , exc_val , exc_tb )
Async context manager for automatic connection management.
Example:
async with Database() as db:
# Connection is open
await db.insert_session( ... )
# Connection automatically closed on exit
Behavior:
__aenter__: Opens aiosqlite connection, sets row_factory, creates tables
__aexit__: Closes connection gracefully
Session Methods
insert_session()
async def insert_session ( self , session_id : str , hostname : str , username : str ,
os : str , agent_ver : str , jitter_pct : int ) -> None
Insert a new session row with active=1 and first_seen/last_seen set to now.
Current username on agent system
Beacon jitter percentage (0-100)
Example:
async with Database() as db:
await db.insert_session(
session_id = '550e8400-e29b-41d4-a716-446655440000' ,
hostname = 'VICTIM-PC' ,
username = 'jdoe' ,
os = 'Windows 10 22H2' ,
agent_ver = '1.0.0' ,
jitter_pct = 20
)
get_session()
async def get_session ( self , session_id : str ) -> aiosqlite.Row | None
Return the session row for session_id, or None if not found.
Row object with columns accessible by name, or None
Example:
row = await db.get_session(session_id)
if row:
print ( f "Hostname: { row[ 'hostname' ] } " )
print ( f "Active: { row[ 'active' ] } " )
print ( f "Last seen: { row[ 'last_seen' ] } " )
update_last_seen()
async def update_last_seen ( self , session_id : str ) -> None
Update last_seen timestamp for an active session.
Example:
# Called on every beacon/heartbeat
await db.update_last_seen(session_id)
deactivate_session()
async def deactivate_session ( self , session_id : str ) -> None
Mark a session as inactive (active = 0).
UUID of the session to deactivate
Example:
# Operator kills session
await db.deactivate_session(session_id)
# Session remains in DB but active=0
list_sessions()
async def list_sessions ( self ) -> list
Return all session rows ordered by last_seen descending.
List of session rows, most recent first
Example:
sessions = await db.list_sessions()
for row in sessions:
status = "Active" if row[ 'active' ] else "Inactive"
print ( f " { row[ 'hostname' ] } - { status } " )
Task Methods
insert_task()
async def insert_task ( self , task_id : str , session_id : str , command : str ,
args : str , timeout_s : int ) -> None
Insert a new task row with status PENDING.
UUID of the session this task belongs to
Command type: shell, download, upload, etc.
JSON string of command arguments (e.g., '["whoami"]')
Maximum execution time in seconds
Example:
import json
await db.insert_task(
task_id = 'abc-123' ,
session_id = 'def-456' ,
command = 'shell' ,
args = json.dumps([ 'whoami' ]),
timeout_s = 30
)
update_task_status()
async def update_task_status ( self , task_id : str , status : str ) -> None
Update the status field of a task row.
New status: PENDING, DISPATCHED, COMPLETE, ERROR
Example:
# Task sent to agent
await db.update_task_status(task_id, 'DISPATCHED' )
# Task completed
await db.update_task_status(task_id, 'COMPLETE' )
# Task failed
await db.update_task_status(task_id, 'ERROR' )
get_pending_task()
async def get_pending_task ( self , session_id : str ) -> aiosqlite.Row | None
Return the oldest PENDING task for a session, or None.
Oldest PENDING task row, or None if no pending tasks
Example:
task = await db.get_pending_task(session_id)
if task:
print ( f "Next task: { task[ 'command' ] } { task[ 'args' ] } " )
get_tasks_for_session()
async def get_tasks_for_session ( self , session_id : str ) -> list
Return all task rows for a session ordered by queued_at.
List of task rows sorted by queued_at ascending
Example:
tasks = await db.get_tasks_for_session(session_id)
for task in tasks:
print ( f " { task[ 'command' ] } - { task[ 'status' ] } " )
Result Methods
insert_result()
async def insert_result ( self , result_id : str , task_id : str , stdout : str ,
stderr : str , exit_code : int , duration_ms : int ) -> None
Insert a task result row.
UUID of the task this result belongs to
Execution duration in milliseconds
Example:
import uuid
await db.insert_result(
result_id = str (uuid.uuid4()),
task_id = 'abc-123' ,
stdout = 'VICTIM-PC \\ jdoe' ,
stderr = '' ,
exit_code = 0 ,
duration_ms = 42
)
get_results_for_session()
async def get_results_for_session ( self , session_id : str ) -> list
Return all results for tasks belonging to a session via JOIN.
List of result rows sorted by received_at ascending
Example:
results = await db.get_results_for_session(session_id)
for result in results:
print ( f "Task { result[ 'task_id' ] } :" )
print ( f " Exit code: { result[ 'exit_code' ] } " )
print ( f " Output: { result[ 'stdout' ] } " )
if result[ 'stderr' ]:
print ( f " Errors: { result[ 'stderr' ] } " )
Nonce Methods
check_and_store_nonce()
async def check_and_store_nonce ( self , nonce : str ) -> bool
Return True and store nonce if unseen in last 24h, False if replay detected.
Nonce string from beacon message
True if nonce is new (beacon accepted), False if replay detected (reject)
Example:
if await db.check_and_store_nonce(nonce):
# Process beacon
pass
else :
# Replay attack detected
return JSONResponse( status_code = 409 , content = { 'error' : 'replay detected' })
Behavior:
Queries for nonce received within last 24 hours
If found, logs warning and returns False (replay)
If not found, inserts nonce with current timestamp
Calls prune_old_nonces() to keep table lean
Returns True (accept beacon)
prune_old_nonces()
async def prune_old_nonces ( self ) -> None
Delete nonce rows older than 24 hours.
Example:
# Called automatically by check_and_store_nonce()
await db.prune_old_nonces()
Behavior:
Deletes rows where received_at < time.time() - 86400
Keeps nonce table size bounded
Prevents unbounded growth over long deployments
Constants
DB_PATH
str
default: "logs/c2_server.db"
Default database file path relative to project root
Nonce retention period (24 hours)
Usage Patterns
Complete Session Workflow
async with Database() as db:
# Agent checks in
await db.insert_session(
session_id = 'abc-123' ,
hostname = 'VICTIM-PC' ,
username = 'jdoe' ,
os = 'Windows 10' ,
agent_ver = '1.0.0' ,
jitter_pct = 20
)
# Operator queues task
await db.insert_task(
task_id = 'def-456' ,
session_id = 'abc-123' ,
command = 'shell' ,
args = '["whoami"]' ,
timeout_s = 30
)
# Agent beacons
await db.update_last_seen( 'abc-123' )
# Server dispatches task
task = await db.get_pending_task( 'abc-123' )
await db.update_task_status( 'def-456' , 'DISPATCHED' )
# Agent submits result
await db.update_task_status( 'def-456' , 'COMPLETE' )
await db.insert_result(
result_id = str (uuid.uuid4()),
task_id = 'def-456' ,
stdout = 'VICTIM-PC \\ jdoe' ,
stderr = '' ,
exit_code = 0 ,
duration_ms = 42
)
# Operator views results
results = await db.get_results_for_session( 'abc-123' )
print (results[ 0 ][ 'stdout' ])
Replay Protection
async with Database() as db:
# First beacon
nonce1 = 'abc123def456'
assert await db.check_and_store_nonce(nonce1) is True
# Replay attempt
assert await db.check_and_store_nonce(nonce1) is False
# New beacon
nonce2 = 'xyz789uvw012'
assert await db.check_and_store_nonce(nonce2) is True
Testing
Self-Test Suite
Run built-in tests:
Test Coverage:
insert_session / get_session
update_last_seen
list_sessions
insert_task / get_pending_task
update_task_status
insert_result / get_results_for_session
check_and_store_nonce replay detection
prune_old_nonces
deactivate_session
Output:
Running storage self-test...
[OK] insert_session / get_session
[OK] update_last_seen
[OK] list_sessions
[OK] insert_task / get_pending_task
[OK] update_task_status
[OK] insert_result / get_results_for_session
[OK] check_and_store_nonce — replay correctly rejected
[OK] prune_old_nonces
[OK] deactivate_session
All storage self-tests passed.
In-Memory Testing
Use :memory: for fast unit tests:
async def test_example ():
async with Database( ':memory:' ) as db:
# Database exists only in RAM
# No file I/O, very fast
# Automatically destroyed on exit
pass
aiosqlite.Row enables column access by name (e.g., row['hostname']) with minimal overhead.
Every write operation calls await self._conn.commit() for immediate durability. For bulk operations, consider batching commits.
No indexes defined. Consider adding indexes on session_id and status for large deployments.
Called on every beacon to prevent table bloat. For high traffic, consider background pruning task.
Logging
Structured logging with contextual fields:
logger.info( 'session inserted' , extra = {
'session_id' : session_id,
'hostname' : hostname
})
Events:
Session inserted
Session deactivated
Task queued
Result stored
Nonce replay detected
Server Main Main server integration
SessionManager In-memory session management
CommandQueue Task queue management
Configuration Database path configuration