Overview

The message format module handles serialization, encryption, and framing of C2 protocol messages. It implements a binary wire format: a 7-byte header, followed by a 12-byte nonce and the encrypted payload with its authentication tag. File: common/message_format.py

Wire Format

[ Header: 7 bytes | Nonce: 12 bytes | Ciphertext + Tag: variable ]

Header structure (big-endian):
  - magic: 2 bytes (0xC2C2)
  - version: 1 byte (0x01)
  - body_length: 4 bytes (uint32)
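
As a minimal sketch of the header layout above, the following uses only the standard `struct` module and the documented format string `'!HBI'`. The helper names `build_header` and `parse_header` are illustrative and are not taken from the module itself.

```python
import struct

MAGIC = 0xC2C2
PROTOCOL_VERSION = 0x01
HEADER_FORMAT = "!HBI"  # big-endian: uint16 magic, uint8 version, uint32 body_length
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)  # 7 bytes; "!" disables alignment padding


def build_header(body_length: int) -> bytes:
    """Hypothetical helper: pack the 7-byte header for a body of the given size."""
    return struct.pack(HEADER_FORMAT, MAGIC, PROTOCOL_VERSION, body_length)


def parse_header(frame: bytes) -> tuple:
    """Hypothetical helper: return (magic, version, body_length) from a frame."""
    return struct.unpack(HEADER_FORMAT, frame[:HEADER_SIZE])


header = build_header(40)
print(header.hex())  # c2c20100000028
```

Using the network byte order prefix `!` matters here: with native alignment the same three fields would be padded to 8 bytes rather than 7.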

Constants

  - MAGIC (int, default 0xC2C2): Protocol magic bytes for frame identification
  - PROTOCOL_VERSION (int, default 0x01): Current protocol version
  - HEADER_FORMAT (str, default '!HBI'): Struct format for header: big-endian uint16 + uint8 + uint32
  - HEADER_SIZE (int, default 7): Total header size in bytes

Message Types

  - MSG_CHECKIN (str, default 'CHECKIN'): Agent initial check-in message
  - MSG_TASK_PULL (str, default 'TASK_PULL'): Agent polling for new tasks
  - MSG_TASK_RESULT (str, default 'TASK_RESULT'): Agent task execution results
  - MSG_TASK_DISPATCH (str, default 'TASK_DISPATCH'): Server task dispatch to agent
  - MSG_HEARTBEAT (str, default 'HEARTBEAT'): Agent heartbeat message
  - MSG_TERMINATE (str, default 'TERMINATE'): Terminate agent session

Core Functions

pack()

Serialize, encrypt, and frame a payload dictionary into a C2 envelope.
pack(payload_dict: dict, key: bytes) -> bytes
Parameters:
  • payload_dict (dict, required) - Message payload to pack (must be JSON-serializable)
  • key (bytes, required) - 32-byte encryption key
Returns:
  • bytes - Complete binary envelope ready for transmission
Process:
  1. JSON-serialize the payload dict
  2. Apply padding using active traffic profile
  3. Encrypt with AES-GCM
  4. Prepend nonce to ciphertext
  5. Build 7-byte header with magic, version, and body length
  6. Concatenate header + body
Raises:
  • ProtocolError - If payload is not a dict, empty, or not JSON-serializable
  • CryptoError - If encryption fails
Example:
from common.message_format import pack, build_checkin
from common.crypto import get_session_key

key = get_session_key()
msg = build_checkin('VICTIM-PC', 'jdoe', 'Windows 10', '1.0.0', 20)
envelope = pack(msg, key)
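
Steps 4 to 6 of the process can be sketched independently of the cipher. The function below is illustrative only: `frame_body` is a hypothetical name, and the nonce and ciphertext are assumed to be supplied by the encryption step, whose exact return shape this page does not specify. It also assumes, following the wire format diagram, that `body_length` counts the nonce as part of the body.

```python
import struct

MAGIC = 0xC2C2
PROTOCOL_VERSION = 0x01
HEADER_FORMAT = "!HBI"
NONCE_SIZE = 12  # per the wire format description


def frame_body(nonce: bytes, ciphertext_and_tag: bytes) -> bytes:
    """Prepend the nonce, then the 7-byte header, to an already encrypted payload."""
    if len(nonce) != NONCE_SIZE:
        raise ValueError("nonce must be 12 bytes")
    body = nonce + ciphertext_and_tag  # step 4: nonce goes in front of the ciphertext
    header = struct.pack(HEADER_FORMAT, MAGIC, PROTOCOL_VERSION, len(body))  # step 5
    return header + body  # step 6


# Placeholder bytes stand in for real AES-GCM output.
frame = frame_body(b"\x00" * 12, b"\xaa" * 20)
print(len(frame))  # 39
```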

unpack()

Validate, decrypt, and deserialize a raw C2 envelope into a dictionary.
unpack(raw: bytes, key: bytes) -> dict
Parameters:
  • raw (bytes, required) - Raw binary envelope received from network
  • key (bytes, required) - 32-byte decryption key (must match the key used for packing)
Returns:
  • dict - Deserialized payload dictionary
Process:
  1. Validate frame has minimum length (7-byte header + 12-byte nonce + 16-byte tag = 35 bytes)
  2. Unpack and verify header (magic, version, body_length)
  3. Extract nonce and ciphertext from body
  4. Decrypt and verify authentication tag
  5. Strip padding
  6. Parse JSON and validate it’s a dict
Raises:
  • ProtocolError - If frame is truncated, has bad magic, unsupported version, or invalid JSON
  • CryptoError - If authentication tag verification fails (tampered data or wrong key)
Example:
from common.message_format import unpack
from common.crypto import get_session_key

key = get_session_key()
# envelope_bytes is the raw binary envelope received from the network
payload = unpack(envelope_bytes, key)
print(payload['msg_type'])
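
The validation steps that run before decryption (steps 1 to 3) can be sketched with the standard library alone. This is an assumption-laden illustration rather than the module's code: `split_frame` is a hypothetical name, the local `ProtocolError` class stands in for `common.utils.ProtocolError`, and treating any mismatch between `body_length` and the actual body size as an error is one possible policy.

```python
import struct

MAGIC = 0xC2C2
PROTOCOL_VERSION = 0x01
HEADER_FORMAT = "!HBI"
HEADER_SIZE = 7
NONCE_SIZE = 12
TAG_SIZE = 16
MIN_FRAME = HEADER_SIZE + NONCE_SIZE + TAG_SIZE  # 35 bytes


class ProtocolError(Exception):
    """Local stand-in for common.utils.ProtocolError."""


def split_frame(raw: bytes) -> tuple:
    """Validate the header and return (nonce, ciphertext_and_tag)."""
    if len(raw) < MIN_FRAME:
        raise ProtocolError("frame too short")
    magic, version, body_length = struct.unpack(HEADER_FORMAT, raw[:HEADER_SIZE])
    if magic != MAGIC:
        raise ProtocolError("bad magic")
    if version != PROTOCOL_VERSION:
        raise ProtocolError("unsupported version")
    body = raw[HEADER_SIZE:]
    if len(body) != body_length:  # assumed policy: exact match required
        raise ProtocolError("body length mismatch")
    return body[:NONCE_SIZE], body[NONCE_SIZE:]
```

Checking the length before calling `struct.unpack` keeps every malformed frame on the `ProtocolError` path instead of surfacing a raw `struct.error`.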

Message Builders

All message builders return a dictionary with mandatory fields:
  • msg_type: Message type identifier
  • session_id: Session identifier (if applicable)
  • timestamp: Unix timestamp (int)
  • nonce: UUID hex for replay protection
  • payload: Type-specific payload data

build_checkin()

Build an agent check-in message.
build_checkin(hostname: str, username: str, os_info: str, 
              agent_ver: str, jitter_pct: int) -> dict
Parameters:
  • hostname (str, required) - Agent hostname
  • username (str, required) - Current username
  • os_info (str, required) - Operating system information
  • agent_ver (str, required) - Agent version string
  • jitter_pct (int, required) - Beacon jitter percentage (0-100)
Returns:
  • dict - Message dict with msg_type='CHECKIN' and agent info in payload
Example:
msg = build_checkin(
    hostname='VICTIM-PC',
    username='jdoe',
    os_info='Windows 10 Pro',
    agent_ver='1.2.3',
    jitter_pct=20
)

build_task_pull()

Build a task polling message.
build_task_pull(session_id: str) -> dict
Parameters:
  • session_id (str, required) - Agent session identifier
Returns:
  • dict - Message dict with msg_type='TASK_PULL'
Example:
msg = build_task_pull(session_id='abc123')

build_task_result()

Build a task execution result message.
build_task_result(session_id: str, task_id: str, stdout: str,
                  stderr: str, exit_code: int, duration_ms: int) -> dict
Parameters:
  • session_id (str, required) - Agent session identifier
  • task_id (str, required) - Task identifier
  • stdout (str, required) - Standard output from command execution
  • stderr (str, required) - Standard error from command execution
  • exit_code (int, required) - Process exit code
  • duration_ms (int, required) - Task execution time in milliseconds
Returns:
  • dict - Message dict with msg_type='TASK_RESULT' and execution details
Example:
msg = build_task_result(
    session_id='abc123',
    task_id='task-456',
    stdout='command output',
    stderr='',
    exit_code=0,
    duration_ms=142
)

Internal Functions

_base_payload()

Internal function - not part of public API
Generate base message structure with mandatory fields.
_base_payload(msg_type: str, session_id: str = None) -> dict
Returns dict with:
  • msg_type: Message type (validated against VALID_MSG_TYPES)
  • session_id: Optional session identifier
  • timestamp: Current Unix timestamp
  • nonce: UUID for replay protection
  • payload: Empty dict (populated by specific builders)
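
A rough sketch of a function producing this structure is shown below. The contents of `VALID_MSG_TYPES` are assumed from the message type constants listed on this page, the use of `uuid.uuid4()` is a guess at the UUID version, and raising `ValueError` is a placeholder for whatever error the real function raises.

```python
import time
import uuid

# Assumed contents; the page names VALID_MSG_TYPES but does not list its members.
VALID_MSG_TYPES = {
    "CHECKIN", "TASK_PULL", "TASK_RESULT",
    "TASK_DISPATCH", "HEARTBEAT", "TERMINATE",
}


def base_payload(msg_type, session_id=None):
    """Illustrative stand-in for _base_payload(): build the mandatory fields."""
    if msg_type not in VALID_MSG_TYPES:
        raise ValueError(f"unknown msg_type: {msg_type!r}")
    return {
        "msg_type": msg_type,
        "session_id": session_id,        # None when no session exists yet
        "timestamp": int(time.time()),   # Unix timestamp, whole seconds
        "nonce": uuid.uuid4().hex,       # 32 hex characters
        "payload": {},                   # filled in by the specific builder
    }
```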

Security Features

Authenticated Encryption

All messages use AES-256-GCM for confidentiality and authenticity

Replay Protection

Each message includes a unique UUID nonce, which is stored server-side so that a repeated nonce can be rejected
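
How the server stores and checks nonces is not described on this page. One common approach, sketched here purely as an assumption, is to combine a set of seen nonces with a freshness window on the message timestamp so the store does not grow without bound. The class name `NonceCache` and the 300-second window are hypothetical.

```python
import time
from typing import Dict, Optional


class NonceCache:
    """Hypothetical server-side store of recently seen message nonces."""

    def __init__(self, max_age_s: int = 300):
        self.max_age_s = max_age_s
        self._seen: Dict[str, int] = {}  # nonce -> message timestamp

    def accept(self, nonce: str, timestamp: int, now: Optional[int] = None) -> bool:
        """Return True for a fresh, unseen message; False for stale or replayed ones."""
        now = int(time.time()) if now is None else now
        # Forget entries whose timestamp has aged out of the window.
        self._seen = {n: t for n, t in self._seen.items() if now - t <= self.max_age_s}
        if abs(now - timestamp) > self.max_age_s:
            return False  # too old, or too far in the future
        if nonce in self._seen:
            return False  # replay of a message already processed
        self._seen[nonce] = timestamp
        return True


cache = NonceCache()
print(cache.accept("a1", 1000, now=1000))  # True
print(cache.accept("a1", 1000, now=1001))  # False
```

Because the nonce and timestamp both sit inside the authenticated payload, an entry can be dropped safely once its timestamp leaves the window: a later replay of that message fails the freshness check instead.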

Length Obfuscation

Padding applied before encryption to hide plaintext size

Version Control

Protocol version in header enables future evolution

Error Handling

ProtocolError is raised for:
  • Invalid payload structure
  • Frame too short for header
  • Bad magic bytes
  • Unsupported protocol version
  • Truncated frames
  • Invalid JSON after decryption
CryptoError is raised for:
  • Authentication tag verification failure
  • Tampered ciphertext
  • Wrong decryption key
Dependencies

import json
import struct
import time
import uuid

from common.crypto import decrypt, encrypt
from common.utils import CryptoError, ProtocolError
from evasion.padding_strat import pad, strip_padding
from transport.traffic_profile import load_active_profile
