Skip to main content
The Consumer class receives and processes messages (signals) from your digital twin via Kafka topics. It provides handler-based message processing and utilities for state data conversion.

Class Definition

Location: avenieca/consumer.py:7
class Consumer:
    def __init__(self, config: Broker)
Parameters:
  • config (Broker): Broker configuration object

Methods

consume

Location: avenieca/consumer.py:24
def consume(self, func, sync_once=False)
Start consuming messages from the configured Kafka topic. Parameters:
  • func: Handler function to process received messages
  • sync_once (optional): If True, processes only one message and exits (default: False)
Returns: None

Basic Usage

Simple Consumer

import os

from avenieca.config.broker import Broker
from avenieca.consumer import Consumer

# Broker settings; messages are read from the `pub_topic` topic.
config = Broker(
    url=os.environ["KAFKA_URL"],
    sub_topic="",  # Not used for consuming
    pub_topic="output",  # Topic to consume from
    group="test",
    auto_offset_reset="latest"
)


def on_message(msg):
    """Print the valence and state carried by one incoming message."""
    print(f"Received - Valence: {msg['valence']}, State: {msg['state']}")


consumer = Consumer(config=config)
consumer.consume(on_message)  # Runs continuously

Consume Once

Process a single message and exit:
from avenieca.config.broker import Broker
from avenieca.consumer import Consumer

# Messages are consumed from `pub_topic`.
config = Broker(
    url="localhost:9092",
    sub_topic="",
    pub_topic="output",
    group="test"
)


def print_message(payload):
    """Log a single received message."""
    print(f"Received: {payload}")


consumer = Consumer(config=config)
# sync_once=True: handle exactly one message, then return.
consumer.consume(print_message, sync_once=True)

Message Format

Messages received by the handler are dictionaries with the following structure:
{
    "valence": 10.0,
    "state": "[0.2, 0.3, 0.8]",  # String representation
    "score": 5,
    "emb_inp": 1
}
Note: The state field is received as a JSON string and must be converted before it can be used as a list or numpy array.

Signal Utilities

AveniECA provides utility functions to convert the state signal from its string representation.

get_state_as_list

Location: avenieca/utils/signal.py:16
from avenieca.utils.signal import get_state_as_list

def get_state_as_list(signal, dtype=np.float64)
Converts the state field from a JSON string to a Python list. Example:
import os

from avenieca.config.broker import Broker
from avenieca.consumer import Consumer
from avenieca.utils.signal import get_state_as_list

config = Broker(
    url=os.environ["KAFKA_URL"],
    sub_topic="",
    pub_topic="output",
    group="test"
)


def handler(data):
    """Show the state field before and after list conversion."""
    state = data["state"]
    print(f"Before: {state}")  # "[0.2, 0.3, 0.8]"
    print(f"Type: {type(state)}")  # <class 'str'>

    get_state_as_list(data)  # mutates data["state"] in place

    state = data["state"]
    print(f"After: {state}")  # [0.2, 0.3, 0.8]
    print(f"Type: {type(state)}")  # <class 'list'>


consumer = Consumer(config=config)
consumer.consume(handler)

get_state_as_array

Location: avenieca/utils/signal.py:8
from avenieca.utils.signal import get_state_as_array

def get_state_as_array(signal, dtype=np.float64)
Converts the state field from a JSON string to a numpy ndarray. Example:
import numpy as np

from avenieca.config.broker import Broker
from avenieca.consumer import Consumer
from avenieca.utils.signal import get_state_as_array

config = Broker(
    url="localhost:9092",
    sub_topic="",
    pub_topic="output",
    group="test"
)


def handler(data):
    """Show the state field before and after ndarray conversion."""
    print(f"Before: {data['state']}")  # "[0.2, 0.3, 0.8]"

    get_state_as_array(data)  # replaces data["state"] with an ndarray, in place

    state = data["state"]
    print(f"After: {state}")  # numpy array
    print(f"Type: {type(state)}")  # <class 'numpy.ndarray'>

    # numpy operations are now available on the state vector.
    print(f"Mean: {state.mean()}")


consumer = Consumer(config=config)
consumer.consume(handler)

Complete Example from README

import os
import numpy as np
from avenieca.config.broker import Broker
from avenieca.data import Signal
from avenieca.utils.signal import get_state_as_list, get_state_as_array
from avenieca.consumer import Consumer

config = Broker(
    url=os.environ["KAFKA_URL"],
    sub_topic="left_wheel",
    group="test",
    pub_topic="output"
)

# NOTE(review): this Signal is built with valence=9.0, yet every handler below
# asserts valence == 10 — confirm which value the producer actually publishes.
signal = Signal(
    valence=9.0,
    state=[0.2, 0.3, 0.8]
)

# Basic handler: fields arrive as-is (state is still a JSON string).
def handler(data):
    valence = data["valence"]
    state = data["state"]
    assert valence == 10
    assert state == "[0.2, 0.3, 0.8]"

client = Consumer(config=config)
client.consume(handler, True)  # Pass in handler; True enables sync_once

# Handler with get_state_as_list: state becomes a Python list, in place.
def handler1(data):
    assert data["valence"] == 10
    assert data["state"] == "[0.2, 0.3, 0.8]"
    get_state_as_list(data)
    assert data["state"] == [0.2, 0.3, 0.8]

# Handler with get_state_as_array: state becomes a numpy ndarray, in place.
def handler2(data):
    assert data["valence"] == 10
    assert data["state"] == "[0.2, 0.3, 0.8]"
    get_state_as_array(data)
    # Fixed: `assert True, <expr>` always passes (the expression was only the
    # assert *message*) — assert the array comparison itself.
    assert np.array_equal(data["state"], np.array([0.2, 0.3, 0.8]))

