Skip to main content

Overview

The executor.py module provides safe command execution with blocklist filtering, timeout enforcement, and structured result capture. All commands run with shell=False to prevent injection attacks.

Module Location

agent/executor.py

Classes

TaskResult

Dataclass representing the result of a command execution.

Attributes

@dataclass
class TaskResult:
    task_id:     str  # Unique task identifier
    stdout:      str  # Standard output (capped at 64KB)
    stderr:      str  # Standard error (capped at 64KB)
    exit_code:   int  # Process exit code
    duration_ms: int  # Execution duration in milliseconds

Example

result = TaskResult(
    task_id     = 'task-123',
    stdout      = 'hello world\n',
    stderr      = '',
    exit_code   = 0,
    duration_ms = 42,
)

Functions

execute(task_id: str, command: str, args: list, timeout_s: int) -> TaskResult

Executes a command safely and returns a TaskResult. Never uses shell=True.

Parameters

  • task_id (str): Unique identifier for this task
  • command (str): Command to execute (executable name only)
  • args (list): Command arguments as a list
  • timeout_s (int): Maximum execution time in seconds

Returns

  • TaskResult: Structured result containing stdout, stderr, exit code, and duration

Exit Codes

CodeMeaning
0Success
1General executor error
124Timeout exceeded
126Blocked command
127Command not found

Execution Flow

Step 1: Input Normalization
args    = args or []
command = (command or '').strip()

if not command:
    return TaskResult(
        task_id     = task_id,
        stdout      = '',
        stderr      = 'BLOCKED: empty command',
        exit_code   = 126,
        duration_ms = _elapsed(),
    )
Step 2: Blocklist Check
if _is_blocked(command):
    return TaskResult(
        task_id     = task_id,
        stdout      = '',
        stderr      = 'BLOCKED: prohibited command',
        exit_code   = 126,
        duration_ms = _elapsed(),
    )
Step 3: Build Command List
cmd_list = [command] + [str(a) for a in args]
Step 4: Execute with subprocess
result = subprocess.run(
    cmd_list,
    capture_output = True,
    text           = True,
    timeout        = timeout_s,
    shell          = False,  # prevents injection
)
Step 5: Return Result
return TaskResult(
    task_id     = task_id,
    stdout      = (result.stdout or '')[:MAX_OUTPUT],
    stderr      = (result.stderr or '')[:MAX_OUTPUT],
    exit_code   = result.returncode,
    duration_ms = _elapsed(),
)

Usage Examples

Successful Command:
result = execute('task-1', 'echo', ['hello', 'world'], 30)

assert result.exit_code == 0
assert 'hello world' in result.stdout
assert result.duration_ms > 0
Blocked Command:
result = execute('task-2', 'nmap', ['-sV', '10.0.0.1'], 30)

assert result.exit_code == 126
assert result.stderr == 'BLOCKED: prohibited command'
assert result.stdout == ''
Command Not Found:
result = execute('task-3', 'nonexistent_command', [], 30)

assert result.exit_code == 127
assert result.stderr == 'COMMAND NOT FOUND'
Timeout:
result = execute('task-4', 'sleep', ['100'], 2)

assert result.exit_code == 124
assert result.stderr == 'TIMEOUT'
With Arguments:
result = execute('task-5', 'ls', ['-la', '/tmp'], 10)

assert result.exit_code == 0
assert len(result.stdout) > 0

Helper Functions

_is_blocked(command: str) -> bool

Internal function - checks if command matches any entry in BLOCKED_COMMANDS.
Parameters:
  • command (str): Command to check
Returns:
  • bool: True if command is blocked
Logic:
command_lower = command.lower().strip()
return any(
    command_lower == blocked.lower() or
    command_lower.startswith(blocked.lower() + " ")
    for blocked in config.BLOCKED_COMMANDS
)
Examples:
assert _is_blocked('nmap')           # True
assert _is_blocked('NMAP')           # True (case-insensitive)
assert _is_blocked('nmap -sV')       # True (with args)
assert _is_blocked('ls')             # False (not in blocklist)

Constants

MAX_OUTPUT = 65536  # 64 KB cap to prevent oversized responses
Output capping prevents memory exhaustion from commands with excessive output:
stdout = (result.stdout or '')[:MAX_OUTPUT]
stderr = (result.stderr or '')[:MAX_OUTPUT]

Exception Handling

TimeoutExpired

Command exceeded timeout_s:
except subprocess.TimeoutExpired as e:
    logger.warning('command timed out', extra={
        'task_id':   task_id,
        'timeout_s': timeout_s,
    })
    return TaskResult(
        task_id     = task_id,
        stdout      = e.stdout or '',
        stderr      = e.stderr or 'TIMEOUT',
        exit_code   = 124,
        duration_ms = _elapsed(),
    )

FileNotFoundError

