Semantic analysis is the third stage of the pipeline, responsible for identifying relationships between signals and grouping correlated signals into clusters. This phase uses Pearson correlation to measure signal similarity and implements a greedy clustering algorithm to group related signals. The output is a dictionary of signal clusters, where each cluster represents signals that likely measure the same or related physical quantities.
def subset_selection(a_timer: PipelineTimer,
                     signal_dict: dict = None,
                     subset_pickle: str = "",
                     force: bool = False,
                     subset_size: float = 0.25) -> DataFrame:
    """Select the most information-rich subset of non-static signals.

    Builds a metadata table of every non-static signal, ranks the rows by
    Shannon index (descending), and keeps the top ``subset_size`` fraction
    (default 25%).

    :param a_timer: pipeline timing helper (project type; not used in the
        visible portion of this function).
    :param signal_dict: mapping of arbitration ID -> {signal_id -> Signal};
        each Signal exposes ``static``, ``start_index``, ``stop_index`` and
        ``shannon_index`` attributes.
    :param subset_pickle: pickle filename — presumably used for caching in the
        elided portion of the body (TODO confirm).
    :param force: presumably forces recomputation over a cached pickle in the
        elided portion of the body (TODO confirm).
    :param subset_size: fraction of non-static signals to retain.
    :return: DataFrame of the selected signals' metadata.
    """
    # Count non-static signals so the DataFrame can be pre-sized.
    signal_index = 0
    for k_arb_id, arb_id_signals in signal_dict.items():
        for k_signal_id, signal in arb_id_signals.items():
            if not signal.static:
                signal_index += 1
    # Create DataFrame with signal metadata (one row per non-static signal).
    df: DataFrame = DataFrame(
        zeros((signal_index, 4)),
        columns=["arb_id", "start_index", "stop_index", "Shannon_Index"]
    )
    # Populate DataFrame. Rows are filled from the bottom up: signal_index is
    # reused as a countdown cursor (it starts at the row count from the loop
    # above and is decremented after each write). The enumerate indices i and
    # j are not used for row placement.
    for i, (k_arb_id, arb_id_signals) in enumerate(signal_dict.items()):
        for j, (k_signal_id, signal) in enumerate(arb_id_signals.items()):
            if not signal.static:
                df.iloc[signal_index-1] = [
                    k_arb_id,
                    signal.start_index,
                    signal.stop_index,
                    signal.shannon_index
                ]
                signal_index -= 1
    # Sort by Shannon Index (descending) — highest-entropy signals first.
    df.sort_values(by="Shannon_Index", inplace=True, ascending=False)
    # Select top X% (default 25%). NOTE(review): df.__len__() would
    # conventionally be written len(df).
    df = df.head(int(round(df.__len__() * subset_size, 0)))
    # Re-index all signals to share common timestamp index
    # (uses signal with most samples as reference)
    # ... [code continues]
