
Overview

The flow parser analyzes PCAP files and extracts per-flow statistics keyed by 5-tuple (src_ip, dst_ip, src_port, dst_port, protocol). For each flow it records packet inter-arrival times and payload sizes; across flows, it computes beacon intervals between successive TCP connections to the same destination. Source: telemetry/flow_parser.py

FlowRecord Structure

Each network flow is represented as a FlowRecord dataclass:
from dataclasses import dataclass

@dataclass
class FlowRecord:
    src_ip:              str          # Source IP address
    dst_ip:              str          # Destination IP address
    src_port:            int          # Source port
    dst_port:            int          # Destination port
    protocol:            str          # 'TCP', 'UDP', 'ICMP', or the protocol number as a string (e.g. '50')
    start_time:          float        # Unix timestamp of first packet
    end_time:            float        # Unix timestamp of last packet
    duration_s:          float        # Flow duration in seconds
    packet_count:        int          # Total packets in flow
    byte_count:          int          # Total bytes transferred
    inter_arrival_times: list[float]  # IATs between consecutive packets
    payload_sizes:       list[int]    # Per-packet payload sizes
    beacon_iats:         list[float]  # Inter-flow gaps to same destination

Key Concepts

Inter-Arrival Times (IAT)

Time deltas between consecutive packets within a single flow:
# For packets arriving at times [1.0, 1.2, 1.5, 2.0]
inter_arrival_times = [0.2, 0.3, 0.5]  # len = packet_count - 1
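As a sketch, the IAT list can be derived from a flow's packet timestamps, assuming they are already in arrival order. `compute_iats` is a hypothetical helper for illustration, not a function exported by telemetry.flow_parser.

```python
def compute_iats(timestamps: list[float]) -> list[float]:
    """Return gaps between consecutive packet timestamps (hypothetical helper)."""
    # zip pairs each timestamp with its successor, so the result has len - 1 entries
    return [later - earlier for earlier, later in zip(timestamps, timestamps[1:])]


iats = compute_iats([1.0, 1.2, 1.5, 2.0])
print([round(x, 6) for x in iats])  # [0.2, 0.3, 0.5]
```

Rounding is only for display: raw float subtraction yields values such as 0.19999999999999996. A single-packet flow produces an empty list.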

Beacon Inter-Arrival Times (Beacon IATs)

Time gaps between consecutive TCP connection starts to the same destination:
# Three connections to 192.168.56.101:443 starting at:
# Connection 1: start_time = 10.0
# Connection 2: start_time = 20.5
# Connection 3: start_time = 31.2

# Beacon IATs:
# Flow 1: beacon_iats = [10.5]  # gap to next connection
# Flow 2: beacon_iats = [10.7]  # gap to next connection
# Flow 3: beacon_iats = []      # last connection (no following gap)
Detection Logic (telemetry/flow_parser.py:34-52):
  1. Group flows by (dst_ip, dst_port)
  2. Filter client-initiated flows: src_port > 1024 and dst_port <= 1024
  3. Sort by start_time
  4. Compute gaps between consecutive connection starts
  5. Assign gap to the earlier flow
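Because gaps are measured between connection starts rather than between packets inside one connection, they capture the interval at which a client reconnects to the same server, which is the timing signal needed to detect C2 beaconing.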
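The five steps can be sketched in plain Python as below. `Flow` is a hypothetical stand-in carrying only the fields the algorithm reads, and `compute_beacon_iats_sketch` illustrates the documented behavior; it is not the code in telemetry/flow_parser.py. The usage reproduces the three-connection example above.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class Flow:
    """Hypothetical stand-in for FlowRecord with only the fields used here."""
    dst_ip: str
    dst_port: int
    src_port: int
    start_time: float
    beacon_iats: list[float] = field(default_factory=list)


def compute_beacon_iats_sketch(flows: list[Flow]) -> None:
    """Populate beacon_iats in place, following the five steps above."""
    groups: dict[tuple[str, int], list[Flow]] = defaultdict(list)
    for f in flows:
        # Steps 1-2: keep client-initiated flows, grouped by destination
        if f.src_port > 1024 and f.dst_port <= 1024:
            groups[(f.dst_ip, f.dst_port)].append(f)
    for group in groups.values():
        # Step 3: order connections by start time
        group.sort(key=lambda f: f.start_time)
        # Steps 4-5: the gap to the next connection is stored on the earlier flow
        for earlier, later in zip(group, group[1:]):
            earlier.beacon_iats = [later.start_time - earlier.start_time]


flows = [
    Flow("192.168.56.101", 443, 50003, 31.2),
    Flow("192.168.56.101", 443, 50001, 10.0),
    Flow("192.168.56.101", 443, 50002, 20.5),
]
compute_beacon_iats_sketch(flows)
for f in sorted(flows, key=lambda f: f.start_time):
    print(f.start_time, [round(x, 6) for x in f.beacon_iats])
# 10.0 [10.5]
# 20.5 [10.7]
# 31.2 []
```

The last connection in each group keeps its default empty list because it has no successor.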

Core Functions

parse_pcap

Parse a PCAP file into FlowRecords:
from telemetry.flow_parser import parse_pcap

flows = parse_pcap('pcaps/capture.pcap')

for flow in flows:
    print(f'{flow.src_ip}:{flow.src_port} -> {flow.dst_ip}:{flow.dst_port}')
    print(f'  Duration: {flow.duration_s:.2f}s')
    print(f'  Packets: {flow.packet_count}')
    print(f'  Beacon IATs: {flow.beacon_iats}')
Parameters:
  • pcap_file (str): Path to PCAP file
Returns: list[FlowRecord] - One record per unique 5-tuple
Implementation Details:
  • Uses Scapy’s PcapReader for memory-efficient streaming
  • Supports TCP, UDP, ICMP, and raw IP packets
  • Maps protocol numbers to names: {1: 'ICMP', 6: 'TCP', 17: 'UDP'}
  • Non-TCP/UDP flows use ports (0, 0)
  • Automatically calls compute_beacon_iats() to populate beacon intervals
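The per-5-tuple grouping can be illustrated without Scapy by feeding in pre-extracted packet fields. In this sketch the `Packet` tuple layout and `aggregate` are hypothetical, packets are assumed to arrive in timestamp order, and byte_count and beacon_iats are left out; the real parser reads packets with PcapReader and fills a FlowRecord rather than a dict.

```python
_PROTO_MAP = {1: "ICMP", 6: "TCP", 17: "UDP"}  # mapping listed above

# Hypothetical per-packet tuple standing in for fields read via Scapy:
# (timestamp, src_ip, dst_ip, src_port, dst_port, proto_number, payload_len)
Packet = tuple[float, str, str, int, int, int, int]


def aggregate(packets: list[Packet]) -> dict[tuple, dict]:
    """Group time-ordered packets into per-5-tuple flow summaries."""
    flows: dict[tuple, dict] = {}
    for ts, src, dst, sport, dport, proto_num, payload_len in packets:
        proto = _PROTO_MAP.get(proto_num, str(proto_num))
        if proto not in ("TCP", "UDP"):
            sport, dport = 0, 0  # non-TCP/UDP flows use ports (0, 0)
        key = (src, dst, sport, dport, proto)
        flow = flows.setdefault(key, {"times": [], "payload_sizes": []})
        flow["times"].append(ts)
        flow["payload_sizes"].append(payload_len)
    # Derive timing fields once every packet has been assigned to its flow
    for flow in flows.values():
        times = flow.pop("times")
        flow["start_time"], flow["end_time"] = times[0], times[-1]
        flow["duration_s"] = times[-1] - times[0]
        flow["packet_count"] = len(times)
        flow["inter_arrival_times"] = [b - a for a, b in zip(times, times[1:])]
    return flows


packets = [
    (1.0, "192.168.56.102", "192.168.56.101", 54321, 443, 6, 0),
    (1.2, "192.168.56.102", "192.168.56.101", 54321, 443, 6, 517),
    (1.5, "192.168.56.102", "192.168.56.101", 0, 0, 1, 56),  # ICMP echo
]
for key, flow in aggregate(packets).items():
    print(key, flow["packet_count"])
# ('192.168.56.102', '192.168.56.101', 54321, 443, 'TCP') 2
# ('192.168.56.102', '192.168.56.101', 0, 0, 'ICMP') 1
```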

save_flows

Write FlowRecords to a JSON Lines file:
from telemetry.flow_parser import save_flows

save_flows(flows, 'pcaps/capture.flows')
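Format: One JSON object per line (JSONL). The per-packet lists in this example are abbreviated; a complete record has packet_count payload sizes and packet_count - 1 inter-arrival times: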
{"src_ip":"192.168.56.102","dst_ip":"192.168.56.101","src_port":54321,"dst_port":443,"protocol":"TCP","start_time":1234567890.123,"end_time":1234567892.456,"duration_s":2.333,"packet_count":12,"byte_count":5432,"inter_arrival_times":[0.1,0.2,0.15],"payload_sizes":[1400,1400,632],"beacon_iats":[10.5]}
Parameters:
  • flows (list[FlowRecord]): Flow records to save
  • output_file (str): Output path (parent directories created automatically)
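A JSONL writer with automatic parent-directory creation, plus a matching reader, might look like the sketch below. `MiniFlow`, `save_flows_sketch`, and `load_flows_sketch` are hypothetical names, and whether the real save_flows serializes records via dataclasses.asdict is an assumption.

```python
import json
import os
import tempfile
from dataclasses import asdict, dataclass


@dataclass
class MiniFlow:
    """Stand-in for FlowRecord; asdict() works the same way on any dataclass."""
    src_ip: str
    dst_port: int
    beacon_iats: list[float]


def save_flows_sketch(flows: list, output_file: str) -> None:
    """Write one compact JSON object per line, creating parent directories first."""
    parent = os.path.dirname(output_file)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(output_file, "w", encoding="utf-8") as fh:
        for flow in flows:
            fh.write(json.dumps(asdict(flow), separators=(",", ":")) + "\n")


def load_flows_sketch(path: str) -> list[dict]:
    """Read a .flows file back into plain dicts, one per non-blank line."""
    with open(path, encoding="utf-8") as fh:
        return [json.loads(line) for line in fh if line.strip()]


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "nested", "capture.flows")  # "nested" does not exist yet
    save_flows_sketch([MiniFlow("192.168.56.102", 443, [10.5])], path)
    loaded = load_flows_sketch(path)

print(loaded)  # [{'src_ip': '192.168.56.102', 'dst_port': 443, 'beacon_iats': [10.5]}]
```

The compact separators match the spacing of the JSON shown above; the reader returns dicts, so rebuilding FlowRecord objects would be a separate step.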

compute_beacon_iats

Compute inter-flow beacon intervals (called automatically by parse_pcap):
from telemetry.flow_parser import compute_beacon_iats

compute_beacon_iats(flows)
# Modifies flows in-place, populating beacon_iats field
Algorithm:
  1. Group flows by (dst_ip, dst_port)
  2. Filter client-initiated: src_port > 1024 and dst_port <= 1024
  3. Sort each group by start_time
  4. For each flow i except the last in its group, set flows[i].beacon_iats = [flows[i+1].start_time - flows[i].start_time]; the last flow keeps an empty list

Command-Line Usage

Run as a standalone module:
# Basic parsing
python -m telemetry.flow_parser \
  --input pcaps/capture.pcap \
  --output pcaps/capture.flows

# Output
Parsed 42 flows pcaps/capture.flows
Arguments:
  • --input (required): Input PCAP file path
  • --output (required): Output .flows file path
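For reference, an argument parser with the same two required flags can be written with argparse as below. This is a hypothetical sketch of the interface, not the module's actual entry point; if either flag is missing, argparse prints a usage error and exits with status 2.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical parser mirroring the two required flags documented above."""
    parser = argparse.ArgumentParser(prog="python -m telemetry.flow_parser")
    parser.add_argument("--input", required=True, help="Input PCAP file path")
    parser.add_argument("--output", required=True, help="Output .flows file path")
    return parser


# A main() would then pass args.input to parse_pcap and args.output to save_flows.
args = build_parser().parse_args(
    ["--input", "pcaps/capture.pcap", "--output", "pcaps/capture.flows"]
)
print(args.input, args.output)  # pcaps/capture.pcap pcaps/capture.flows
```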

Integration Example

Typical usage in analysis pipelines:
import os
import sys

from telemetry import flow_parser, feature_extractor

# Parse PCAP
pcap_path = 'pcaps/experiment.pcap'
flows = flow_parser.parse_pcap(pcap_path)

if not flows:
    print('No flows detected', file=sys.stderr)
    sys.exit(1)

# Save flows next to the PCAP, swapping only the final extension
# (.pcap or .pcapng -> .flows)
flows_path = os.path.splitext(pcap_path)[0] + '.flows'
flow_parser.save_flows(flows, flows_path)

# Extract features from flows
features = feature_extractor.extract_all(flows_path)

File Format Examples

Input: PCAP File

Standard tcpdump/Wireshark format (.pcap or .pcapng)

Output: JSON Lines (.flows)

{"src_ip":"192.168.56.102","dst_ip":"192.168.56.101","src_port":54320,"dst_port":443,"protocol":"TCP","start_time":1710163852.123456,"end_time":1710163854.456789,"duration_s":2.333333,"packet_count":15,"byte_count":8192,"inter_arrival_times":[0.1,0.2,0.15,0.3,0.05,0.12,0.18,0.25,0.08,0.14,0.22,0.11,0.19,0.13],"payload_sizes":[1400,1400,1400,1400,1400,832,0,0,0,0,0,0,0,0,0],"beacon_iats":[10.5]}
{"src_ip":"192.168.56.102","dst_ip":"192.168.56.101","src_port":54321,"dst_port":443,"protocol":"TCP","start_time":1710163862.623456,"end_time":1710163864.789012,"duration_s":2.165556,"packet_count":14,"byte_count":7845,"inter_arrival_times":[0.12,0.18,0.14,0.21,0.09,0.16,0.23,0.11,0.19,0.15,0.2,0.13,0.17],"payload_sizes":[1400,1400,1400,1400,1400,845,0,0,0,0,0,0,0,0],"beacon_iats":[11.2]}
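The FlowRecord definitions imply a few invariants that every .flows record should satisfy: payload_sizes has packet_count entries, inter_arrival_times has packet_count - 1, duration_s equals end_time - start_time, and beacon_iats holds at most one gap. `check_flow` below is a hypothetical validator for those rules, handy when generating or hand-editing .flows files.

```python
import json


def check_flow(record: dict, tol: float = 1e-5) -> list[str]:
    """Return invariant violations for one .flows record (empty list if consistent)."""
    problems = []
    n = record["packet_count"]
    if len(record["payload_sizes"]) != n:
        problems.append("payload_sizes length != packet_count")
    if len(record["inter_arrival_times"]) != max(n - 1, 0):
        problems.append("inter_arrival_times length != packet_count - 1")
    # tol absorbs float rounding on large Unix timestamps
    if abs((record["end_time"] - record["start_time"]) - record["duration_s"]) > tol:
        problems.append("duration_s != end_time - start_time")
    if len(record["beacon_iats"]) > 1:
        problems.append("beacon_iats holds more than one gap")
    return problems


record = json.loads(
    '{"packet_count": 3, "payload_sizes": [1400, 632, 0], '
    '"inter_arrival_times": [0.1, 0.2], "start_time": 10.0, '
    '"end_time": 10.3, "duration_s": 0.3, "beacon_iats": [10.5]}'
)
print(check_flow(record))  # []
```

To validate a whole file, call check_flow(json.loads(line)) for each line and report the line numbers that return a non-empty list.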

Flow Statistics

Access flow metrics programmatically:
from telemetry.flow_parser import parse_pcap

flows = parse_pcap('pcaps/capture.pcap')

# Filter HTTPS flows
https_flows = [f for f in flows if f.dst_port == 443 and f.protocol == 'TCP']

if https_flows:
    # Compute aggregate statistics (guarded: division and max() fail on an empty list)
    total_bytes = sum(f.byte_count for f in https_flows)
    avg_duration = sum(f.duration_s for f in https_flows) / len(https_flows)
    max_packets = max(f.packet_count for f in https_flows)

    print(f'HTTPS flows: {len(https_flows)}')
    print(f'Total bytes: {total_bytes}')
    print(f'Avg duration: {avg_duration:.2f}s')
    print(f'Max packets: {max_packets}')
else:
    print('No HTTPS flows found')

# Analyze beacon intervals
beacon_intervals = [iat for f in https_flows for iat in f.beacon_iats]
if beacon_intervals:
    avg_beacon = sum(beacon_intervals) / len(beacon_intervals)
    print(f'Mean beacon interval: {avg_beacon:.2f}s')
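The snippet above pools beacon intervals from every HTTPS destination into a single mean. A per-destination breakdown keeps each server's timing separate; a small standard deviation relative to the mean indicates that the client reconnected at a steady interval. `beacon_stats_by_destination` is a hypothetical helper, and SimpleNamespace objects stand in for FlowRecords in the usage.

```python
import statistics
from collections import defaultdict
from types import SimpleNamespace


def beacon_stats_by_destination(flows) -> dict[tuple[str, int], tuple[float, float, int]]:
    """Mean, population std dev, and count of beacon IATs per (dst_ip, dst_port)."""
    by_dst: dict[tuple[str, int], list[float]] = defaultdict(list)
    for f in flows:
        by_dst[(f.dst_ip, f.dst_port)].extend(f.beacon_iats)
    return {
        dst: (statistics.mean(iats), statistics.pstdev(iats), len(iats))
        for dst, iats in by_dst.items()
        if iats  # destinations without any beacon gap are skipped
    }


flows = [
    SimpleNamespace(dst_ip="192.168.56.101", dst_port=443, beacon_iats=[10.5]),
    SimpleNamespace(dst_ip="192.168.56.101", dst_port=443, beacon_iats=[10.7]),
    SimpleNamespace(dst_ip="192.168.56.101", dst_port=443, beacon_iats=[]),
]
for dst, (mean, std, n) in beacon_stats_by_destination(flows).items():
    print(dst, f"mean={mean:.2f}s std={std:.2f}s n={n}")
# ('192.168.56.101', 443) mean=10.60s std=0.10s n=2
```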

Protocol Mapping

Protocol numbers are mapped to names (telemetry/flow_parser.py:14):
_PROTO_MAP = {1: 'ICMP', 6: 'TCP', 17: 'UDP'}
Unknown protocols use their numeric value as a string (e.g., "50" for ESP).
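This lookup-with-fallback is a single dict.get call. `proto_name` is a hypothetical helper name used for illustration; flow_parser.py may inline the lookup differently.

```python
_PROTO_MAP = {1: "ICMP", 6: "TCP", 17: "UDP"}


def proto_name(proto_number: int) -> str:
    """Map an IP protocol number to a name, falling back to the number as a string."""
    return _PROTO_MAP.get(proto_number, str(proto_number))


print([proto_name(n) for n in (6, 17, 1, 50)])  # ['TCP', 'UDP', 'ICMP', '50']
```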

Performance Considerations

Memory Efficiency:
  • Uses Scapy’s PcapReader to stream packets (doesn’t load entire PCAP into RAM)
  • Suitable for analyzing multi-GB capture files
Processing Speed:
  • ~10,000 packets/second on typical hardware
  • 1 GB PCAP with 1M packets ≈ 100 seconds
Limitations:
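  • Stores all flows in memory (one FlowRecord per 5-tuple), and each record keeps per-packet inter_arrival_times and payload_sizes lists, so memory grows with total packet count as well as flow count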
  • Very large captures with millions of unique flows may require streaming processing

Logging

Flow parsing operations are logged:
from common.logger import get_logger
logger = get_logger('flow_parser')
Log Events:
  • parsing pcap: Logged at start (includes filename)
  • parse complete: Logged on success (includes flow count and packet count)
  • beacon iats computed: Logged after beacon IAT calculation (includes group count)
  • no flows detected: Warning if PCAP contains no IP traffic
  • flows saved: Logged after writing .flows file (includes path and count)

Troubleshooting

No flows detected:
  • Ensure PCAP contains IP packets (not just Layer 2 frames)
  • Check BPF filter used during capture
  • Verify PCAP is not corrupted: tcpdump -r capture.pcap -c 10
Empty beacon_iats:
  • Beacon IATs only computed for client-initiated flows (src_port > 1024, dst_port <= 1024)
  • Single-connection captures won’t have beacon intervals
  • Check that multiple connections to same destination were captured
FileNotFoundError:
  • Ensure PCAP path is correct
  • Use absolute paths or run from project root
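To diagnose empty beacon_iats, count how many client-initiated connections each destination has. A destination needs at least two to produce a gap, and services on ports above 1024 (for example 8080) never pass the filter. `beacon_eligibility` and `report` are hypothetical helpers; SimpleNamespace objects stand in for FlowRecords.

```python
from collections import Counter
from types import SimpleNamespace


def beacon_eligibility(flows) -> Counter:
    """Count client-initiated connections per (dst_ip, dst_port)."""
    return Counter(
        (f.dst_ip, f.dst_port)
        for f in flows
        if f.src_port > 1024 and f.dst_port <= 1024  # same filter as the beacon logic
    )


def report(flows) -> None:
    counts = beacon_eligibility(flows)
    if not counts:
        print("No client-initiated flows (src_port > 1024, dst_port <= 1024)")
    for (ip, port), n in counts.most_common():
        note = "" if n >= 2 else "  <- needs at least 2 connections for a beacon IAT"
        print(f"{ip}:{port}  connections={n}{note}")


flows = [
    SimpleNamespace(dst_ip="192.168.56.101", dst_port=443, src_port=54320),
    SimpleNamespace(dst_ip="192.168.56.101", dst_port=443, src_port=54321),
    SimpleNamespace(dst_ip="192.168.56.102", dst_port=54320, src_port=443),  # src_port <= 1024: excluded
    SimpleNamespace(dst_ip="192.168.56.101", dst_port=8080, src_port=54322),  # dst_port > 1024: excluded
    SimpleNamespace(dst_ip="192.168.56.103", dst_port=80, src_port=54323),
]
report(flows)
# 192.168.56.101:443  connections=2
# 192.168.56.103:80  connections=1  <- needs at least 2 connections for a beacon IAT
```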

Next Steps

After parsing flows:
  1. Extract features: Use Feature Extractor to compute ML features
  2. Visualize flows: Load .flows files into Jupyter notebooks for plotting
  3. Run experiments: See Experiments for automated analysis
