Overview
The CAN reverse engineering pipeline implements a three-phase architecture that progressively analyzes raw CAN data to extract and cluster proprietary signals. The design is inspired by compiler theory, using preprocessing, lexical analysis, and semantic analysis phases.
# From Main.py - Pipeline phases
# Pre-Processing
pre_processor = PreProcessor(can_data_filename, pickle_arb_id_filename, pickle_j1979_filename)
id_dictionary, j1979_dictionary = pre_processor.generate_arb_id_dictionary(...)
# Lexical Analysis
tokenize_dictionary(a_timer, id_dictionary, ...)
signal_dictionary = generate_signals(a_timer, id_dictionary, ...)
# Semantic Analysis
subset_df = subset_selection(a_timer, signal_dictionary, ...)
corr_matrix_subset = subset_correlation(subset_df, ...)
cluster_dict = greedy_signal_clustering(corr_matrix_subset, ...)
Phase 1: Pre-Processing
Purpose
Convert raw CAN logs into structured data and prepare for analysis.
Key Operations
Data Import
Read CAN log files and convert them to a Pandas DataFrame.
# From PreProcessor.py
def import_csv(self, a_timer: PipelineTimer, filename):
    convert_dict = {'time': fix_time, 'id': hex2int, 'dlc': hex2int,
                    'b0': hex2int, 'b1': hex2int, ...}
    self.data = read_csv(filename,
                         names=['time', 'id', 'dlc', 'b0', 'b1', 'b2', ...],
                         converters=convert_dict,
                         index_col=0)
Arb ID Dictionary Creation
Group messages by Arbitration ID and create ArbID runtime objects.
# From PreProcessor.py
for arb_id in Series.unique(self.data['id']):
    this_id = ArbID(arb_id)
    this_id.original_data = self.data.loc[self.data['id'] == arb_id].copy()
    this_id.dlc = this_id.original_data['dlc'].iloc[0]
    id_dictionary[arb_id] = this_id
J1979 Detection
Identify and extract standardized diagnostic messages (Arb IDs 0x7DF and 0x7E8). J1979 signals serve as ground truth for validating the pipeline’s correlation-based clustering in later phases.
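As an illustration, J1979 traffic can be separated from proprietary traffic with a simple ID filter. This is a minimal sketch on hypothetical data, assuming the integer `id` column produced by `import_csv`:

```python
import pandas as pd

# Hypothetical miniature log: 'id' as integers, as produced by hex2int.
data = pd.DataFrame({
    'id':  [0x7DF, 0x123, 0x7E8, 0x123, 0x7E8],
    'dlc': [8, 8, 8, 8, 8],
    'b0':  [0x02, 0x11, 0x04, 0x22, 0x04],
})

# J1979 uses 0x7DF for broadcast requests and 0x7E8 for the primary ECU's responses.
j1979_request  = data[data['id'] == 0x7DF]
j1979_response = data[data['id'] == 0x7E8]
proprietary    = data[~data['id'].isin([0x7DF, 0x7E8])]

print(len(j1979_request), len(j1979_response), len(proprietary))  # 1 2 2
```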
Binary Matrix Generation
Convert hexadecimal payload bytes to a binary bit matrix.
# From ArbID.py
self.boolean_matrix = zeros((self.original_data.__len__(), self.dlc * 8), dtype=uint8)
for i, row in enumerate(self.original_data.itertuples()):
    for j, cell in enumerate(row[1:]):
        if cell > 0:
            bin_string = format(cell, '08b')
            self.boolean_matrix[i, j * 8:j * 8 + 8] = [x == '1' for x in bin_string]
TANG Calculation
Compute the Transition Analysis Numerical Gradient (TANG) for each bit position.
# From ArbID.py
transition_matrix = logical_xor(self.boolean_matrix[:-1, ], self.boolean_matrix[1:, ])
self.tang = sum(transition_matrix, axis=0, dtype=float64)
if max(self.tang) > 0:
    normalize_strategy(self.tang, axis=0, copy=False)
    self.static = False
TANG measures how frequently each bit position changes over time. High TANG values indicate active signal data, while zero values indicate padding or static bits.
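A toy example of the TANG computation on a small, hypothetical boolean matrix, using the same XOR-and-sum approach as ArbID.py (normalization omitted):

```python
import numpy as np

# Toy boolean matrix: 4 frames x 8 bits. Bit 0 toggles often,
# bits 3-7 never change (padding or static data).
boolean_matrix = np.array([
    [1, 0, 0, 0, 0, 0, 0, 0],
    [0, 1, 0, 0, 0, 0, 0, 0],
    [1, 1, 1, 0, 0, 0, 0, 0],
    [0, 0, 1, 0, 0, 0, 0, 0],
], dtype=np.uint8)

# XOR of consecutive rows marks bit flips between frames;
# summing down each column counts transitions per bit position.
transitions = np.logical_xor(boolean_matrix[:-1], boolean_matrix[1:])
tang = transitions.sum(axis=0, dtype=np.float64)
print(tang)  # bit 0 flipped 3 times, bits 3-7 never
```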
Frequency Analysis
Analyze transmission timing to classify Arb IDs as synchronous or asynchronous.
# From ArbID.py
freq_intervals = self.original_data.index[1:] - self.original_data.index[:-1]
self.freq_mean = mean(freq_intervals) * time_convert
self.freq_std = std(freq_intervals, ddof=1) * time_convert
mean_offset = ci_accuracy * self.freq_std / sqrt(len(freq_intervals))
self.freq_ci = (self.freq_mean - mean_offset, self.freq_mean + mean_offset)
self.mean_to_ci_ratio = 2 * mean_offset / self.freq_mean
if self.mean_to_ci_ratio <= synchronous_threshold:
    self.synchronous = True
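The classification can be tried on hypothetical timestamps; the `ci_accuracy` and `synchronous_threshold` values below are illustrative assumptions, not the pipeline's defaults:

```python
import numpy as np

# Hypothetical timestamps (seconds): one frame every ~10 ms with slight jitter.
timestamps = np.array([0.000, 0.010, 0.020, 0.0301, 0.040, 0.0499])
intervals = np.diff(timestamps)

ci_accuracy = 1.96           # ~95% confidence, assuming normally distributed jitter
synchronous_threshold = 0.1  # illustrative cutoff on CI width relative to the mean

freq_mean = intervals.mean()
freq_std = intervals.std(ddof=1)
mean_offset = ci_accuracy * freq_std / np.sqrt(len(intervals))
mean_to_ci_ratio = 2 * mean_offset / freq_mean

print(mean_to_ci_ratio <= synchronous_threshold)  # True: regular timing -> synchronous
```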
Output
id_dictionary: Dictionary of ArbID objects with binary matrices and TANG values
j1979_dictionary: Dictionary of known diagnostic signals
Phase 2: Lexical Analysis
Purpose
Identify continuous signals within CAN payloads by analyzing bit-level transition patterns.
Key Operations
Tokenization
Group adjacent bit positions into tokens (potential signals) based on TANG patterns.
# From LexicalAnalysis.py
def get_composition(arb_id: ArbID, include_padding=False, max_inversion_distance: float = 0.0):
    tokens = []
    start_index = 0
    currently_clustering = False
    big_endian = True
    for i, bit_position in enumerate(nditer(arb_id.tang)):
        # Greedy clustering logic to identify contiguous signals
        if bit_position <= 0.000001:
            arb_id.padding.append(i)  # Mark as padding
        # ... clustering logic ...
    arb_id.tokenization = tokens
The algorithm detects both big-endian and little-endian signals by analyzing whether TANG values increase or decrease across adjacent bit positions.
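A simplified, self-contained sketch of TANG-based tokenization: it only splits on near-zero TANG positions and omits the endianness detection and inversion-distance logic of get_composition:

```python
import numpy as np

def tokenize(tang, eps=1e-6):
    """Group contiguous non-zero TANG positions into (start, stop) tokens;
    near-zero positions are treated as padding between signals."""
    tokens, start = [], None
    for i, v in enumerate(tang):
        if v > eps and start is None:
            start = i                      # open a new token
        elif v <= eps and start is not None:
            tokens.append((start, i - 1))  # close the current token
            start = None
    if start is not None:
        tokens.append((start, len(tang) - 1))
    return tokens

# Two active bit fields separated by static padding bits.
tang = np.array([0.4, 0.6, 0.2, 0.0, 0.0, 0.1, 0.3, 0.0])
print(tokenize(tang))  # [(0, 2), (5, 6)]
```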
Token Merging
Merge adjacent tokens with similar transition frequencies.
# From LexicalAnalysis.py
def merge_tokens(arb_id: ArbID, max_distance):
    last = None
    for i, token in enumerate(arb_id.tokenization):
        if last and last[1] + 1 == token[0]:  # Adjacent tokens?
            if abs(arb_id.tang[last[1]] - arb_id.tang[token[0]]) <= max_distance:
                # Merge into a single token
                token = (last[0], token[1])
                arb_id.tokenization[i] = token
        last = token
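The merging pass can be sketched as a standalone function; `tokens`, `tang`, and `max_distance` here stand in for the ArbID fields used above:

```python
def merge_tokens(tokens, tang, max_distance):
    """Merge adjacent tokens whose boundary TANG values differ by at most
    max_distance. A simplified stand-in for the pipeline's in-place version."""
    merged = []
    for token in tokens:
        if merged:
            last = merged[-1]
            # Adjacent tokens with similar boundary transition frequencies?
            if last[1] + 1 == token[0] and abs(tang[last[1]] - tang[token[0]]) <= max_distance:
                merged[-1] = (last[0], token[1])  # extend the previous token
                continue
        merged.append(token)
    return merged

tang = [0.5, 0.5, 0.45, 0.44, 0.1, 0.1]
tokens = [(0, 1), (2, 3), (4, 5)]
print(merge_tokens(tokens, tang, max_distance=0.1))  # [(0, 3), (4, 5)]
```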
Signal Generation
Convert tokens into Signal objects with time-series data.
# From LexicalAnalysis.py
def generate_signals(a_timer, arb_id_dict, signal_pickle_filename, normalize_strategy, force=False):
    signal_dict = {}
    for k, arb_id in arb_id_dict.items():
        if not arb_id.static:
            for token in arb_id.tokenization:
                signal = Signal(k, token[0], token[1])
                # Convert binary matrix to integer time series
                temp1 = [''.join(str(x) for x in row)
                         for row in arb_id.boolean_matrix[:, token[0]:token[1] + 1]]
                temp2 = [int(row, 2) for row in temp1]
                signal.time_series = Series(temp2, index=arb_id.original_data.index)
                signal.normalize_and_set_metadata(normalize_strategy)
                signal_dict.setdefault(k, {})[(arb_id.id, signal.start_index, signal.stop_index)] = signal
    return signal_dict
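The bit-slice-to-integer conversion at the heart of generate_signals, shown on a hypothetical three-frame matrix:

```python
import numpy as np
import pandas as pd

# Columns 2..4 of a toy boolean matrix form one token (a 3-bit field).
boolean_matrix = np.array([
    [0, 0, 1, 0, 1],
    [0, 0, 1, 1, 0],
    [0, 0, 1, 1, 1],
], dtype=np.uint8)
start, stop = 2, 4

# Join each frame's token bits into a string and parse it as a base-2 integer.
bit_strings = [''.join(str(x) for x in row)
               for row in boolean_matrix[:, start:stop + 1]]
values = [int(s, 2) for s in bit_strings]
time_series = pd.Series(values, index=[0.00, 0.01, 0.02])
print(time_series.tolist())  # [5, 6, 7]
```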
Shannon Index Calculation
Measure signal entropy to identify dynamic signals.
# From Signal.py
def set_shannon_index(self):
    si: float = 0.0
    n: int = self.time_series.__len__()
    for count in self.time_series.value_counts():
        p_i = count / n
        si += p_i * log10(p_i)
    si *= -1
    self.shannon_index = si
The Shannon Index quantifies signal variability. Higher values indicate more dynamic signals with diverse values, while lower values suggest static or slowly-changing signals.
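A worked example of the same base-10 Shannon Index on two hypothetical value series:

```python
from math import log10

def shannon_index(values):
    """Shannon index over the value histogram, base-10 log as in Signal.py."""
    n = len(values)
    counts = {}
    for v in values:
        counts[v] = counts.get(v, 0) + 1
    return -sum((c / n) * log10(c / n) for c in counts.values())

dynamic = [1, 2, 3, 4]  # all values distinct: maximal diversity for n=4
static = [7, 7, 7, 7]   # one distinct value: no information

print(shannon_index(dynamic))       # ≈ 0.602 (= log10(4))
print(shannon_index(static) == 0)   # True: a constant signal has zero entropy
```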
Output
signal_dictionary: Dictionary of Signal objects with normalized time series and Shannon Index values
Phase 3: Semantic Analysis
Purpose
Group related signals using correlation-based clustering to identify signals that represent the same physical phenomena.
Key Operations
Subset Selection
Select the top 25% of signals by Shannon Index for initial clustering.
# From SemanticAnalysis.py
def subset_selection(a_timer, signal_dict, subset_pickle, force, subset_size=0.25):
    # df holds one row per signal with its Shannon Index
    df.sort_values(by="Shannon_Index", inplace=True, ascending=False)
    df = df.head(int(round(df.__len__() * subset_size, 0)))
    # Create DataFrame with aligned time indices
    for signal in subset:
        subset_df[signal_id] = signal.time_series.reindex(index=largest_index,
                                                          method='nearest')
    return subset_df
Subset selection reduces computational complexity while focusing on the most informative signals.
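A minimal sketch of the top-25% selection on hypothetical Shannon Index values (the column names are assumptions):

```python
import pandas as pd

# Hypothetical signal inventory with Shannon Index values.
df = pd.DataFrame({
    'Signal_ID': ['s1', 's2', 's3', 's4', 's5', 's6', 's7', 's8'],
    'Shannon_Index': [0.9, 0.1, 0.7, 0.3, 0.8, 0.2, 0.6, 0.4],
})

subset_size = 0.25
df = df.sort_values(by='Shannon_Index', ascending=False)
subset = df.head(int(round(len(df) * subset_size)))  # keep the top quarter
print(subset['Signal_ID'].tolist())  # ['s1', 's5']
```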
Correlation Matrix
Calculate the Pearson correlation coefficient between all signal pairs.
corr_matrix_subset = subset.corr()
Greedy Clustering
Group signals whose pairwise correlation meets a threshold (the function default is 0.8).
# From SemanticAnalysis.py
def greedy_signal_clustering(correlation_matrix, correlation_threshold=0.8, fuzzy_labeling=True):
    cluster_dict = {}
    previously_clustered_signals = {}
    for n, row in enumerate(correlation_keys):
        for m, col in enumerate(correlation_keys):
            result = round(correlation_matrix.iloc[n, m], 2)
            if result >= correlation_threshold:
                # Clustering logic: create new clusters or add to existing ones
                # Supports fuzzy labeling (signals in multiple clusters)
                ...
    return cluster_dict
Fuzzy labeling allows signals to belong to multiple clusters, capturing cases where a signal correlates with multiple physical phenomena.
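A self-contained sketch of greedy correlation clustering; it captures the create-or-join idea but omits the pipeline's fuzzy-labeling bookkeeping:

```python
import pandas as pd

def greedy_signal_clustering(corr, threshold=0.8):
    """Walk the correlation matrix; each high-correlation pair either
    joins the row signal's existing cluster or starts a new one."""
    clusters, seen = {}, {}
    keys = list(corr.columns)
    for n, row in enumerate(keys):
        for m, col in enumerate(keys):
            if n == m or round(corr.iloc[n, m], 2) < threshold:
                continue
            if row in seen:                      # extend an existing cluster
                cid = seen[row]
                if col not in clusters[cid]:
                    clusters[cid].append(col)
                seen.setdefault(col, cid)
            else:                                # start a new cluster
                cid = len(clusters)
                clusters[cid] = [row, col]
                seen[row] = seen[col] = cid
    return clusters

corr = pd.DataFrame(
    [[1.0, 0.9, 0.1], [0.9, 1.0, 0.2], [0.1, 0.2, 1.0]],
    index=['a', 'b', 'c'], columns=['a', 'b', 'c'])
print(greedy_signal_clustering(corr))  # {0: ['a', 'b']}
```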
Label Propagation
Extend cluster labels from the subset to all signals.
# From SemanticAnalysis.py
def label_propagation(a_timer, signal_dict, cluster_dict, correlation_threshold, force):
    # Calculate correlation matrix for ALL non-static signals
    correlation_matrix = df.corr()
    # Propagate labels without creating new clusters
    for n, row in enumerate(correlation_keys):
        for m, col in enumerate(correlation_keys):
            result = round(correlation_matrix.iloc[n, m], 2)
            if result >= correlation_threshold:
                if row in previously_clustered_signals.keys():
                    cluster_dict[previously_clustered_signals[row]].append(col)
                    previously_clustered_signals[col] = previously_clustered_signals[row]
    return df, correlation_matrix, cluster_dict
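The propagation step can be sketched as follows: signals left out of the subset join the cluster of the first already-labeled signal they correlate with above threshold, and no new clusters are created (names and values are hypothetical):

```python
import pandas as pd

def propagate_labels(corr, clusters, threshold=0.8):
    """Attach unclustered signals to an existing cluster when they correlate
    above threshold with one of its members; never create new clusters."""
    membership = {sig: cid for cid, sigs in clusters.items() for sig in sigs}
    for col in corr.columns:
        if col in membership:
            continue                          # already labeled in the subset phase
        for row in corr.index:
            if row in membership and corr.loc[row, col] >= threshold:
                clusters[membership[row]].append(col)
                membership[col] = membership[row]
                break
    return clusters

corr = pd.DataFrame(
    [[1.0, 0.9, 0.85], [0.9, 1.0, 0.6], [0.85, 0.6, 1.0]],
    index=['a', 'b', 'd'], columns=['a', 'b', 'd'])
clusters = {0: ['a', 'b']}  # from the subset phase; 'd' was not in the subset
print(propagate_labels(corr, clusters))  # {0: ['a', 'b', 'd']}
```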
J1979 Labeling
Correlate proprietary signals with known J1979 diagnostic signals.
# From SemanticAnalysis.py
def j1979_signal_labeling(a_timer, j1979_corr_filename, df_signals,
                          j1979_dict, signal_dict, correlation_threshold, force):
    # Combine proprietary signals with J1979 signals
    df_combined = concat([df_signals, df_j1979], axis=1)
    correlation_matrix = df_combined.corr()
    # Label proprietary signals with correlated J1979 PIDs
    for index, row in correlation_matrix[df_columns][:-len(df_columns)].iterrows():
        max_index = row.idxmax(skipna=True)
        if row[max_index] >= correlation_threshold:
            signal.j1979_title = max_index
            signal.j1979_pcc = row[max_index]
Signals that correlate highly with J1979 data (e.g., r > 0.85) can be automatically labeled with human-readable names like “Engine RPM” or “Vehicle Speed”.
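A minimal sketch of the labeling rule, using hypothetical signal names and correlation values:

```python
import pandas as pd

# Correlations between two proprietary signals (rows) and two J1979 PIDs (columns).
corr = pd.DataFrame(
    [[0.97, 0.12], [0.40, 0.30]],
    index=['sig_0x110_0_15', 'sig_0x220_8_15'],
    columns=['Engine RPM', 'Vehicle Speed'])

threshold = 0.85
labels = {}
for sig, row in corr.iterrows():
    best = row.idxmax(skipna=True)        # J1979 PID with the highest PCC
    if row[best] >= threshold:
        labels[sig] = (best, float(row[best]))

print(labels)  # {'sig_0x110_0_15': ('Engine RPM', 0.97)}
```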
Output
cluster_dict: Dictionary mapping cluster IDs to lists of signal IDs
corr_matrix_full: Complete correlation matrix for all signals
Labeled signals: Signals with J1979 correlations identified
Data Flow Diagram
Raw CAN Log (.log file)
|
v
[Pre-Processing]
- CSV Import
- Arb ID Grouping
- J1979 Detection
- Binary Matrix
- TANG Calculation
- Frequency Analysis
|
v
ArbID Dictionary
|
v
[Lexical Analysis]
- Tokenization
- Token Merging
- Signal Generation
- Shannon Index
|
v
Signal Dictionary
|
v
[Semantic Analysis]
- Subset Selection
- Correlation Matrix
- Greedy Clustering
- Label Propagation
- J1979 Labeling
|
v
Clustered Signals + Labels
Runtime Considerations
Caching Strategy
The pipeline uses pickle files to cache intermediate results:
# From Main.py
pickle_arb_id_filename = 'pickleArbIDs.p'
pickle_signal_filename = 'pickleSignals.p'
pickle_clusters_filename = 'pickleClusters.p'
# Skip expensive recomputation if cached
if path.isfile(pickle_arb_id_filename) and not force:
    return load(open(pickle_arb_id_filename, "rb"))
Pass force=True to a phase to force recomputation when testing parameter changes.
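The caching pattern generalizes to a small helper; `cached` and its arguments are hypothetical, not part of the pipeline's API:

```python
import pickle
from os import path

def cached(pickle_filename, compute, force=False):
    """Load a pickled result if present; otherwise compute it and
    pickle it for subsequent runs. Mirrors the pipeline's pattern."""
    if path.isfile(pickle_filename) and not force:
        with open(pickle_filename, 'rb') as f:
            return pickle.load(f)
    result = compute()
    with open(pickle_filename, 'wb') as f:
        pickle.dump(result, f)
    return result
```

For example, cached('pickleArbIDs.p', build_arb_id_dictionary) would recompute only when the pickle is absent or force=True is passed.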
Performance Timing
The PipelineTimer class tracks execution time for each phase:
a_timer = PipelineTimer(verbose=True)
a_timer.start_function_time()
# ... processing ...
a_timer.set_arb_id_creation()
Next Steps