
Overview

Lexical analysis is the second stage of the pipeline, responsible for identifying tokens (contiguous bit positions likely belonging to the same signal). This process uses the TANG values from preprocessing to cluster bit positions based on their transition frequency patterns. The output of this phase is a collection of Signal objects, each representing a time series extracted from specific bit positions.

Key Responsibilities

1. Bit Position Clustering: Group adjacent bit positions with similar TANG values into tokens
2. Endian Detection: Automatically detect big-endian vs. little-endian signal encoding
3. Token Merging: Combine adjacent tokens when their boundary TANG values are similar
4. Signal Generation: Extract time series data for each token and convert to integer values
5. Shannon Index Calculation: Measure signal entropy to identify static vs. dynamic signals
6. Normalization: Apply a normalization strategy to make signals comparable

Tokenization Algorithm

The get_composition() function (LexicalAnalysis.py:35-88) implements a greedy clustering algorithm.

Core Logic

from numpy import nditer  # required by the loop below

def get_composition(arb_id: ArbID,
                    include_padding=False,
                    max_inversion_distance: float = 0.0):
    tokens = []
    start_index = 0
    currently_clustering = False
    big_endian = True
    last_bit_position = 0
    
    for i, bit_position in enumerate(nditer(arb_id.tang)):
        # Check if this is a padding bit (TANG ≈ 0)
        if bit_position <= 0.000001:
            arb_id.padding.append(i)
            if not include_padding:
                if currently_clustering:
                    tokens.append((start_index, i - 1))
                    currently_clustering = False
                continue
        
        # Continue clustering or start new token based on TANG monotonicity
        if currently_clustering:
            # Check if TANG continues in same direction (endian-aware)
            if bit_position >= last_bit_position and big_endian:
                pass  # Continue token
            elif bit_position <= last_bit_position and not big_endian:
                pass  # Continue token
            # Allow small inversions within threshold
            elif abs(bit_position - last_bit_position) <= max_inversion_distance:
                pass  # Continue token
            # Second bit position: establish endianness
            elif start_index == i - 1:
                big_endian = (bit_position >= last_bit_position)
            # Unacceptable inversion: save token and start new one
            else:
                tokens.append((start_index, i - 1))
                start_index = i
        else:
            currently_clustering = True
            start_index = i
        
        last_bit_position = bit_position
    
    # Save final token if still clustering
    if currently_clustering:
        tokens.append((start_index, arb_id.tang.__len__() - 1))
    
    arb_id.tokenization = tokens

Algorithm Behavior

# TANG values: [0.1, 0.3, 0.5, 0.7, 0.9]
# Monotonically increasing → Big Endian token
# Result: [(0, 4)]
Endianness is detected automatically by observing whether TANG values increase or decrease. The algorithm assumes CAN signals use either big-endian (most significant bit first) or little-endian (least significant bit first) encoding.
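To make the direction rule concrete, here is a minimal sketch of how the first two TANG values of a token establish its endianness. `detect_endianness` is a hypothetical helper distilled from the second-bit-position branch above, not part of LexicalAnalysis.py:

```python
def detect_endianness(tang_values):
    """Infer endianness from the direction of the first two TANG values.

    Increasing values suggest big-endian, decreasing little-endian,
    mirroring the start_index == i - 1 branch in get_composition().
    """
    if len(tang_values) < 2:
        return "big"  # get_composition() also defaults to big-endian
    return "big" if tang_values[1] >= tang_values[0] else "little"
```

For example, `detect_endianness([0.1, 0.3, 0.5, 0.7, 0.9])` returns `"big"`, while a decreasing sequence such as `[0.9, 0.7, 0.5]` yields `"little"`.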

Padding Detection

Bits with TANG values ≤ 0.000001 are classified as padding bits (LexicalAnalysis.py:46-56).
if bit_position <= 0.000001:
    arb_id.padding.append(i)
    if not include_padding:
        # End current token and skip this bit
        if currently_clustering:
            tokens.append((start_index, i - 1))
            currently_clustering = False
        continue
By default, padding bits are not included in tokens. Set include_padding=True to force tokenization of all bit positions, including static ones.
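The effect of skipping padding can be sketched with a stripped-down loop (an illustrative helper; the real logic lives inside get_composition() and also handles endianness and inversions):

```python
def split_on_padding(tang, threshold=1e-6):
    # Padding bits (TANG ~ 0) terminate the current token, so runs of
    # active bits become separate tokens.
    tokens, start = [], None
    for i, value in enumerate(tang):
        if value <= threshold:
            if start is not None:
                tokens.append((start, i - 1))
                start = None
        elif start is None:
            start = i
    if start is not None:
        tokens.append((start, len(tang) - 1))
    return tokens
```

Here `split_on_padding([0.5, 0.6, 0.0, 0.4, 0.5])` yields `[(0, 1), (3, 4)]`: the zero-TANG bit at index 2 ends the first token, and a new one starts at index 3.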

Token Merging

The merge_tokens() function (LexicalAnalysis.py:91-128) combines adjacent tokens if their boundary TANG values are similar.

Merging Strategy

def merge_tokens(arb_id: ArbID, max_distance):
    remove_list = []
    last = None
    
    for i, token in enumerate(arb_id.tokenization):
        if last:
            # Check if tokens are adjacent
            if last[1] + 1 == token[0]:
                # Check if boundary TANG values are within threshold
                if abs(arb_id.tang[last[1]] - arb_id.tang[token[0]]) <= max_distance:
                    # Merge tokens
                    remove_list.append(last)
                    token = (last[0], token[1])
                    arb_id.tokenization[i] = token
        last = token
    
    # Remove merged tokens
    for token in remove_list:
        arb_id.tokenization.remove(token)

Example

# Before merging:
tokens = [(0, 3), (4, 7), (8, 15)]
tang = [0.5, 0.6, 0.7, 0.75, 0.78, 0.8, 0.85, 0.9, 0.1, ...]

# With max_distance = 0.1:
# Token boundary (3→4): |0.75 - 0.78| = 0.03 ≤ 0.1 → MERGE
# Token boundary (7→8): |0.9 - 0.1| = 0.8 > 0.1 → DON'T MERGE

# After merging:
tokens = [(0, 7), (8, 15)]
Token merging recombines signals that were artificially split by minor TANG fluctuations. Typical values for max_distance are 0.1 to 0.3.
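The boundary check can be distilled into a single predicate (illustrative only; merge_tokens() applies it in place while rewriting the token list):

```python
def should_merge(tang, left, right, max_distance):
    # Tokens merge only when they are adjacent AND the TANG values on
    # either side of the boundary are within max_distance of each other.
    adjacent = left[1] + 1 == right[0]
    return adjacent and abs(tang[left[1]] - tang[right[0]]) <= max_distance

# TANG values from the example above (indices 0-8)
tang = [0.5, 0.6, 0.7, 0.75, 0.78, 0.8, 0.85, 0.9, 0.1]
```

With `max_distance=0.1`, the (0, 3)/(4, 7) boundary merges (|0.75 - 0.78| = 0.03) while the (4, 7)/(8, 15) boundary does not (|0.9 - 0.1| = 0.8).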

Signal Generation

The generate_signals() function (LexicalAnalysis.py:132-177) converts each token into a Signal object.

Process

from numpy import zeros, uint64, float64  # used in the loop below
from pandas import Series

for k, arb_id in arb_id_dict.items():
    if not arb_id.static:
        for token in arb_id.tokenization:
            signal = Signal(k, token[0], token[1])
            
            # Extract bit columns for this token
            temp1 = [''.join(str(x) for x in row) 
                     for row in arb_id.boolean_matrix[:, token[0]:token[1]+1]]
            
            temp2 = zeros((temp1.__len__(), 1), dtype=uint64)
            
            # Convert binary strings to integers
            for i, row in enumerate(temp1):
                temp2[i] = int(row, 2)
            
            # Create time series with original timestamp index
            signal.time_series = Series(
                temp2[:, 0], 
                index=arb_id.original_data.index, 
                dtype=float64
            )
            
            # Normalize and calculate metadata
            signal.normalize_and_set_metadata(normalize_strategy)
            
            # Add to signal dictionary
            signal_dict[k][(arb_id.id, signal.start_index, signal.stop_index)] = signal

Binary to Integer Conversion

Each row of the binary matrix is converted to an unsigned integer:
# Example: 8-bit token from bits 16-23
binary_row = [0, 1, 0, 1, 1, 0, 1, 0]  # Binary: 01011010
binary_string = '01011010'
integer_value = int(binary_string, 2)  # Decimal: 90
All signals are converted to unsigned integers. If your CAN signals use signed encoding (two’s complement), you’ll need to apply sign conversion separately.
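If sign conversion is needed, a two's-complement reinterpretation can be applied after extraction. `to_signed` is a hypothetical helper, not part of the pipeline:

```python
def to_signed(value: int, width: int) -> int:
    # Reinterpret an unsigned integer as a two's-complement signed
    # value of the given bit width.
    if value >= 1 << (width - 1):
        value -= 1 << width
    return value
```

For an 8-bit token, `to_signed(90, 8)` stays 90, while `to_signed(0b10100110, 8)` (166 unsigned) becomes -90.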

Shannon Index

The Shannon Index measures signal entropy, indicating how “random” or “structured” the signal values are.

Calculation

From Signal.py:23-32:
def set_shannon_index(self):
    si: float = 0.0
    n: int = self.time_series.__len__()
    
    for count in self.time_series.value_counts():
        # Proportion of this value in the population
        p_i = count / n
        # Shannon Index contribution
        si += p_i * log10(p_i)
    
    si *= -1
    self.shannon_index = si

Interpretation

  • High Shannon Index (> 1.0): Many unique values, high entropy → likely a meaningful signal
  • Low Shannon Index (< 0.5): Few unique values, low entropy → possibly static or stepwise signal
  • Zero Shannon Index: Only one unique value → static signal
Signals with Shannon Index < 0.000001 are automatically marked as static=True (Signal.py:35-36).
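The same calculation can be reproduced with plain Python (no pandas), which is handy for sanity-checking the thresholds above:

```python
from collections import Counter
from math import log10

def shannon_index(values):
    # Sum of -p_i * log10(p_i) over the proportion p_i of each unique
    # value, matching Signal.set_shannon_index().
    n = len(values)
    return -sum((c / n) * log10(c / n) for c in Counter(values).values())
```

A constant series gives 0.0 (static), while four equally likely values give log10(4) ≈ 0.602.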

Configuration Parameters

From Main.py:70-72:
tokenization_bit_distance (float, default: 0.2)
Maximum allowable TANG inversion within a token and the threshold for merging adjacent tokens.
  • Higher values: more tolerant of TANG fluctuations, produces longer tokens
  • Lower values: stricter clustering, produces shorter tokens
  • Typical range: 0.1 to 0.3

tokenize_padding (bool, default: true)
Whether to include padding bits (TANG ≈ 0) in tokenization.
  • false: padding bits are skipped (recommended for most cases)
  • true: all bits are tokenized, including static ones

merge (bool, default: true)
Whether to run the token merging phase.
  • true: adjacent tokens with similar boundary TANG values are merged
  • false: keep the initial tokenization without merging

force (bool, default: false)
Whether to regenerate signals even if cached data exists.

Usage Example

From Main.py:96-109:
from LexicalAnalysis import tokenize_dictionary, generate_signals
from sklearn.preprocessing import minmax_scale

# Configuration
tokenization_bit_distance = 0.2
tokenize_padding = False
signal_normalize_strategy = minmax_scale

# Step 1: Tokenize all Arbitration IDs
tokenize_dictionary(
    a_timer,
    id_dictionary,
    force=False,
    include_padding=tokenize_padding,
    merge=True,
    max_distance=tokenization_bit_distance
)

# Step 2: Generate Signal objects from tokens
signal_dictionary = generate_signals(
    a_timer,
    id_dictionary,
    pickle_signal_filename="pickleSignals.p",
    normalize_strategy=signal_normalize_strategy,
    force=False
)

Output Data Structure

Signal Dictionary

Nested dictionary structure:
{
    arb_id_1: {
        (arb_id_1, start_1, stop_1): Signal_object_1,
        (arb_id_1, start_2, stop_2): Signal_object_2,
        ...
    },
    arb_id_2: {
        (arb_id_2, start_1, stop_1): Signal_object_3,
        ...
    },
    ...
}
  • Outer key: Arbitration ID (int)
  • Inner key: tuple of (arb_id, start_bit, stop_bit)
  • Value: Signal object

Signal Object Attributes

From Signal.py:6-15:
  • arb_id: Arbitration ID (int)
  • start_index: Starting bit position (int)
  • stop_index: Ending bit position (int)
  • time_series: Pandas Series with timestamp index
  • static: Boolean (True if Shannon Index ≈ 0)
  • shannon_index: Entropy measure (float)
  • plot_title: Auto-generated description (str)
  • j1979_title: Diagnostic label if correlated with J1979 (str, initially None)
  • j1979_pcc: Pearson correlation coefficient with J1979 signal (float)

Advanced: Custom Tokenization

You can implement custom tokenization logic by modifying get_composition():
# Example: Fixed-width tokenization (every 8 bits)
def fixed_width_tokenization(arb_id: ArbID, width: int = 8):
    tokens = []
    num_bits = len(arb_id.tang)
    
    for start in range(0, num_bits, width):
        stop = min(start + width - 1, num_bits - 1)
        tokens.append((start, stop))
    
    arb_id.tokenization = tokens
Custom tokenization strategies can be useful for known signal layouts or proprietary CAN protocols.

Performance Considerations

  • Tokenization: O(b), where b = number of bits per Arbitration ID
  • Merging: O(t), where t = number of tokens
  • Signal generation: O(t × m), where t = number of tokens and m = number of messages
The pipeline uses pickle caching to avoid recomputing signals. Delete .p files to force regeneration.
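The caching pattern amounts to the following sketch (the helper name and filename are illustrative, not the pipeline's actual API):

```python
import os
import pickle

def load_or_none(path, force=False):
    # Reuse a pickled result unless the cache file is missing or
    # force=True, mirroring the force parameter described above.
    if not force and os.path.isfile(path):
        with open(path, "rb") as f:
            return pickle.load(f)
    return None
```

Passing force=True (or deleting the .p file) bypasses the cache and triggers regeneration.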

Debugging Tips

View Tokenization Results

for arb_id_num, arb_id in id_dictionary.items():
    print(f"Arb ID {hex(arb_id_num)}:")
    print(f"  Tokens: {arb_id.tokenization}")
    print(f"  Padding bits: {arb_id.padding}")

Check Signal Statistics

for arb_id, signals in signal_dictionary.items():
    for signal_id, signal in signals.items():
        print(f"{signal.plot_title}")
        print(f"  Shannon Index: {signal.shannon_index:.4f}")
        print(f"  Static: {signal.static}")
        print(f"  Unique values: {signal.time_series.nunique()}")
