The feature_extractor module computes 16 statistical features from network flows for machine learning classification and anomaly detection. Features include timing metrics, flow statistics, payload size distributions, and entropy measures.

Functions

extract_features()

Extract all 16 statistical features, plus the 5-tuple network identifiers, from a single FlowRecord.
def extract_features(flow: FlowRecord) -> dict
Parameters:
  • flow (FlowRecord): Flow record to analyze
Returns:
  • dict: Dictionary with 16 feature values plus 5-tuple identifiers
Features extracted:

Timing Features (6)

  1. mean_iat (float): Mean inter-arrival time between packets in seconds
  2. std_iat (float): Standard deviation of inter-arrival times
  3. min_iat (float): Minimum inter-arrival time
  4. max_iat (float): Maximum inter-arrival time
  5. burstiness (float): Coefficient of variation (std/mean) of IAT; high = bursty, low = regular
  6. iat_autocorr (float): Lag-1 autocorrelation of IAT series; detects periodic patterns

Flow Features (5)

  1. flow_duration_s (float): Total flow duration in seconds
  2. total_bytes (int): Total bytes transferred
  3. total_packets (int): Total packet count
  4. bytes_per_second (float): Throughput in bytes/second
  5. packets_per_second (float): Packet rate in packets/second

Size Features (5)

  1. payload_len_mean (float): Mean payload size per packet
  2. payload_len_std (float): Standard deviation of payload sizes
  3. payload_len_min (float): Minimum payload size
  4. payload_len_max (float): Maximum payload size
  5. shannon_entropy (float): Shannon entropy of payload size distribution (0-8 bits)

Network Identifiers (5)

  1. src_ip (str): Source IP address
  2. dst_ip (str): Destination IP address
  3. src_port (int): Source port
  4. dst_port (int): Destination port
  5. protocol (str): Protocol name
Example:
from telemetry.flow_parser import parse_pcap
from telemetry.feature_extractor import extract_features

flows = parse_pcap('capture.pcap')
for flow in flows:
    features = extract_features(flow)
    print(f"Flow: {features['src_ip']}:{features['src_port']} -> {features['dst_ip']}:{features['dst_port']}")
    print(f"  Mean IAT: {features['mean_iat']:.6f}s")
    print(f"  Burstiness: {features['burstiness']:.4f}")
    print(f"  Throughput: {features['bytes_per_second']:.2f} B/s")
    print(f"  Entropy: {features['shannon_entropy']:.4f} bits")

extract_all()

Load a .flows file and extract features for every flow.
def extract_all(flows_file: str) -> list[dict]
Parameters:
  • flows_file (str): Path to JSON lines .flows file from flow_parser
Returns:
  • list[dict]: List of feature dictionaries, one per flow
Raises:
  • FileNotFoundError: If flows file does not exist
Behavior:
  • Reads flows line-by-line (JSON Lines format)
  • Deserializes each line to FlowRecord
  • Calls extract_features() for each flow
  • Logs extraction statistics
  • Returns empty list with warning if no features extracted
Example:
from telemetry.feature_extractor import extract_all, save_features

# Extract features from parsed flows
features = extract_all('capture.flows')
print(f"Extracted {len(features)} feature vectors")

# Save to CSV and JSON
save_features(features, 'capture.features.csv')

save_features()

Write feature vectors to both CSV and JSON Lines formats.
def save_features(features: list[dict], output_file: str) -> None
Parameters:
  • features (list[dict]): Feature dictionaries to save
  • output_file (str): Output path (typically .csv extension)
Returns:
  • None
Behavior:
  • Creates parent directories if needed
  • Writes CSV file with header row (uses first feature dict for column names)
  • Writes JSON Lines file (replaces .csv with .json in filename)
  • Logs save statistics with both output paths
Output files:
  • {output_file} (CSV format with header)
  • {output_file_without_.csv}.json (JSON Lines format)
Example:
from telemetry.feature_extractor import extract_all, save_features

features = extract_all('capture.flows')
save_features(features, 'features/capture.features.csv')
# Creates:
#   features/capture.features.csv
#   features/capture.features.json

shannon_entropy()

Compute Shannon entropy of a byte sequence.
def shannon_entropy(data: bytes) -> float
Parameters:
  • data (bytes): Byte sequence to analyze
Returns:
  • float: Entropy in bits (0.0 to 8.0 for byte data)
Formula:
H(X) = -Σ p(x) * log₂(p(x))
Behavior:
  • Counts byte value frequencies (0-255)
  • Computes probability distribution
  • Calculates entropy using Shannon formula
  • Returns 0.0 for empty input
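
The behavior above can be sketched as a short, self-contained function. This is a minimal illustration of the formula, not the module's exact implementation, and the name shannon_entropy_sketch is hypothetical:

```python
import math
from collections import Counter

def shannon_entropy_sketch(data: bytes) -> float:
    """Shannon entropy in bits (0.0-8.0 for byte data); 0.0 for empty input."""
    if not data:
        return 0.0
    n = len(data)
    counts = Counter(data)  # byte value -> frequency
    # H(X) = -sum p(x) * log2(p(x)), summed over observed byte values
    return -sum((c / n) * math.log2(c / n) for c in counts.values())
```

A uniform distribution over all 256 byte values yields exactly 8.0 bits; a constant byte sequence yields 0.0.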
Interpretation:
  • 0.0: All bytes identical (no randomness)
  • 8.0: Uniform distribution (maximum randomness)
  • TLS/encrypted traffic typically 7.5-8.0 bits
  • Plaintext HTTP typically 4.5-6.0 bits
Example:
from telemetry.feature_extractor import shannon_entropy

# High entropy (encrypted)
encrypted = bytes([i % 256 for i in range(256)])  # Uniform distribution
print(shannon_entropy(encrypted))  # ~8.0 bits

# Low entropy (plaintext)
plaintext = b'GET / HTTP/1.1\r\n' * 10
print(shannon_entropy(plaintext))  # ~3.3 bits (small byte alphabet)

# Zero entropy
zeros = b'\x00' * 1000
print(shannon_entropy(zeros))  # 0.0 bits

Feature Computation Details

Burstiness

Coefficient of variation of inter-arrival times:
burstiness = std_iat / mean_iat if mean_iat > 0 else 0.0
Interpretation:
  • < 0.5: Regular, periodic traffic (e.g., beacons)
  • 0.5-1.0: Moderate variability (e.g., interactive sessions)
  • > 1.0: Bursty traffic (e.g., file transfers)
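
The coefficient-of-variation computation above can be made runnable as a short sketch (burstiness_sketch is a hypothetical name, and population standard deviation is assumed):

```python
def burstiness_sketch(iats: list[float]) -> float:
    """Coefficient of variation (std/mean) of inter-arrival times."""
    if not iats:
        return 0.0
    m = sum(iats) / len(iats)
    if m <= 0:
        return 0.0  # guard against zero mean (degenerate timing data)
    var = sum((x - m) ** 2 for x in iats) / len(iats)  # population variance
    return var ** 0.5 / m

# Regular beacon: near-identical intervals -> burstiness near 0
print(burstiness_sketch([60.0, 60.1, 59.9, 60.0]))   # well below 0.5
# Bursty transfer: mixed tiny and large gaps -> burstiness above 1
print(burstiness_sketch([0.001, 0.001, 0.001, 30.0]))  # above 1.0
```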

IAT Autocorrelation

Lag-1 autocorrelation detects periodicity in packet timing:
iat_autocorr = Σ[(IAT[i] - mean) * (IAT[i-1] - mean)] / [(n-1) * std²]
Interpretation:
  • Close to 1.0: Strong positive correlation (periodic beaconing)
  • Close to 0.0: No correlation (random timing)
  • Close to -1.0: Alternating pattern
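
The formula above can be sketched as a runnable function. The name iat_autocorr_sketch is hypothetical, and the population-std convention is assumed, matching the _std helper documented below:

```python
def iat_autocorr_sketch(iats: list[float], mean: float, std: float) -> float:
    """Lag-1 autocorrelation of an IAT series; 0.0 when undefined."""
    n = len(iats)
    if n < 2 or std == 0.0:
        return 0.0  # undefined for constant series or fewer than 2 samples
    num = sum((iats[i] - mean) * (iats[i - 1] - mean) for i in range(1, n))
    return num / ((n - 1) * std * std)
```

A strictly alternating series (e.g., 1, 3, 1, 3, ...) yields -1.0; a constant series returns 0.0 via the guard, matching the "0.0 if undefined" behavior.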

Payload Entropy Approximation

Since FlowRecord stores payload sizes (not raw payloads), entropy is computed from the size distribution:
size_bytes = bytes([size % 256 for size in flow.payload_sizes])
entropy = shannon_entropy(size_bytes)
Note: This underestimates true payload entropy but is sufficient for comparing TLS-encrypted flows where all have uniformly high entropy.

Zero Division Handling

The module uses _SAFE_DIVISOR = 1e-9 to prevent division by zero:
safe_duration = duration if duration > 0 else _SAFE_DIVISOR
bytes_per_second = byte_count / safe_duration

Helper Functions

_mean()

Compute arithmetic mean of a list.
def _mean(values: list[float]) -> float
Returns: Mean value or 0.0 if list is empty

_std()

Compute population standard deviation.
def _std(values: list[float], mean: float) -> float
Returns: Standard deviation or 0.0 if fewer than 2 values

_iat_autocorr()

Compute lag-1 autocorrelation of inter-arrival times.
def _iat_autocorr(iats: list[float], mean: float, std: float) -> float
Returns: Autocorrelation coefficient or 0.0 if undefined
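
The _mean and _std helpers can be sketched as follows (mean_sketch and std_sketch are hypothetical names; the module's private helpers may differ in detail):

```python
import math

def mean_sketch(values: list[float]) -> float:
    """Arithmetic mean; 0.0 for an empty list."""
    return sum(values) / len(values) if values else 0.0

def std_sketch(values: list[float], mean: float) -> float:
    """Population standard deviation; 0.0 for fewer than 2 values."""
    if len(values) < 2:
        return 0.0
    return math.sqrt(sum((v - mean) ** 2 for v in values) / len(values))
```

Passing the precomputed mean into std_sketch mirrors the documented _std signature and avoids recomputing it during single-pass feature extraction.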

Command-Line Usage

# Extract features and save to CSV/JSON
python -m telemetry.feature_extractor --input capture.flows --output capture.features.csv
Arguments:
  • --input: Input .flows file from flow_parser (required)
  • --output: Output .csv file path (required); JSON also written alongside

Machine Learning Integration

The extracted features are designed for classification tasks. Example workflow:
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from telemetry.feature_extractor import extract_all

# Extract features
features = extract_all('training_data.flows')
df = pd.DataFrame(features)

# Select feature columns (exclude 5-tuple identifiers)
feature_cols = [
    'mean_iat', 'std_iat', 'min_iat', 'max_iat', 'burstiness', 'iat_autocorr',
    'flow_duration_s', 'total_bytes', 'total_packets', 'bytes_per_second', 'packets_per_second',
    'payload_len_mean', 'payload_len_std', 'payload_len_min', 'payload_len_max', 'shannon_entropy'
]

X = df[feature_cols]
y = df['label']  # Add labels separately

# Train classifier
clf = RandomForestClassifier(n_estimators=100, random_state=42)
clf.fit(X, y)

# Feature importance
importance = pd.DataFrame({
    'feature': feature_cols,
    'importance': clf.feature_importances_
}).sort_values('importance', ascending=False)
print(importance)

Feature Importance for C2 Detection

Typical feature rankings for detecting C2 beaconing:
  1. mean_iat: Most discriminative; beacons have consistent intervals
  2. std_iat: Low variance indicates regular callbacks
  3. burstiness: Beacons have low burstiness (less than 0.5)
  4. iat_autocorr: High positive correlation for periodic beacons
  5. payload_len_mean: Beacons often have small, consistent payloads
  6. bytes_per_second: Low throughput distinguishes C2 from data exfiltration
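
The rankings above can be combined into a simple pre-filter over feature dictionaries. The sketch below is a hypothetical heuristic: looks_like_beacon and every threshold are illustrative choices, not tuned values from this module.

```python
def looks_like_beacon(features: dict) -> bool:
    """Heuristic pre-filter for C2-like flows; thresholds are illustrative."""
    return (
        features["burstiness"] < 0.5             # regular intervals
        and features["iat_autocorr"] > 0.7       # periodic timing
        and features["payload_len_mean"] < 1024  # small payloads
        and features["bytes_per_second"] < 10_000  # low throughput
    )
```

A filter like this is best used to reduce the candidate set before a trained classifier, not as a detector on its own.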

Constants

_SAFE_DIVISOR

Small value substituted for zero denominators:
_SAFE_DIVISOR = 1e-9

_PROTO_MAP

IP protocol number to name mapping (inherited from flow_parser):
_PROTO_MAP = {1: 'ICMP', 6: 'TCP', 17: 'UDP'}

Requirements

  • Depends on: common.logger, telemetry.flow_parser

Performance Notes

  • Feature extraction is O(n) where n = packet_count per flow
  • Shannon entropy computation is O(m) where m = len(payload_sizes)
  • All features computed in a single pass over flow data
  • No external ML libraries required for extraction

Notes

  • Empty flows (zero packets) result in zero/default feature values
  • Entropy approximation is specific to encrypted traffic analysis
  • CSV and JSON outputs contain identical data in different formats
  • Feature vectors are independent and can be processed in parallel
