Overview
The C2 Framework is an experimental command and control system designed for academic research and cybersecurity analysis only. Built with Python, it demonstrates modern C2 architecture patterns and implements several evasion techniques for traffic analysis research.
This framework is designed exclusively for isolated lab environments. All deployment must comply with the research disclaimer and usage restrictions.
Key Features
The framework provides a complete C2 infrastructure with focus on security research:
Secure Communication: AES-256-GCM encryption with HKDF key derivation and replay protection via nonce validation
Traffic Evasion: Configurable jitter strategies, padding techniques, and HTTP header randomization
Session Management: Persistent sessions with SQLite storage, heartbeat monitoring, and graceful reconnection
Task Execution: Safe command execution with timeout controls, output capture, and blocklist enforcement
Architecture Components
The framework consists of four primary components:
Server Component
FastAPI-based HTTP server handling beacon endpoints
Session manager tracking active agent connections
Command queue managing task dispatch and results
SQLite database for persistent storage and replay protection
# Server beacon endpoint from server/server_main.py:48-124 (excerpt)
@app.post('/beacon')
async def beacon(request: Request) -> Response:
    # Handle all inbound agent messages: CHECKIN, TASK_PULL, TASK_RESULT
    source_ip = request.client.host if request.client else 'unknown'
    raw_body = await request.body()

    # Unpack and decrypt
    session_key = get_session_key()
    payload = mf.unpack(raw_body, session_key)

    # Nonce replay check (the code that obtains `nonce` is elided in this excerpt)
    if not await db.check_and_store_nonce(nonce):
        return JSONResponse(status_code=409, content={'error': 'replay detected'})
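The replay check above depends on a store that records each nonce exactly once. The following is a minimal sketch of how such a check could work, not the framework's actual `db` module: the `NonceStore` class, the `seen_nonces` table, and the synchronous `sqlite3` calls are all assumptions made for illustration (the real method is awaited, so it is presumably async).

```python
import sqlite3


class NonceStore:
    """Hypothetical, synchronous stand-in for db.check_and_store_nonce."""

    def __init__(self, path: str = ":memory:") -> None:
        self._conn = sqlite3.connect(path)
        # PRIMARY KEY makes a second insert of the same nonce fail.
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS seen_nonces (nonce TEXT PRIMARY KEY)"
        )

    def check_and_store_nonce(self, nonce: str) -> bool:
        """Return True and record the nonce if it is new; return False on replay."""
        try:
            with self._conn:  # commits on success, rolls back on error
                self._conn.execute(
                    "INSERT INTO seen_nonces (nonce) VALUES (?)", (nonce,)
                )
        except sqlite3.IntegrityError:
            return False
        return True
```

Letting the database's uniqueness constraint decide keeps the check and the store in one atomic step, which avoids a race between a separate lookup and insert.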
Agent Component
Beacon loop with exponential backoff retry logic
Environment checks enforcing lab-only operation
Command executor with safety controls and blocklists
Jitter strategies (uniform and gaussian) for timing variation
# Agent beacon loop from agent/beacon.py:143-229 (excerpt)
class BeaconLoop:
    def run(self) -> None:
        # Initial checkin with back-off retry
        self._checkin()

        # Main beacon loop
        while True:
            # Compute jittered sleep interval
            sleep_s = self._sleep_fn(
                config.BEACON_INTERVAL_S,
                self._profile.jitter_pct,
            )
            time.sleep(sleep_s)

            # Send TASK_PULL and handle response
            # (the code that sets pull_payload and msg_type is elided in this excerpt)
            response = _send(pull_payload, self._key)
            if msg_type == mf.MSG_TASK_DISPATCH:
                self._handle_task_dispatch(response)
Communication Protocol
The framework implements a custom binary protocol with authenticated encryption:
Frame Structure
7-byte header containing magic bytes (0xC2C2), protocol version, and body length
Encryption
AES-256-GCM authenticated encryption with 12-byte random nonces
Padding
Configurable random padding (0-256 bytes) to obscure payload sizes
Serialization
JSON payloads with message types: CHECKIN, TASK_PULL, TASK_DISPATCH, TASK_RESULT
# Protocol wire format from common/message_format.py:1-24
"""
Wire format (7-byte header + body):
    [ magic: 2B | version: 1B | length: 4B | nonce: 12B | ciphertext+tag ]
Struct format '!HBI': big-endian, uint16 + uint8 + uint32.
"""
import struct

MAGIC = 0xC2C2
PROTOCOL_VERSION = 0x01
HEADER_FORMAT = '!HBI'
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)  # 7 bytes
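To make the wire format concrete, here is a small sketch of framing and parsing with the `'!HBI'` header, which is also the kind of parser a traffic-analysis tool could use to recognize these frames. `build_frame` and `parse_frame` are hypothetical names, and the sketch assumes the length field counts the nonce plus ciphertext and tag; the source only says "body length". No encryption is performed here.

```python
import struct

MAGIC = 0xC2C2
PROTOCOL_VERSION = 0x01
HEADER_FORMAT = '!HBI'                         # big-endian: uint16, uint8, uint32
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)   # 7 bytes
NONCE_SIZE = 12                                # AES-GCM nonce length from the wire format


def build_frame(nonce: bytes, ciphertext: bytes) -> bytes:
    """Prefix an already-encrypted body with the 7-byte header (hypothetical helper)."""
    if len(nonce) != NONCE_SIZE:
        raise ValueError("nonce must be 12 bytes")
    body = nonce + ciphertext
    # Assumption: the length field covers nonce + ciphertext + tag.
    return struct.pack(HEADER_FORMAT, MAGIC, PROTOCOL_VERSION, len(body)) + body


def parse_frame(frame: bytes) -> tuple[int, bytes, bytes]:
    """Validate the header and return (version, nonce, ciphertext_with_tag)."""
    if len(frame) < HEADER_SIZE:
        raise ValueError("truncated header")
    magic, version, length = struct.unpack(HEADER_FORMAT, frame[:HEADER_SIZE])
    if magic != MAGIC:
        raise ValueError("bad magic")
    body = frame[HEADER_SIZE:]
    if length < NONCE_SIZE or len(body) != length:
        raise ValueError("length field does not match body")
    return version, body[:NONCE_SIZE], body[NONCE_SIZE:]
```

Rejecting frames on magic or length mismatch before any decryption attempt keeps malformed input away from the cryptographic layer.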
Evasion Techniques
Multiple traffic evasion strategies support research into detection methods:
Jitter Strategies
Uniform Jitter: Random intervals within ±N% of the base beacon interval
Gaussian Jitter: Normally distributed intervals with configurable standard deviation
# From evasion/sleep_strat.py:8-21
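import random

# MIN_SLEEP_S (the lower bound on any sleep) is defined outside this excerpt.

def uniform_sleep(base_s: float, jitter_pct: int) -> float:
    # Uniform draw from [base - delta, base + delta], clamped to MIN_SLEEP_S
    delta = base_s * (jitter_pct / 100.0)
    return max(MIN_SLEEP_S, random.uniform(base_s - delta, base_s + delta))

def gaussian_sleep(base_s: float, jitter_pct: int) -> float:
    # Normal draw centered on base with sigma = jitter_pct% of base, clamped to MIN_SLEEP_S
    sigma = base_s * (jitter_pct / 100.0)
    return max(MIN_SLEEP_S, random.gauss(base_s, sigma))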
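For the detection side of this research, it helps to summarize what these strategies look like in captured traffic. The sketch below is an analysis aid written for this overview, not part of the framework; `interval_stats` and `expected_uniform_cv` are hypothetical names. It computes the coefficient of variation (CV) of inter-beacon intervals. Ignoring the `MIN_SLEEP_S` clamp, uniform jitter of ±N% has a CV of (N/100)/√3, while Gaussian jitter has a CV of about N/100.

```python
import math
import statistics


def interval_stats(timestamps: list[float]) -> dict[str, float]:
    """Summarize inter-arrival times of observed beacons (timestamps in seconds)."""
    if len(timestamps) < 3:
        raise ValueError("need at least three timestamps")
    ordered = sorted(timestamps)
    intervals = [b - a for a, b in zip(ordered, ordered[1:])]
    mean = statistics.fmean(intervals)
    stdev = statistics.pstdev(intervals)
    # CV near 0 means strictly periodic traffic; jitter raises it.
    cv = stdev / mean if mean else float("inf")
    return {"mean": mean, "stdev": stdev, "cv": cv}


def expected_uniform_cv(jitter_pct: float) -> float:
    """CV of a uniform draw within +/- jitter_pct% of the base interval."""
    return (jitter_pct / 100.0) / math.sqrt(3)
```

Comparing a measured CV against these expected values is one simple way to tell strictly periodic, uniformly jittered, and Gaussian-jittered beaconing apart in a lab capture.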
Padding
Random padding obscures message lengths in encrypted traffic:
2-byte length prefix
Random padding bytes (0-256, configurable)
Applied before encryption
# From evasion/padding_strat.py:8-19
import os
import random
import struct

def pad(plaintext: bytes, min_bytes: int, max_bytes: int) -> bytes:
    # Layout: [ pad_len: 2B big-endian | pad_len random bytes | plaintext ]
    pad_len = random.randint(min_bytes, max_bytes)
    pad_bytes = os.urandom(pad_len)
    return struct.pack('>H', pad_len) + pad_bytes + plaintext
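The receiving side has to reverse this layout after decryption. The excerpt does not show that function, so the `unpad` below is a hypothetical counterpart inferred from the layout `pad` produces; `pad` is repeated only so the sketch is self-contained.

```python
import os
import random
import struct


def pad(plaintext: bytes, min_bytes: int, max_bytes: int) -> bytes:
    """Same layout as the excerpt: 2-byte length prefix, random bytes, plaintext."""
    pad_len = random.randint(min_bytes, max_bytes)
    return struct.pack('>H', pad_len) + os.urandom(pad_len) + plaintext


def unpad(padded: bytes) -> bytes:
    """Hypothetical inverse of pad(): read the prefix, skip the padding."""
    if len(padded) < 2:
        raise ValueError("missing length prefix")
    (pad_len,) = struct.unpack('>H', padded[:2])
    if len(padded) < 2 + pad_len:
        raise ValueError("padding length exceeds message size")
    return padded[2 + pad_len:]
```

Because padding is applied before encryption, `unpad` only ever sees data that has already passed authentication, but validating the prefix still guards against malformed plaintext.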
Header Randomization
Four levels of HTTP header randomization:
Level 0: Fixed Chrome User-Agent
Level 1: Randomized User-Agent
Level 2: Randomized UA and Accept-Language
Level 3: Full randomization including header order
# From evasion/header_randomizer.py:32-76 (excerpt)
def get_headers(level: int) -> dict:
    if level == 3:
        ua = random.choice(USER_AGENTS)
        language = random.choice(ACCEPT_LANGUAGES)
        encoding = random.choice(ACCEPT_ENCODINGS)
        optional = _build_optional(ua, language, encoding)
        random.shuffle(optional)  # Randomize header order
Security Features
The framework implements multiple security controls for safe research operation:
Environment Enforcement
LAB_MODE environment variable required
Whitelisted server hosts only
VM detection logging
Debugger detection warnings
Command Blocklist
Blocks privileged operations:
Registry modifications (reg)
Task scheduling (schtasks, at)
Service control (sc)
Network mapping (nmap, arp)
Network share and local group operations (net use, net localgroup)
Privilege enumeration (whoami /priv)
Replay Protection
UUID-based nonces in every message
Server-side nonce validation and storage
409 Conflict on replay attempts
Safe Execution
subprocess.run with shell=False
Configurable timeout enforcement
Output size limits (64KB)
Exit code capture
# Command blocklist from common/config_example.py:38-48
BLOCKED_COMMANDS = [
    'reg',
    'schtasks',
    'at',
    'sc',
    'net use',
    'arp',
    'nmap',
    'whoami /priv',
    'net localgroup',
]
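The overview does not show how entries in this list are matched against a command. One plausible approach, sketched here under that assumption, is to compare the leading tokens of the command with each entry, so that `net use` blocks `net use Z: ...` without blocking every `net` subcommand. `is_blocked` is a hypothetical helper, not the framework's executor.

```python
import shlex

BLOCKED_COMMANDS = [
    'reg', 'schtasks', 'at', 'sc', 'net use',
    'arp', 'nmap', 'whoami /priv', 'net localgroup',
]


def is_blocked(command: str, blocklist: list[str] = BLOCKED_COMMANDS) -> bool:
    """Return True if the command's leading tokens match a blocklist entry."""
    # posix=False keeps backslashes literal, which suits Windows-style paths.
    tokens = [t.lower() for t in shlex.split(command, posix=False)]
    for entry in blocklist:
        entry_tokens = entry.lower().split()
        if tokens[:len(entry_tokens)] == entry_tokens:
            return True
    return False
```

Token matching avoids the false positives of substring checks (for example, `regedit` does not match `reg`), but it is easy to sidestep with variants such as `reg.exe` or a full path, so a check like this works best as one layer among the other controls listed above.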
Message Flow
The typical communication flow between agent and server:
Agent Checkin
Agent sends CHECKIN with system information (hostname, username, OS, agent version)
Session Creation
Server creates session, assigns UUID, returns session_id in response
Beacon Loop
Agent periodically sends TASK_PULL requests with jittered intervals
Task Dispatch
Server responds with TASK_DISPATCH when commands are queued, or with an empty response otherwise
Task Execution
Agent executes command, captures stdout/stderr/exit_code, measures duration
Result Submission
Agent sends TASK_RESULT payload back to server
All messages are encrypted and authenticated, carry replay-protection nonces, and may be padded to resist traffic analysis.
Technology Stack
Python 3.10+ - Core implementation language
FastAPI - Async HTTP server framework
SQLite - Persistent session and task storage
Cryptography - AES-GCM encryption, HKDF key derivation
Uvicorn - ASGI server with TLS support
Requests - HTTP client with custom TLS adapter
PyYAML - Traffic profile configuration
Getting Started
To begin working with the C2 Framework:
Architecture: Understand the system design and component interaction
Research Disclaimer: Review usage restrictions and legal requirements
Setup Guide: Configure the lab environment and deploy components
Research Applications
This framework supports research in:
Traffic Analysis: Study encrypted C2 detection methods
Behavioral Analysis: Examine jitter and timing patterns
Protocol Design: Test authenticated encryption schemes
Evasion Techniques: Evaluate header randomization effectiveness
Telemetry Collection: Capture and analyze beacon traffic
All research must be conducted in isolated lab environments. Deployment on production networks or systems without authorization is strictly prohibited.