Overview

The feature extractor computes statistical and entropy features from FlowRecords for machine learning analysis. It extracts timing features (IAT statistics, burstiness), flow features (throughput), size features (payload distributions), and entropy measures. Source: telemetry/feature_extractor.py

Feature Categories

Extracted features are organized into four categories:

Timing Features

Feature      | Description                          | Use Case
mean_iat     | Mean inter-arrival time (seconds)    | Detect regular beaconing
std_iat      | Standard deviation of IAT            | Measure timing jitter
min_iat      | Minimum IAT                          | Detect burst patterns
max_iat      | Maximum IAT                          | Identify gaps
burstiness   | Coefficient of variation (std/mean)  | Distinguish bursty vs regular traffic
iat_autocorr | Lag-1 autocorrelation of IAT series  | Detect periodic patterns

Flow Features

Feature            | Description                   | Use Case
flow_duration_s    | Total flow duration (seconds) | Session length analysis
total_bytes        | Total bytes transferred       | Volume analysis
total_packets      | Total packet count            | Activity level
bytes_per_second   | Throughput (bytes/sec)        | Bandwidth usage
packets_per_second | Packet rate (packets/sec)     | Activity intensity

Size Features

Feature          | Description               | Use Case
payload_len_mean | Mean payload size (bytes) | Detect padding
payload_len_std  | Std dev of payload size   | Size variance
payload_len_min  | Minimum payload size      | Detect empty packets
payload_len_max  | Maximum payload size      | MTU analysis

Entropy Features

Feature         | Description                      | Use Case
shannon_entropy | Shannon entropy of payload sizes | Detect encryption/randomization

Core Functions

extract_features

Extract all features from a single FlowRecord:
from telemetry.feature_extractor import extract_features
from telemetry.flow_parser import FlowRecord

flow = FlowRecord(
    src_ip='192.168.56.102',
    dst_ip='192.168.56.101',
    src_port=54321,
    dst_port=443,
    protocol='TCP',
    start_time=1710163852.0,
    end_time=1710163854.5,
    duration_s=2.5,
    packet_count=10,
    byte_count=8000,
    inter_arrival_times=[0.2, 0.3, 0.25, 0.28, 0.22, 0.31, 0.27, 0.24, 0.29],
    payload_sizes=[1400, 1400, 1400, 1400, 1400, 600, 0, 0, 0, 0],
    beacon_iats=[10.5]
)

features = extract_features(flow)

print(f"Mean IAT: {features['mean_iat']:.4f}s")
print(f"Burstiness: {features['burstiness']:.4f}")
print(f"Throughput: {features['bytes_per_second']:.1f} bytes/s")
print(f"Entropy: {features['shannon_entropy']:.4f}")
Returns: Dictionary containing all 16 features plus five metadata fields:
{
    # Metadata
    'src_ip': '192.168.56.102',
    'dst_ip': '192.168.56.101',
    'src_port': 54321,
    'dst_port': 443,
    'protocol': 'TCP',
    
    # Timing features
    'mean_iat': 0.2611,
    'std_iat': 0.0359,
    'min_iat': 0.2,
    'max_iat': 0.31,
    'burstiness': 0.1375,
    'iat_autocorr': -0.1234,
    
    # Flow features
    'flow_duration_s': 2.5,
    'total_bytes': 8000,
    'total_packets': 10,
    'bytes_per_second': 3200.0,
    'packets_per_second': 4.0,
    
    # Size features
    'payload_len_mean': 800.0,
    'payload_len_std': 620.48,
    'payload_len_min': 0.0,
    'payload_len_max': 1400.0,
    
    # Entropy features
    'shannon_entropy': 1.5710
}

extract_all

Load a .flows file and extract features for all flows:
from telemetry.feature_extractor import extract_all

features = extract_all('pcaps/capture.flows')

print(f'Extracted {len(features)} feature vectors')

# Access individual features
for feat in features:
    if feat['dst_port'] == 443:
        print(f"{feat['src_ip']} -> {feat['dst_ip']}")
        print(f"  Mean IAT: {feat['mean_iat']:.4f}s")
        print(f"  Entropy: {feat['shannon_entropy']:.4f}")
Parameters:
  • flows_file (str): Path to .flows JSON Lines file
Returns: list[dict] - One feature dictionary per flow

save_features

Write features to both CSV and JSON formats:
from telemetry.feature_extractor import save_features

save_features(features, 'pcaps/capture.features.csv')

# Creates two files:
#   pcaps/capture.features.csv
#   pcaps/capture.features.json
Parameters:
  • features (list[dict]): Feature dictionaries from extract_all()
  • output_file (str): Base output path (.csv suffix optional)
Output Formats: CSV - Header row with all feature columns:
src_ip,dst_ip,src_port,dst_port,protocol,mean_iat,std_iat,min_iat,max_iat,burstiness,iat_autocorr,flow_duration_s,total_bytes,total_packets,bytes_per_second,packets_per_second,payload_len_mean,payload_len_std,payload_len_min,payload_len_max,shannon_entropy
192.168.56.102,192.168.56.101,54321,443,TCP,0.2611,0.0359,0.2,0.31,0.1375,-0.1234,2.5,8000,10,3200.0,4.0,800.0,620.48,0.0,1400.0,1.5710
JSON Lines - One object per line:
{"src_ip":"192.168.56.102","dst_ip":"192.168.56.101","src_port":54321,"dst_port":443,"protocol":"TCP","mean_iat":0.2611,"std_iat":0.0359,"min_iat":0.2,"max_iat":0.31,"burstiness":0.1375,"iat_autocorr":-0.1234,"flow_duration_s":2.5,"total_bytes":8000,"total_packets":10,"bytes_per_second":3200.0,"packets_per_second":4.0,"payload_len_mean":800.0,"payload_len_std":620.48,"payload_len_min":0.0,"payload_len_max":1400.0,"shannon_entropy":1.571}

Command-Line Usage

Run as a standalone module:
# Basic feature extraction
python -m telemetry.feature_extractor \
  --input pcaps/capture.flows \
  --output pcaps/capture.features.csv

# Output
Extracted 42 feature vectors pcaps/capture.features.csv
Arguments:
  • --input (required): Input .flows file from flow_parser
  • --output (required): Output CSV file (JSON also written automatically)

Feature Computation Details

Burstiness

Coefficient of variation of inter-arrival times:
burstiness = std_iat / mean_iat  # if mean_iat > 0, else 0.0
Interpretation:
  • Low values (< 0.5): Regular, periodic traffic (e.g., unmodified beacons)
  • High values (> 1.0): Bursty, irregular traffic (e.g., human browsing)
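As a quick check, the ratio can be reproduced from the example flow's IAT list with the standard library (a sketch, not the module's internal code; whether it uses the population or sample standard deviation is an assumption here):
import statistics

iats = [0.2, 0.3, 0.25, 0.28, 0.22, 0.31, 0.27, 0.24, 0.29]

mean_iat = statistics.mean(iats)
std_iat = statistics.pstdev(iats)   # population std dev; the module may use the sample form instead
burstiness = std_iat / mean_iat if mean_iat > 0 else 0.0

print(f'burstiness = {burstiness:.4f}')   # well below 0.5 -> regular, beacon-like timing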

IAT Autocorrelation

Lag-1 autocorrelation measures correlation between consecutive IATs:
iat_autocorr = sum(
    (iats[i] - mean) * (iats[i-1] - mean)
    for i in range(1, n)
) / ((n - 1) * std ** 2)
Interpretation (telemetry/feature_extractor.py:42-51):
  • Positive values: Consecutive IATs are similar (periodic patterns)
  • Near zero: IATs are independent (random)
  • Negative values: Alternating fast/slow patterns
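The same statistic can be reproduced standalone. A rough sketch, assuming the denominator uses the population variance and that degenerate series fall back to 0.0:
import statistics

def lag1_autocorr(iats):
    # Lag-1 autocorrelation of an inter-arrival-time series.
    n = len(iats)
    if n < 2:
        return 0.0
    mean = statistics.mean(iats)
    var = statistics.pvariance(iats)   # std ** 2; the zero-variance guard below is an assumption
    if var == 0:
        return 0.0
    num = sum((iats[i] - mean) * (iats[i - 1] - mean) for i in range(1, n))
    return num / ((n - 1) * var)

print(lag1_autocorr([0.1, 0.9, 0.1, 0.9, 0.1, 0.9]))   # alternating fast/slow -> strongly negative
print(lag1_autocorr([0.3, 0.3, 0.3, 0.3, 0.3, 0.3]))   # constant IATs -> zero variance, returns 0.0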

Shannon Entropy

Measures randomness of payload size distribution:
shannon_entropy = -sum(
    (count / total) * log2(count / total)
    for count in byte_counts if count > 0
)
Note: Because FlowRecords do not retain raw payload bytes, entropy is computed over per-packet payload sizes modulo 256. This underestimates true entropy but provides consistent relative measurements across profiles.
Interpretation:
  • Low entropy (< 2.0): Uniform sizes (e.g., fixed-size packets)
  • High entropy (> 5.0): Varied sizes (e.g., random padding)
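A minimal sketch of this computation over the example flow's payload sizes, following the size-modulo-256 convention described above (not the module's internal code):
from collections import Counter
from math import log2

payload_sizes = [1400, 1400, 1400, 1400, 1400, 600, 0, 0, 0, 0]

# Bucket each packet's payload size modulo 256, then compute Shannon entropy
# of the resulting distribution.
byte_counts = Counter(size % 256 for size in payload_sizes)
total = sum(byte_counts.values())

shannon_entropy = -sum(
    (count / total) * log2(count / total)
    for count in byte_counts.values() if count > 0
)

print(f'shannon_entropy = {shannon_entropy:.4f}')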

Integration Example

Complete pipeline from PCAP to features:
import os
from telemetry import flow_parser, feature_extractor

# Parse PCAP → flows
pcap_path = 'pcaps/experiment.pcap'
flows = flow_parser.parse_pcap(pcap_path)

# Save flows
flows_path = pcap_path.replace('.pcap', '.flows')
flow_parser.save_flows(flows, flows_path)

# Extract features
features = feature_extractor.extract_all(flows_path)

# Save features to CSV and JSON
features_path = pcap_path.replace('.pcap', '.features.csv')
feature_extractor.save_features(features, features_path)

print(f'Pipeline complete: {len(features)} feature vectors')
print(f'CSV: {features_path}')
print(f'JSON: {features_path.replace(".csv", ".json")}')

Analysis Examples

Compare Baseline vs Evasion Profiles

import csv
import statistics

def load_csv_features(filepath):
    with open(filepath, 'r') as f:
        reader = csv.DictReader(f)
        return [
            {k: v if k in ('src_ip', 'dst_ip', 'protocol') else float(v)
             for k, v in row.items()}
            for row in reader
        ]

baseline = load_csv_features('pcaps/baseline.features.csv')
evasion = load_csv_features('pcaps/high.features.csv')

# Compare burstiness
baseline_burst = statistics.mean(f['burstiness'] for f in baseline)
evasion_burst = statistics.mean(f['burstiness'] for f in evasion)

print(f'Baseline burstiness: {baseline_burst:.4f}')
print(f'Evasion burstiness:  {evasion_burst:.4f}')
print(f'Increase: {(evasion_burst / baseline_burst - 1) * 100:.1f}%')

# Compare entropy
baseline_entropy = statistics.mean(f['shannon_entropy'] for f in baseline)
evasion_entropy = statistics.mean(f['shannon_entropy'] for f in evasion)

print(f'\nBaseline entropy: {baseline_entropy:.4f}')
print(f'Evasion entropy:  {evasion_entropy:.4f}')

Filter by Flow Characteristics

from telemetry.feature_extractor import extract_all

features = extract_all('pcaps/capture.flows')

# Find long-duration flows
long_flows = [f for f in features if f['flow_duration_s'] > 5.0]
print(f'Long flows (>5s): {len(long_flows)}')

# Find high-throughput flows
fast_flows = [f for f in features if f['bytes_per_second'] > 10000]
print(f'High throughput (>10KB/s): {len(fast_flows)}')

# Find regular beacon-like flows
regular_flows = [f for f in features if f['burstiness'] < 0.3]
print(f'Regular flows (burstiness < 0.3): {len(regular_flows)}')

Zero-Division Handling

Safe divisors prevent division by zero (telemetry/feature_extractor.py:12):
_SAFE_DIVISOR = 1e-9

# Used when denominators may be zero
safe_duration = duration if duration > 0 else _SAFE_DIVISOR
bytes_per_second = byte_count / safe_duration
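For instance, a single-packet flow has zero duration; with the safe divisor the rate features stay very large but finite instead of raising ZeroDivisionError (illustrative values only):
_SAFE_DIVISOR = 1e-9

duration_s = 0.0      # single-packet flow: start_time == end_time
byte_count = 1400

safe_duration = duration_s if duration_s > 0 else _SAFE_DIVISOR
bytes_per_second = byte_count / safe_duration

print(bytes_per_second)   # very large but finite; no exception is raised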

Performance

Processing Speed:
  • ~50,000 flows/second on typical hardware
  • Feature extraction from 10K flow file ≈ 0.2 seconds
Memory Usage:
  • Loads entire .flows file into memory
  • Typical flow: ~200 bytes in memory
  • 100K flows ≈ 20 MB RAM
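These figures vary by machine; a quick way to measure throughput locally is to time extract_all on an existing .flows file (a sketch; the path is illustrative):
import time
from telemetry.feature_extractor import extract_all

start = time.perf_counter()
features = extract_all('pcaps/capture.flows')
elapsed = time.perf_counter() - start

print(f'{len(features)} flows in {elapsed:.3f}s '
      f'({len(features) / max(elapsed, 1e-9):.0f} flows/s)')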

Output File Organization

project_root/
├── pcaps/
│   ├── baseline.pcap              # Raw capture
│   ├── baseline.flows             # Parsed flows (JSON Lines)
│   ├── baseline.features.csv      # Features (CSV)
│   ├── baseline.features.json     # Features (JSON Lines)
│   ├── high.pcap
│   ├── high.flows
│   ├── high.features.csv
│   └── high.features.json
└── telemetry/
    └── feature_extractor.py

Logging

Feature extraction is logged:
from common.logger import get_logger
logger = get_logger('feature_extractor')
Log Events:
  • features extracted: Logged after processing (includes count)
  • no features extracted: Warning if flows file is empty
  • features saved: Logged after writing CSV/JSON (includes paths)

Troubleshooting

FileNotFoundError:
  • Ensure .flows file exists (run flow_parser first)
  • Use absolute paths or run from project root
Empty feature list:
  • Check if .flows file contains valid JSON lines
  • Verify flows were successfully parsed from PCAP
NaN or inf values:
  • Should not occur due to safe divisors
  • Report as bug if encountered
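When debugging an empty feature list, a quick validity check can confirm whether every line of the .flows file parses as JSON (a sketch; the path is illustrative):
import json

bad_lines = 0
total_lines = 0
with open('pcaps/capture.flows', 'r') as f:
    for lineno, line in enumerate(f, start=1):
        if not line.strip():
            continue
        total_lines += 1
        try:
            json.loads(line)
        except json.JSONDecodeError:
            bad_lines += 1
            print(f'Line {lineno}: invalid JSON')

print(f'{total_lines - bad_lines}/{total_lines} lines parse as JSON')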

ML Integration

Features are ready for scikit-learn, TensorFlow, or PyTorch:
import pandas as pd
from sklearn.ensemble import RandomForestClassifier

# Load features as DataFrame
df = pd.read_csv('pcaps/baseline.features.csv')

# Select numeric features only
feature_cols = [
    'mean_iat', 'std_iat', 'burstiness', 'iat_autocorr',
    'bytes_per_second', 'packets_per_second',
    'payload_len_mean', 'payload_len_std', 'shannon_entropy'
]

X = df[feature_cols].values
y = df['label'].values  # if you have ground-truth labels

# Train classifier
clf = RandomForestClassifier(n_estimators=100)
clf.fit(X, y)
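The extracted CSVs carry no label column, so ground truth has to be attached from the capture context. One common pattern, sketched here, is to label rows by the profile each capture came from (the file names and 0/1 encoding are illustrative; feature_cols is reused from the snippet above):
import pandas as pd

# Label rows by the profile the capture came from (hypothetical file names).
baseline_df = pd.read_csv('pcaps/baseline.features.csv').assign(label=0)
evasion_df = pd.read_csv('pcaps/high.features.csv').assign(label=1)

df = pd.concat([baseline_df, evasion_df], ignore_index=True)

X = df[feature_cols].values   # numeric feature columns defined earlier
y = df['label'].values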

Next Steps

  1. Run experiments: See Experiments for automated pipelines
  2. Visualize features: Use Jupyter notebooks to plot distributions
  3. Train models: Feed features into ML classifiers for C2 detection
