
Overview

The CAN reverse engineering pipeline implements a three-phase architecture that progressively analyzes raw CAN data to extract and cluster proprietary signals. The design is inspired by compiler theory, using preprocessing, lexical analysis, and semantic analysis phases.
# From Main.py - Pipeline phases
pre_processor = PreProcessor(can_data_filename, pickle_arb_id_filename, pickle_j1979_filename)
id_dictionary, j1979_dictionary = pre_processor.generate_arb_id_dictionary(...)

# Lexical Analysis
tokenize_dictionary(a_timer, id_dictionary, ...)
signal_dictionary = generate_signals(a_timer, id_dictionary, ...)

# Semantic Analysis
subset_df = subset_selection(a_timer, signal_dictionary, ...)
corr_matrix_subset = subset_correlation(subset_df, ...)
cluster_dict = greedy_signal_clustering(corr_matrix_subset, ...)

Phase 1: Pre-Processing

Purpose

Convert raw CAN logs into structured data and prepare for analysis.

Key Operations

1. Data Import

Read CAN log files and convert them to a pandas DataFrame.
# From PreProcessor.py
def import_csv(self, a_timer: PipelineTimer, filename):
    convert_dict = {'time': fix_time, 'id': hex2int, 'dlc': hex2int, 
                    'b0': hex2int, 'b1': hex2int, ...}
    self.data = read_csv(filename,
                         names=['time', 'id', 'dlc', 'b0', 'b1', 'b2', ...],
                         converters=convert_dict,
                         index_col=0)
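The fix_time and hex2int converters are referenced but not shown in this excerpt. A minimal sketch of what they might look like, assuming hex-encoded ID/byte fields and a timestamp that may be wrapped in parentheses (both helper bodies here are assumptions, not the PreProcessor.py originals):

```python
# Hypothetical converter implementations; the real fix_time/hex2int live in PreProcessor.py.
def hex2int(field: str) -> int:
    """Parse a hex-encoded CSV field such as '7df' (empty fields become 0)."""
    field = field.strip()
    if not field:
        return 0
    return int(field, 16)

def fix_time(field: str) -> float:
    """Parse the timestamp column, stripping any surrounding parentheses."""
    return float(field.strip().strip('()'))
```

read_csv applies each converter to its column as the file is parsed, so the DataFrame already holds integers and floats.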

2. Arb ID Dictionary Creation

Group messages by Arbitration ID and create ArbID runtime objects.
for arb_id in Series.unique(self.data['id']):
    this_id = ArbID(arb_id)
    this_id.original_data = self.data.loc[self.data['id'] == arb_id].copy()
    this_id.dlc = this_id.original_data['dlc'].iloc[0]
    id_dictionary[arb_id] = this_id

3. J1979 Detection

Identify and extract standardized diagnostic messages (Arb IDs 0x7DF and 0x7E8).
J1979 signals serve as ground truth for validating the pipeline’s correlation-based clustering in later phases.
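The extraction itself is not shown; a sketch of how the split could be performed on the imported DataFrame (split_j1979 is a hypothetical helper, not the pipeline's actual function):

```python
import pandas as pd

J1979_REQUEST_ID = 0x7DF   # functional diagnostic request
J1979_RESPONSE_ID = 0x7E8  # ECU #1 response

def split_j1979(data: pd.DataFrame):
    """Separate diagnostic traffic from proprietary traffic by Arb ID."""
    diag_mask = data['id'].isin([J1979_REQUEST_ID, J1979_RESPONSE_ID])
    return data[diag_mask].copy(), data[~diag_mask].copy()
```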

4. Binary Matrix Generation

Convert hexadecimal payload bytes to binary bit matrix.
# From ArbID.py
self.boolean_matrix = zeros((self.original_data.__len__(), self.dlc * 8), dtype=uint8)
for i, row in enumerate(self.original_data.itertuples()):
    for j, cell in enumerate(row[1:]):
        if cell > 0:
            bin_string = format(cell, '08b')
            self.boolean_matrix[i, j * 8:j * 8 + 8] = [x == '1' for x in bin_string]
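For a single payload byte, the expansion looks like this:

```python
import numpy as np

cell = 0xA3                       # payload byte 1010_0011
bin_string = format(cell, '08b')  # '10100011'
row = np.array([x == '1' for x in bin_string], dtype=np.uint8)
print(row)  # [1 0 1 0 0 0 1 1]
```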

5. TANG Calculation

Compute Transition Analysis Numerical Gradient for each bit position.
# From ArbID.py
transition_matrix = logical_xor(self.boolean_matrix[:-1, ], self.boolean_matrix[1:, ])
self.tang = sum(transition_matrix, axis=0, dtype=float64)
if max(self.tang) > 0:
    normalize_strategy(self.tang, axis=0, copy=False)
    self.static = False
TANG measures how frequently each bit position changes over time. High TANG values indicate active signal data, while zero values indicate padding or static bits.
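The same computation on a toy matrix of four frames:

```python
import numpy as np

# Four frames of a 4-bit slice: bits 1 and 3 toggle every frame, bits 0 and 2 never change.
boolean_matrix = np.array([[0, 1, 0, 0],
                           [0, 0, 0, 1],
                           [0, 1, 0, 0],
                           [0, 0, 0, 1]], dtype=np.uint8)
transition_matrix = np.logical_xor(boolean_matrix[:-1], boolean_matrix[1:])
tang = np.sum(transition_matrix, axis=0, dtype=np.float64)
# tang is [0., 3., 0., 3.]: zero for static/padding bits, maximal for bits that flip each frame
```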

6. Frequency Analysis

Analyze transmission timing to classify Arb IDs as synchronous or asynchronous.
# From ArbID.py
freq_intervals = self.original_data.index[1:] - self.original_data.index[:-1]
self.freq_mean = mean(freq_intervals) * time_convert
self.freq_std = std(freq_intervals, ddof=1) * time_convert
mean_offset = ci_accuracy * self.freq_std / sqrt(len(freq_intervals))
self.freq_ci = (self.freq_mean - mean_offset, self.freq_mean + mean_offset)
self.mean_to_ci_ratio = 2*mean_offset/self.freq_mean
if self.mean_to_ci_ratio <= synchronous_threshold:
    self.synchronous = True
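A worked example of these statistics, assuming time_convert = 1000 (seconds to milliseconds), ci_accuracy = 1.96 (a ~95% interval), and a hypothetical synchronous_threshold of 0.1:

```python
import numpy as np

# Toy trace: a frame transmitted every ~10 ms (timestamps in seconds).
timestamps = np.array([0.0000, 0.0100, 0.0201, 0.0299, 0.0400, 0.0500])
freq_intervals = timestamps[1:] - timestamps[:-1]
freq_mean = np.mean(freq_intervals) * 1000.0                  # ms
freq_std = np.std(freq_intervals, ddof=1) * 1000.0            # ms
mean_offset = 1.96 * freq_std / np.sqrt(len(freq_intervals))  # ~95% CI half-width
mean_to_ci_ratio = 2 * mean_offset / freq_mean
synchronous = mean_to_ci_ratio <= 0.1  # tight CI relative to the mean -> periodic sender
```

Because the jitter here is small relative to the 10 ms period, the confidence interval is narrow and the Arb ID is classified as synchronous.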

Output

  • id_dictionary: Dictionary of ArbID objects with binary matrices and TANG values
  • j1979_dictionary: Dictionary of known diagnostic signals

Phase 2: Lexical Analysis

Purpose

Identify continuous signals within CAN payloads by analyzing bit-level transition patterns.

Key Operations

1. Tokenization

Group adjacent bit positions into tokens (potential signals) based on TANG patterns.
# From LexicalAnalysis.py
def get_composition(arb_id: ArbID, include_padding=False, max_inversion_distance: float = 0.0):
    tokens = []
    start_index = 0
    currently_clustering = False
    big_endian = True
    
    for i, bit_position in enumerate(nditer(arb_id.tang)):
        # Greedy clustering logic to identify contiguous signals
        if bit_position <= 0.000001:
            arb_id.padding.append(i)  # Mark as padding
        # ... clustering logic ...
    
    arb_id.tokenization = tokens
The algorithm detects both big-endian and little-endian signals by analyzing whether TANG values increase or decrease across adjacent bit positions.

2. Token Merging

Merge adjacent tokens with similar transition frequencies.
# From LexicalAnalysis.py
def merge_tokens(arb_id: ArbID, max_distance):
    last = None
    for i, token in enumerate(arb_id.tokenization):
        if last and last[1] + 1 == token[0]:  # Adjacent tokens?
            if abs(arb_id.tang[last[1]] - arb_id.tang[token[0]]) <= max_distance:
                # Merge into single token
                token = (last[0], token[1])
                arb_id.tokenization[i] = token
        last = token  # Track the previous token for the adjacency check

3. Signal Generation

Convert tokens into Signal objects with time series data.
# From LexicalAnalysis.py
def generate_signals(a_timer, arb_id_dict, signal_pickle_filename, normalize_strategy, force=False):
    signal_dict = {}
    for k, arb_id in arb_id_dict.items():
        if not arb_id.static:
            signal_dict[k] = {}  # Sub-dictionary of signals for this Arb ID
            for token in arb_id.tokenization:
                signal = Signal(k, token[0], token[1])
                # Convert binary matrix to integer time series
                temp1 = [''.join(str(x) for x in row)
                         for row in arb_id.boolean_matrix[:, token[0]:token[1] + 1]]
                temp2 = [int(row, 2) for row in temp1]
                signal.time_series = Series(temp2, index=arb_id.original_data.index)
                signal.normalize_and_set_metadata(normalize_strategy)
                signal_dict[k][(arb_id.id, signal.start_index, signal.stop_index)] = signal
    return signal_dict
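For one token, the temp1/temp2 conversion collapses each frame's bits into a single integer sample:

```python
import numpy as np

# A token spanning two bit positions, e.g. token = (6, 7), over three frames:
token_bits = np.array([[0, 1],
                       [1, 0],
                       [1, 1]], dtype=np.uint8)
temp1 = [''.join(str(x) for x in row) for row in token_bits]  # ['01', '10', '11']
temp2 = [int(row, 2) for row in temp1]                        # [1, 2, 3]
```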

4. Shannon Index Calculation

Measure signal entropy to identify dynamic signals.
# From Signal.py
def set_shannon_index(self):
    si: float = 0.0
    n: int = self.time_series.__len__()
    for count in self.time_series.value_counts():
        p_i = count / n
        si += p_i * log10(p_i)
    si *= -1
    self.shannon_index = si
The Shannon Index quantifies signal variability. Higher values indicate more dynamic signals with diverse values, while lower values suggest static or slowly-changing signals.
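The same computation as a standalone helper (shannon_index is a hypothetical name used here for illustration):

```python
from math import log10

import pandas as pd

# Standalone version of the Signal.set_shannon_index logic shown above.
def shannon_index(series: pd.Series) -> float:
    n = len(series)
    return -sum((count / n) * log10(count / n) for count in series.value_counts())
```

A constant series scores 0, while a series uniform over k distinct values scores log10(k), the maximum for that many values.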

Output

  • signal_dictionary: Dictionary of Signal objects with normalized time series and Shannon Index values

Phase 3: Semantic Analysis

Purpose

Group related signals using correlation-based clustering to identify signals that represent the same physical phenomena.

Key Operations

1. Subset Selection

Select top 25% of signals with highest Shannon Index for initial clustering.
# From SemanticAnalysis.py
def subset_selection(a_timer, signal_dict, subset_pickle, force, subset_size=0.25):
    df.sort_values(by="Shannon_Index", inplace=True, ascending=False)
    df = df.head(int(round(df.__len__() * subset_size, 0)))
    # Create DataFrame with aligned time indices
    for signal in subset:
        subset_df[signal_id] = signal.time_series.reindex(index=largest_index, 
                                                           method='nearest')
    return subset_df
Subset selection reduces computational complexity while focusing on the most informative signals.

2. Correlation Matrix

Calculate Pearson correlation coefficient between all signal pairs.
corr_matrix_subset = subset.corr()
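For example, two signals that encode the same phenomenon at different scales correlate perfectly, while an unrelated signal does not:

```python
import pandas as pd

# Toy subset: sig_b is a rescaled copy of sig_a; sig_c is unrelated.
subset = pd.DataFrame({
    'sig_a': [0, 1, 2, 3, 4],
    'sig_b': [0, 2, 4, 6, 8],
    'sig_c': [4, 1, 3, 0, 2],
})
corr_matrix_subset = subset.corr()  # Pearson by default
```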

3. Greedy Clustering

Group signals whose pairwise correlation meets a threshold (default: 0.8).
# From SemanticAnalysis.py
def greedy_signal_clustering(correlation_matrix, correlation_threshold=0.8, fuzzy_labeling=True):
    cluster_dict = {}
    previously_clustered_signals = {}
    
    for n, row in enumerate(correlation_keys):
        for m, col in enumerate(correlation_keys):
            result = round(correlation_matrix.iloc[n, m], 2)
            if result >= correlation_threshold:
                # Clustering logic: create new clusters or add to existing ones
                # Supports fuzzy labeling (signals in multiple clusters)
                ...
    
    return cluster_dict
Fuzzy labeling allows signals to belong to multiple clusters, capturing cases where a signal correlates with multiple physical phenomena.
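The elided logic can be sketched as a simplified, non-fuzzy variant with the same interface; this is an illustration, not the pipeline's exact implementation:

```python
import pandas as pd

# Simplified greedy clustering sketch (omits the fuzzy-labeling path).
def greedy_signal_clustering(correlation_matrix: pd.DataFrame, correlation_threshold=0.8):
    cluster_dict = {}
    previously_clustered = {}
    next_cluster = 0
    keys = list(correlation_matrix.columns)
    for n, row in enumerate(keys):
        for m, col in enumerate(keys):
            if n == m:
                continue
            if round(correlation_matrix.iloc[n, m], 2) >= correlation_threshold:
                # Reuse an existing cluster label if either signal already has one.
                if row in previously_clustered:
                    label = previously_clustered[row]
                elif col in previously_clustered:
                    label = previously_clustered[col]
                else:
                    label = next_cluster
                    next_cluster += 1
                    cluster_dict[label] = []
                for sig in (row, col):
                    if sig not in cluster_dict[label]:
                        cluster_dict[label].append(sig)
                        previously_clustered[sig] = label
    return cluster_dict
```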

4. Label Propagation

Extend cluster labels from subset to all signals.
# From SemanticAnalysis.py
def label_propagation(a_timer, signal_dict, cluster_dict, correlation_threshold, force):
    # Calculate correlation matrix for ALL non-static signals
    correlation_matrix = df.corr()
    
    # Propagate labels without creating new clusters
    for n, row in enumerate(correlation_keys):
        for m, col in enumerate(correlation_keys):
            if result >= correlation_threshold:
                if row in previously_clustered_signals.keys():
                    cluster_dict[previously_clustered_signals[row]].append(col)
                    previously_clustered_signals[col] = previously_clustered_signals[row]
    
    return df, correlation_matrix, cluster_dict

5. J1979 Labeling

Correlate proprietary signals with known J1979 diagnostic signals.
# From SemanticAnalysis.py
def j1979_signal_labeling(a_timer, j1979_corr_filename, df_signals, 
                          j1979_dict, signal_dict, correlation_threshold, force):
    # Combine proprietary signals with J1979 signals
    df_combined = concat([df_signals, df_j1979], axis=1)
    correlation_matrix = df_combined.corr()
    
    # Label proprietary signals with correlated J1979 PIDs
    for index, row in correlation_matrix[df_columns][:-len(df_columns)].iterrows():
        max_index = row.idxmax(axis=1, skipna=True)
        if row[max_index] >= correlation_threshold:
            signal.j1979_title = max_index
            signal.j1979_pcc = row[max_index]
Signals that correlate highly with J1979 data (e.g., r > 0.85) can be automatically labeled with human-readable names like “Engine RPM” or “Vehicle Speed”.
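The labeling step reduces to picking the strongest correlation in each row, for example:

```python
import pandas as pd

# One row of the combined correlation matrix: a proprietary signal vs. J1979 PIDs
# (values are illustrative).
row = pd.Series({'Engine RPM': 0.97, 'Vehicle Speed': 0.40, 'Throttle Position': 0.55})
max_index = row.idxmax(skipna=True)
if row[max_index] >= 0.85:
    j1979_title, j1979_pcc = max_index, row[max_index]  # ('Engine RPM', 0.97)
```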

Output

  • cluster_dict: Dictionary mapping cluster IDs to lists of signal IDs
  • corr_matrix_full: Complete correlation matrix for all signals
  • Labeled signals: Signals with J1979 correlations identified

Data Flow Diagram

Raw CAN Log (.log file)
        |
        v
[Pre-Processing]
   - CSV Import
   - Arb ID Grouping
   - J1979 Detection
   - Binary Matrix
   - TANG Calculation
   - Frequency Analysis
        |
        v
   ArbID Dictionary
        |
        v
[Lexical Analysis]
   - Tokenization
   - Token Merging
   - Signal Generation
   - Shannon Index
        |
        v
   Signal Dictionary
        |
        v
[Semantic Analysis]
   - Subset Selection
   - Correlation Matrix
   - Greedy Clustering
   - Label Propagation
   - J1979 Labeling
        |
        v
Clustered Signals + Labels

Runtime Considerations

Caching Strategy

The pipeline uses pickle files to cache intermediate results:
# From Main.py
pickle_arb_id_filename = 'pickleArbIDs.p'
pickle_signal_filename = 'pickleSignals.p'
pickle_clusters_filename = 'pickleClusters.p'

# Skip expensive recomputation if cached
if path.isfile(pickle_arb_id_filename) and not force:
    return load(open(pickle_arb_id_filename, "rb"))
Pass force=True to recompute a specific phase when testing parameter changes.
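That load-or-compute pattern can be factored into a small helper (cached is a hypothetical function, shown for illustration):

```python
from os import path
from pickle import dump, load

# Hypothetical helper capturing the load-or-compute pattern from Main.py.
def cached(pickle_filename, compute, force=False):
    """Return the pickled result if it exists; otherwise compute, cache, and return it."""
    if path.isfile(pickle_filename) and not force:
        with open(pickle_filename, "rb") as f:
            return load(f)
    result = compute()
    with open(pickle_filename, "wb") as f:
        dump(result, f)
    return result
```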

Performance Tracking

The PipelineTimer class tracks execution time for each phase:
a_timer = PipelineTimer(verbose=True)
a_timer.start_function_time()
# ... processing ...
a_timer.set_arb_id_creation()