Command executable not found in PATH:
except FileNotFoundError:
    logger.warning('command not found', extra={
        'task_id': task_id,
        'command': command,
    })
    return TaskResult(
        task_id     = task_id,
        stdout      = '',
        stderr      = 'COMMAND NOT FOUND',
        exit_code   = 127,
        duration_ms = _elapsed(),
    )

General Exceptions

Unexpected errors during execution:
except Exception as e:
    logger.error('unexpected executor error', extra={
        'task_id': task_id,
        'reason':  str(e),
    })
    return TaskResult(
        task_id     = task_id,
        stdout      = '',
        stderr      = f'EXECUTOR ERROR: {e}',
        exit_code   = 1,
        duration_ms = _elapsed(),
    )

Logging

The executor emits structured logs for all operations: Execution start:
logger.info('executing command', extra={
    'task_id': task_id,
    'command': command,
    'cmd_args':    args,
})
Execution complete:
logger.info('command complete', extra={
    'task_id':    task_id,
    'exit_code':  result.returncode,
    'duration_ms': task_result.duration_ms,
})
Blocked command:
logger.warning('blocked command rejected', extra={
    'task_id': task_id,
    'command': command,
})
Timeout:
logger.warning('command timed out', extra={
    'task_id':   task_id,
    'timeout_s': timeout_s,
})
Not found:
logger.warning('command not found', extra={
    'task_id': task_id,
    'command': command,
})

Security Features

The executor enforces strict security controls to prevent code injection and abuse.

1. Shell Injection Prevention

Always uses shell=False:
result = subprocess.run(
    cmd_list,
    shell = False,  # prevents injection
    ...
)
Safe:
execute('t1', 'echo', ['arg1; rm -rf /'], 10)
# Executes: ['echo', 'arg1; rm -rf /']
# Output: 'arg1; rm -rf /' (string literal, not executed)
Unsafe (NOT used):
# This would be dangerous:
subprocess.run('echo arg1; rm -rf /', shell=True)  # DON'T DO THIS

2. Command Blocklist

Blocked commands return exit code 126:
config.BLOCKED_COMMANDS = [
    'nmap', 'masscan', 'netcat', 'nc',
    'wireshark', 'tcpdump', 'aircrack-ng',
    # ... more dangerous tools
]

3. Timeout Enforcement

Prevents runaway processes:
result = subprocess.run(
    cmd_list,
    timeout = timeout_s,  # hard timeout
    ...
)

4. Output Capping

Prevents memory exhaustion:
stdout = (result.stdout or '')[:MAX_OUTPUT]  # 64 KB max
stderr = (result.stderr or '')[:MAX_OUTPUT]

5. Input Validation

Rejects empty commands:
if not command:
    return TaskResult(..., stderr='BLOCKED: empty command', exit_code=126)

Self-Test Suite

The module includes comprehensive self-tests:
python agent/executor.py
Test Coverage:
  1. Blocked command returns exit code 126
  2. All BLOCKED_COMMANDS entries are rejected
  3. Successful command returns exit code 0
  4. Command not found returns exit code 127
  5. Timeout returns exit code 124
  6. Duration measurement is positive
  7. shell=False prevents injection
  8. Arguments passed correctly
Expected Output:
Running executor self-test...
  [OK] blocked command returns exit_code 126
  [OK] all BLOCKED_COMMANDS entries are rejected
  [OK] successful command returns exit_code 0 with correct stdout
  [OK] missing command returns exit_code 127
  [OK] timed-out command returns exit_code 124
  [OK] duration_ms is positive
  [OK] shell=False prevents command injection
  [OK] args passed correctly to command

All executor self-tests passed.

Best Practices

1. Always Specify Timeout

# Good
result = execute('t1', 'ls', ['-la'], timeout_s=30)

# Bad - missing timeout could hang indefinitely
result = execute('t1', 'ls', ['-la'], timeout_s=None)  # Don't do this

2. Check Exit Codes

result = execute('t1', 'whoami', [], 10)

if result.exit_code == 0:
    print(f"User: {result.stdout.strip()}")
elif result.exit_code == 126:
    print("Command blocked by security policy")
elif result.exit_code == 127:
    print("Command not found")

3. Handle Capped Output

result = execute('t1', 'cat', ['/huge/file.txt'], 30)

if len(result.stdout) >= MAX_OUTPUT:
    print("Warning: output was truncated to 64 KB")

4. Use Separate Command and Args

# Good - command and args separate
execute('t1', 'ls', ['-la', '/tmp'], 10)

# Bad - embedding args in command string
execute('t1', 'ls -la /tmp', [], 10)  # Won't work as expected
  • beacon - Task dispatch and result reporting
  • config - BLOCKED_COMMANDS configuration
  • logger - Logging utilities

Build docs developers (and LLMs) love