Advanced Examples

Processing with Multiple Fields

from avenieca.consumer import Consumer
from avenieca.config.broker import Broker
from avenieca.utils.signal import get_state_as_list

config = Broker(
    url="localhost:9092",
    sub_topic="",
    pub_topic="sensor_data",
    group="processors"
)

def process_sensor_data(data):
    """Print every field of an incoming sensor message."""
    # Pull the scalar fields first; .get() tolerates missing keys.
    valence, score, emb_inp = (
        data.get("valence"),
        data.get("score"),
        data.get("emb_inp"),
    )

    # Decode the JSON-string state into a Python list, in place.
    get_state_as_list(data)
    state = data["state"]

    print(f"Valence: {valence}")
    print(f"Score: {score}")
    print(f"Embedding Input: {emb_inp}")
    print(f"State vector: {state}")
    print(f"State dimension: {len(state)}")

consumer = Consumer(config=config)
consumer.consume(process_sensor_data)

Data Aggregation

import numpy as np
from avenieca.consumer import Consumer
from avenieca.config.broker import Broker
from avenieca.utils.signal import get_state_as_array

config = Broker(
    url="localhost:9092",
    sub_topic="",
    pub_topic="metrics",
    group="aggregator"
)

class DataAggregator:
    """Collects state vectors and reports statistics on every 10th message."""

    def __init__(self):
        self.values = []  # every state array seen so far (grows without bound)
        self.count = 0

    def process(self, data):
        get_state_as_array(data)  # state string -> ndarray, in place

        self.values.append(data["state"])
        self.count += 1

        if self.count % 10:
            return  # only report on multiples of 10

        # Element-wise mean / std over the full history so far.
        all_values = np.array(self.values)
        print(f"\nStatistics after {self.count} messages:")
        print(f"Mean: {all_values.mean(axis=0)}")
        print(f"Std Dev: {all_values.std(axis=0)}")

aggregator = DataAggregator()
consumer = Consumer(config=config)
consumer.consume(aggregator.process)

Filtering and Routing

from avenieca.consumer import Consumer
from avenieca.config.broker import Broker
from avenieca.utils.signal import get_state_as_list

config = Broker(
    url="localhost:9092",
    sub_topic="",
    pub_topic="all_events",
    group="router"
)

def route_message(data):
    """Dispatch a message to a handler based on its valence."""
    valence = data.get("valence", 0)
    get_state_as_list(data)  # decode state before handing off

    if valence > 50:
        handle_high_priority(data)
        return
    if valence < -50:
        handle_alert(data)
        return
    handle_normal(data)

def handle_high_priority(data):
    print(f"HIGH PRIORITY: {data}")

def handle_alert(data):
    print(f"ALERT: {data}")

def handle_normal(data):
    print(f"Normal: {data}")

consumer = Consumer(config=config)
consumer.consume(route_message)

Error Handling

import logging
from avenieca.consumer import Consumer
from avenieca.config.broker import Broker
from avenieca.utils.signal import get_state_as_list

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

config = Broker(
    url="localhost:9092",
    sub_topic="",
    pub_topic="data_stream",
    group="safe_consumer"
)

def safe_handler(data):
    """Handler that validates input and never lets an exception escape."""
    try:
        # Validate the payload before converting.
        if "state" not in data:
            logger.error("Missing state field")
            return

        if "valence" not in data:
            logger.warning("Missing valence field")

        # Convert the JSON-string state to a list, in place.
        get_state_as_list(data)

        # Your processing logic
        process_data(data)

        logger.info("Successfully processed message")

    except Exception:
        # logger.exception records the full traceback, not just the message
        # text, which logger.error(f"... {e}") was discarding.
        logger.exception("Error processing message")

def process_data(data):
    # Your actual processing logic
    pass

consumer = Consumer(config=config)
consumer.consume(safe_handler)

Custom Data Types

import numpy as np
from avenieca.consumer import Consumer
from avenieca.config.broker import Broker
from avenieca.utils.signal import get_state_as_array

config = Broker(
    url="localhost:9092",
    sub_topic="",
    pub_topic="typed_data",
    group="typed_consumer"
)

def handler_with_dtype(data):
    """Convert the state field to an int32 numpy array and print it."""
    get_state_as_array(data, dtype=np.int32)  # dtype overrides the float64 default
    converted = data["state"]
    print(f"State as int32: {converted}")
    print(f"Dtype: {converted.dtype}")

consumer = Consumer(config=config)
consumer.consume(handler_with_dtype)

Handler Function Requirements

Your handler function must:
  1. Accept a single parameter (the message data dictionary)
  2. Handle the message data appropriately
  3. Use utility functions to convert state if needed
Valid handler:
def valid_handler(data):
    """Accepts the message dict via the required single parameter."""
    print(data["valence"])
    get_state_as_list(data)
    print(data["state"])
Invalid handler (no parameter):
def invalid_handler():  # missing the required `data` parameter
    print("No data access")  # Error!

Configuration Notes

Consumer Group

The group parameter in the Broker config identifies the consumer group:
  • Multiple consumers with the same group share message processing
  • Each message is delivered to only one consumer in the group
  • Different groups receive all messages independently

Auto Offset Reset

The auto_offset_reset parameter controls where consumption starts:
  • "latest" (default): Start from newest messages
  • "earliest": Start from oldest available messages
# Consume from the oldest retained offset instead of the default "latest".
config = Broker(
    url="localhost:9092",
    sub_topic="",
    pub_topic="topic",
    group="my_group",
    auto_offset_reset="earliest"  # Start from beginning
)

Build docs developers (and LLMs) love