def greedy_signal_clustering(correlation_matrix: DataFrame = None,
                             correlation_threshold: float = 0.8,
                             fuzzy_labeling: bool = True) -> dict:
    """Greedily group signals whose pairwise correlation meets a threshold.

    Scans every off-diagonal cell of ``correlation_matrix`` (rounded to two
    decimals) and merges the row/column signals into clusters according to
    the four cases documented inline below.

    :param correlation_matrix: square DataFrame of pairwise signal
        correlations; its column labels identify the signals.
    :param correlation_threshold: minimum (rounded) correlation for two
        signals to be considered related.
    :param fuzzy_labeling: when True, a signal may carry several cluster
        labels and "bridge" clusters are created for already-labeled pairs.
    :return: dict mapping cluster label (int) -> list of signal identifiers.
    """
    correlation_keys = correlation_matrix.columns.values
    previously_clustered_signals = {}  # signal_id → cluster_id(s)
    cluster_dict = {}  # cluster_id → [signal_ids]
    new_cluster_label = 0
    # NOTE(review): every unordered pair is visited twice (n,m and m,n);
    # the membership checks below make the second visit a no-op in most cases.
    for n, row in enumerate(correlation_keys):
        for m, col in enumerate(correlation_keys):
            if n == m:
                continue  # Skip diagonal (self-correlation is always 1.0)
            result = round(correlation_matrix.iloc[n, m], 2)
            # Check if correlation exceeds threshold
            if result >= correlation_threshold:
                # Case 1: Both signals unlabeled → Create new cluster
                if row not in previously_clustered_signals and \
                        col not in previously_clustered_signals:
                    cluster_dict[new_cluster_label] = [row, col]
                    previously_clustered_signals[row] = {new_cluster_label}
                    previously_clustered_signals[col] = {new_cluster_label}
                    new_cluster_label += 1
                # Case 2: Row unlabeled, col labeled → Add row to col's clusters
                elif row not in previously_clustered_signals:
                    for label in previously_clustered_signals[col]:
                        cluster_dict[label].append(row)
                    # NOTE(review): this aliases col's label set rather than
                    # copying it — later in-place updates would be shared.
                    previously_clustered_signals[row] = previously_clustered_signals[col]
                # Case 3: Col unlabeled, row labeled → Add col to row's clusters
                elif col not in previously_clustered_signals:
                    for label in previously_clustered_signals[row]:
                        cluster_dict[label].append(col)
                    previously_clustered_signals[col] = previously_clustered_signals[row]
                # Case 4: Both labeled (fuzzy labeling mode)
                else:
                    if fuzzy_labeling:
                        row_label_set = previously_clustered_signals[row]
                        col_label_set = previously_clustered_signals[col]
                        if not row_label_set & col_label_set:  # No overlap
                            # Create bridge cluster linking the two existing
                            # groups; both signals keep their old labels too.
                            cluster_dict[new_cluster_label] = [row, col]
                            previously_clustered_signals[row] = {new_cluster_label} | row_label_set
                            previously_clustered_signals[col] = {new_cluster_label} | col_label_set
                            new_cluster_label += 1
    # Remove duplicate clusters
    # ... [deduplication logic]
    return cluster_dict
