Lexical analysis is the second stage of the pipeline, responsible for identifying tokens (contiguous bit positions likely belonging to the same signal). This process uses the TANG values from preprocessing to cluster bit positions based on their transition frequency patterns. The output of this phase is a collection of Signal objects, each representing a time series extracted from specific bit positions.
def get_composition(arb_id: "ArbID", include_padding=False, max_inversion_distance: float = 0.0):
    """Cluster an Arb ID's bit positions into tokens using TANG monotonicity.

    Contiguous bit positions whose TANG values move monotonically in one
    direction (endian-aware) are grouped into a single token.  Results are
    written back onto the Arb ID: ``arb_id.padding`` collects the indices of
    padding bits and ``arb_id.tokenization`` receives the list of
    (start_index, stop_index) pairs.

    Parameters
    ----------
    arb_id : ArbID
        Arbitration ID whose ``tang`` array is tokenized (mutated in place).
    include_padding : bool
        When True, padding bits (TANG ~ 0) are tokenized like any other bit
        instead of terminating the current token and being skipped.
    max_inversion_distance : float
        Largest TANG difference still tolerated as noise when values briefly
        move against the established direction.
    """
    tokens = []
    start_index = 0
    currently_clustering = False
    big_endian = True  # provisional; locked in on each token's second bit
    last_bit_position = 0
    for i, bit_position in enumerate(nditer(arb_id.tang)):
        # Padding bit: TANG is numerically zero, i.e. the bit never transitions.
        if bit_position <= 0.000001:
            arb_id.padding.append(i)
            if not include_padding:
                # Padding ends any token in progress and is itself skipped.
                if currently_clustering:
                    tokens.append((start_index, i - 1))
                    currently_clustering = False
                continue
        if currently_clustering:
            if bit_position >= last_bit_position and big_endian:
                pass  # TANG still rising: token continues
            elif bit_position <= last_bit_position and not big_endian:
                pass  # TANG still falling: token continues
            elif abs(bit_position - last_bit_position) <= max_inversion_distance:
                pass  # small inversion within tolerance: token continues
            elif start_index == i - 1:
                # Second bit of the token: establish the endianness direction.
                # NOTE: this is checked after the tolerance branch, so a
                # within-threshold step leaves the provisional direction
                # unchanged (original behavior, preserved deliberately).
                big_endian = bit_position >= last_bit_position
            else:
                # Unacceptable inversion: close this token, open a new one here.
                tokens.append((start_index, i - 1))
                start_index = i
        else:
            currently_clustering = True
            start_index = i
        last_bit_position = bit_position
    # Close a token that runs to the end of the TANG array.
    if currently_clustering:
        tokens.append((start_index, len(arb_id.tang) - 1))
    arb_id.tokenization = tokens
Endianness is detected automatically by observing whether TANG values increase or decrease. The algorithm assumes CAN signals use either big-endian (most significant bit first) or little-endian (least significant bit first) encoding.
Bits with TANG values ≤ 0.000001 are classified as padding bits (LexicalAnalysis.py:46-56).
# Excerpt from get_composition: padding-bit handling.
# A bit whose TANG value is numerically zero never transitions, so it is
# recorded as padding; unless include_padding is set, it also terminates any
# token currently being clustered and is then skipped.
if bit_position <= 0.000001:
    arb_id.padding.append(i)
    if not include_padding:
        # End current token and skip this bit
        if currently_clustering:
            tokens.append((start_index, i - 1))
            currently_clustering = False
        continue
By default, padding bits are not included in tokens. Set include_padding=True to force tokenization of all bit positions, including static ones.
def merge_tokens(arb_id: "ArbID", max_distance):
    """Merge adjacent tokens whose boundary TANG values are close.

    Two tokens merge when the second starts on the bit immediately after the
    first ends and the TANG values on either side of the boundary differ by
    at most ``max_distance``.  Merging chains: a merged token can absorb the
    next adjacent token as well.  ``arb_id.tokenization`` is updated in place.

    The original implementation rewrote list entries while iterating them and
    then deleted merged tokens by value (O(n*m) and fragile with duplicate
    tuples); this version builds the merged list in a single pass instead.

    Parameters
    ----------
    arb_id : ArbID
        Arbitration ID whose ``tokenization`` list of (start, stop) pairs is
        compacted (mutated in place).
    max_distance : float
        Maximum allowed |TANG[left end] - TANG[right start]| for a merge.
    """
    merged = []
    for token in arb_id.tokenization:
        if merged:
            prev_start, prev_stop = merged[-1]
            # Adjacent tokens with a smooth TANG boundary collapse into one.
            boundary_gap = abs(arb_id.tang[prev_stop] - arb_id.tang[token[0]])
            if prev_stop + 1 == token[0] and boundary_gap <= max_distance:
                merged[-1] = (prev_start, token[1])
                continue
        merged.append(token)
    # Slice-assign to preserve the identity of the tokenization list.
    arb_id.tokenization[:] = merged
# Build a Signal (time series) for every token of every non-static Arb ID.
for k, arb_id in arb_id_dict.items():
    if not arb_id.static:
        for token in arb_id.tokenization:
            signal = Signal(k, token[0], token[1])
            # Slice this token's bit columns out of the boolean matrix and
            # join each row into one binary string.
            bit_strings = [''.join(str(x) for x in row)
                           for row in arb_id.boolean_matrix[:, token[0]:token[1] + 1]]
            # Decode every binary string into an unsigned integer sample.
            # NOTE: values are always interpreted as unsigned; signed
            # (two's-complement) signals need separate conversion.
            values = zeros((len(bit_strings), 1), dtype=uint64)
            for i, row in enumerate(bit_strings):
                values[i] = int(row, 2)
            # Re-attach the original capture timestamps as the index.
            signal.time_series = Series(
                values[:, 0],
                index=arb_id.original_data.index,
                dtype=float64
            )
            # Normalize the series and compute this signal's metadata.
            signal.normalize_and_set_metadata(normalize_strategy)
            # Keyed by (arb id, start bit, stop bit) within this capture.
            signal_dict[k][(arb_id.id, signal.start_index, signal.stop_index)] = signal
All signals are converted to unsigned integers. If your CAN signals use signed encoding (two’s complement), you’ll need to apply sign conversion separately.
def set_shannon_index(self):
    """Compute the Shannon diversity index (base 10) of this signal's values.

    Treats each distinct value in ``self.time_series`` as a category and
    computes -sum(p_i * log10(p_i)) over the value proportions, storing the
    result in ``self.shannon_index``.  An empty series yields 0.0 instead of
    dividing by zero.  Note the base-10 logarithm is kept from the original
    implementation (Shannon entropy is often defined with the natural log).
    """
    n: int = len(self.time_series)
    if n == 0:
        # No samples: define the diversity of an empty series as zero.
        self.shannon_index = 0.0
        return
    si: float = 0.0
    for count in self.time_series.value_counts():
        p_i = count / n  # proportion of samples taking this value
        si += p_i * log10(p_i)
    self.shannon_index = -si
# Dump the lexical-analysis results for every arbitration ID.
for identifier, record in id_dictionary.items():
    header = f"Arb ID {hex(identifier)}:"
    print(header)
    print(f" Tokens: {record.tokenization}")
    print(f" Padding bits: {record.padding}")
# Print summary metadata for every extracted signal.
# Neither dictionary key was used, so iterate the values directly.
for signals in signal_dictionary.values():
    for signal in signals.values():
        print(f"{signal.plot_title}")
        print(f" Shannon Index: {signal.shannon_index:.4f}")
        print(f" Static: {signal.static}")
        print(f" Unique values: {signal.time_series.nunique()}")