# Correlation matrix:
#         A     B     C
#   A  1.00  0.90  0.10
#   B  0.90  1.00  0.15
#   C  0.10  0.15  1.00
#
# With threshold = 0.8:
#   A ↔ B correlation = 0.90 → Create cluster 0: [A, B]
#   C has no high correlations → Not clustered
cluster_dict = {
    0: [A, B]
}
fuzzy_labeling=True: a signal may belong to multiple clusters at once (this represents complex, overlapping relationships between signals).
fuzzy_labeling=False: each signal belongs to at most one cluster.
Fuzzy labeling is useful when signals have multiple physical interpretations. For example, engine RPM might correlate with both vehicle speed AND throttle position.
def label_propagation(a_timer: PipelineTimer,
                      pickle_clusters_filename: str = '',
                      pickle_all_signals_df_filename: str = '',
                      csv_signals_correlation_filename: str = '',
                      signal_dict: dict = None,
                      cluster_dict: dict = None,
                      correlation_threshold: float = 0.8,
                      force: bool = False):
    """Propagate existing cluster labels to signals left out of the subset.

    Recomputes a correlation matrix over ALL non-static signals (not just the
    subset used for initial clustering) and attaches any still-unlabeled
    signal to the cluster of a labeled signal it correlates with.

    :param a_timer: pipeline timing helper (project type; unused in the
        visible portion).
    :param pickle_clusters_filename: presumably cache filenames for results —
        TODO confirm, the caching logic is elided from this snippet.
    :param pickle_all_signals_df_filename: see above.
    :param csv_signals_correlation_filename: see above.
    :param signal_dict: mapping of arbitration ID -> {signal_id -> Signal}.
    :param cluster_dict: cluster label -> [signal_ids]; mutated in place.
    :param correlation_threshold: minimum (rounded) correlation for label
        propagation.
    :param force: presumably bypasses cached pickles — TODO confirm.
    :return: (df, correlation_matrix, cluster_dict).
    """
    # Create DataFrame with ALL non-static signals
    non_static_signals_dict = {}
    for k_arb_id, arb_id_signals in signal_dict.items():
        for k_signal_id, signal in arb_id_signals.items():
            if not signal.static:
                non_static_signals_dict[k_signal_id] = signal
    # Re-index to common timestamp index (construction elided in this snippet)
    df: DataFrame = DataFrame(...)  # [similar to subset_selection]
    # Calculate correlation matrix for ALL signals
    correlation_matrix = df.corr()
    # Initialize with existing cluster assignments.
    # NOTE: unlike greedy_signal_clustering, each signal maps to a single
    # cluster id here (not a set).
    previously_clustered_signals = {}
    for k_cluster_id, cluster in cluster_dict.items():
        for k_signal_id in cluster:
            previously_clustered_signals[k_signal_id] = k_cluster_id
    # Propagate labels to unclustered signals.
    # NOTE(review): correlation_keys is not assigned in this snippet —
    # presumably correlation_matrix.columns.values as in
    # greedy_signal_clustering; verify against the full source.
    for n, row in enumerate(correlation_keys):
        for m, col in enumerate(correlation_keys):
            if n == m:
                continue  # Skip self-correlation
            result = round(correlation_matrix.iloc[n, m], 2)
            if result >= correlation_threshold:
                # If row is clustered but col is not, add col to row's cluster
                if row in previously_clustered_signals and \
                        col not in previously_clustered_signals:
                    cluster_dict[previously_clustered_signals[row]].append(col)
                    previously_clustered_signals[col] = previously_clustered_signals[row]
                # If col is clustered but row is not, add row to col's cluster
                elif col in previously_clustered_signals and \
                        row not in previously_clustered_signals:
                    cluster_dict[previously_clustered_signals[col]].append(row)
                    previously_clustered_signals[row] = previously_clustered_signals[col]
    return df, correlation_matrix, cluster_dict
Label propagation creates a correlation matrix for all signals, which can be memory-intensive for large datasets. Consider increasing subset size if propagation discovers few new labels.
The j1979_signal_labeling() function (SemanticAnalysis.py:277-334) correlates CAN signals with J1979 diagnostic data to automatically identify known quantities.
# Signal from Arb ID 0x123, bits 16-23
signal.j1979_title = "Engine RPM"
signal.j1979_pcc = 0.94
# Interpretation: This signal has 94% correlation with J1979 Engine RPM.
# It likely represents engine speed encoded in the CAN payload.
J1979 labeling uses absolute correlation (abs(row)) to catch both positive and negative relationships. Some signals may be inversely correlated with J1979 data.
# Print every cluster and its member signals, annotating any signal that was
# matched to a J1979 diagnostic quantity with its title and Pearson
# correlation coefficient.
for cluster_id, signal_ids in cluster_dict.items():
    print(f"\nCluster {cluster_id}:")
    for signal_id in signal_ids:
        # signal_id[0] is the arbitration ID; the full signal_id tuple keys
        # the signal within that arb ID's sub-dict.
        signal = signal_dict[signal_id[0]][signal_id]
        print(f"  {signal.plot_title}")
        if signal.j1979_title:
            print(f"    → {signal.j1979_title} (PCC: {signal.j1979_pcc:.2f})")
# Find highest correlations for a specific signalsignal_id = (123, 0, 7)correlations = corr_matrix_full.loc[signal_id].sort_values(ascending=False)print(correlations.head(10))
# Report every signal that was labeled against J1979 diagnostic data,
# showing the matched quantity and the Pearson correlation coefficient.
for arb_id, signals in signal_dict.items():
    for signal_id, signal in signals.items():
        if signal.j1979_title:
            print(f"{signal.plot_title} → {signal.j1979_title} (r={signal.j1979_pcc:.2f})")