Documentation Index
Fetch the complete documentation index at: https://mintlify.com/aveni-hub/avenieca-python/llms.txt
Use this file to discover all available pages before exploring further.
The Consumer class receives and processes messages (signals) from your digital twin via Kafka topics. It provides handler-based message processing and utilities for state data conversion.
Class Definition
Location: avenieca/consumer.py:7
class Consumer:
def __init__(self, config: Broker)
Parameters:
config (Broker): Broker configuration object
Methods
consume
Location: avenieca/consumer.py:24
def consume(self, func, sync_once=False)
Start consuming messages from the configured Kafka topic.
Parameters:
func: Handler function to process received messages
sync_once (optional): If True, processes only one message and exits (default: False)
Returns: None
Basic Usage
Simple Consumer
import os
from avenieca.config.broker import Broker
from avenieca.consumer import Consumer
config = Broker(
url=os.environ["KAFKA_URL"],
sub_topic="", # Not used for consuming
pub_topic="output", # Topic to consume from
group="test",
auto_offset_reset="latest"
)
# Define a handler to process incoming messages
def handler(data):
    """Print the valence and the raw state of one received message."""
    valence = data["valence"]
    state = data["state"]  # still the string representation at this point
    print(f"Received - Valence: {valence}, State: {state}")
consumer = Consumer(config=config)
consumer.consume(handler) # Runs continuously
Consume Once
Process a single message and exit:
from avenieca.consumer import Consumer
from avenieca.config.broker import Broker
config = Broker(
url="localhost:9092",
sub_topic="",
pub_topic="output",
group="test"
)
def handler(data):
    """Print the received message dictionary unchanged."""
    print(f"Received: {data}")
consumer = Consumer(config=config)
consumer.consume(handler, sync_once=True) # Processes one message
Messages received by the handler are dictionaries with the following structure:
{
"valence": 10.0,
"state": "[0.2, 0.3, 0.8]", # String representation
"score": 5,
"emb_inp": 1
}
Note: The state field is received as a JSON string and must be converted before it can be used as a list or NumPy array.
Signal Utilities
AveniECA provides utility functions to convert the state signal from its string representation.
get_state_as_list
Location: avenieca/utils/signal.py:16
from avenieca.utils.signal import get_state_as_list
def get_state_as_list(signal, dtype=np.float64)
Converts the state field from a JSON string to a Python list.
Example:
import os
from avenieca.config.broker import Broker
from avenieca.consumer import Consumer
from avenieca.utils.signal import get_state_as_list
config = Broker(
url=os.environ["KAFKA_URL"],
sub_topic="",
pub_topic="output",
group="test"
)
def handler(data):
    """Show the state field before and after conversion to a Python list."""
    print(f"Before: {data['state']}")  # "[0.2, 0.3, 0.8]"
    print(f"Type: {type(data['state'])}")  # <class 'str'>

    get_state_as_list(data)  # Modifies data in-place

    print(f"After: {data['state']}")  # [0.2, 0.3, 0.8]
    print(f"Type: {type(data['state'])}")  # <class 'list'>
consumer = Consumer(config=config)
consumer.consume(handler)
get_state_as_array
Location: avenieca/utils/signal.py:8
from avenieca.utils.signal import get_state_as_array
def get_state_as_array(signal, dtype=np.float64)
Converts the state field from a JSON string to a numpy ndarray.
Example:
import numpy as np
from avenieca.config.broker import Broker
from avenieca.consumer import Consumer
from avenieca.utils.signal import get_state_as_array
config = Broker(
url="localhost:9092",
sub_topic="",
pub_topic="output",
group="test"
)
def handler(data):
    """Show the state field before and after conversion to a numpy array."""
    print(f"Before: {data['state']}")  # "[0.2, 0.3, 0.8]"

    get_state_as_array(data)  # Modifies data in-place

    print(f"After: {data['state']}")  # numpy array
    print(f"Type: {type(data['state'])}")  # <class 'numpy.ndarray'>

    # Now you can use numpy operations
    mean = data['state'].mean()
    print(f"Mean: {mean}")
consumer = Consumer(config=config)
consumer.consume(handler)
Complete Example from README
import os
import numpy as np
from avenieca.config.broker import Broker
from avenieca.data import Signal
from avenieca.utils.signal import get_state_as_list, get_state_as_array
from avenieca.consumer import Consumer
config = Broker(
url=os.environ["KAFKA_URL"],
sub_topic="left_wheel",
group="test",
pub_topic="output"
)
signal = Signal(
valence=9.0,
state=[0.2, 0.3, 0.8]
)
# Basic handler
def handler(data):
    """Check that the message carries the expected valence and raw state."""
    valence = data["valence"]
    state = data["state"]
    assert valence == 10
    # The state is still a string here; no conversion utility has been applied.
    assert state == "[0.2, 0.3, 0.8]"
client = Consumer(config=config)
client.consume(handler, True) # Pass in handler
# Handler with get_state_as_list
def handler1(data):
    """Check the message, then verify the in-place conversion to a list."""
    assert data["valence"] == 10
    assert data["state"] == "[0.2, 0.3, 0.8]"
    get_state_as_list(data)  # replaces data["state"] with a Python list
    assert data["state"] == [0.2, 0.3, 0.8]
# Handler with get_state_as_array
def handler2(data):
    """Check the message, then verify the in-place conversion to an array."""
    assert data["valence"] == 10
    assert data["state"] == "[0.2, 0.3, 0.8]"
    get_state_as_array(data)  # replaces data["state"] with a numpy array
    # Assert the comparison itself. `assert True, <expr>` would always pass,
    # because the expression would only be used as the failure message.
    assert np.array_equal(data["state"], np.array([0.2, 0.3, 0.8]))
Advanced Examples
Processing with Multiple Fields
from avenieca.consumer import Consumer
from avenieca.config.broker import Broker
from avenieca.utils.signal import get_state_as_list
config = Broker(
url="localhost:9092",
sub_topic="",
pub_topic="sensor_data",
group="processors"
)
def process_sensor_data(data):
    """Print every field of a message after converting its state to a list."""
    # Extract all fields; .get() yields None for a field that is absent
    valence = data.get("valence")
    score = data.get("score")
    emb_inp = data.get("emb_inp")

    # Convert state to list
    get_state_as_list(data)
    state = data["state"]

    # Process the data
    print(f"Valence: {valence}")
    print(f"Score: {score}")
    print(f"Embedding Input: {emb_inp}")
    print(f"State vector: {state}")
    print(f"State dimension: {len(state)}")
consumer = Consumer(config=config)
consumer.consume(process_sensor_data)
Data Aggregation
import numpy as np
from avenieca.consumer import Consumer
from avenieca.config.broker import Broker
from avenieca.utils.signal import get_state_as_array
config = Broker(
url="localhost:9092",
sub_topic="",
pub_topic="metrics",
group="aggregator"
)
class DataAggregator:
    """Collect state arrays and print statistics every 10 messages.

    The stored values are never cleared, so each report covers every
    message received so far, not only the latest ten.
    """

    def __init__(self):
        self.values = []  # one converted state array per processed message
        self.count = 0  # number of messages processed so far

    def process(self, data):
        """Store the message's state and report on every 10th message."""
        get_state_as_array(data)
        self.values.append(data["state"])
        self.count += 1

        if self.count % 10 == 0:
            # Calculate statistics every 10 messages
            all_values = np.array(self.values)
            mean = all_values.mean(axis=0)
            std = all_values.std(axis=0)

            print(f"\nStatistics after {self.count} messages:")
            print(f"Mean: {mean}")
            print(f"Std Dev: {std}")
aggregator = DataAggregator()
consumer = Consumer(config=config)
consumer.consume(aggregator.process)
Filtering and Routing
from avenieca.consumer import Consumer
from avenieca.config.broker import Broker
from avenieca.utils.signal import get_state_as_list
config = Broker(
url="localhost:9092",
sub_topic="",
pub_topic="all_events",
group="router"
)
def route_message(data):
    """Route messages based on valence"""
    valence = data.get("valence", 0)  # a missing valence is routed as normal
    get_state_as_list(data)

    if valence > 50:
        handle_high_priority(data)
    elif valence < -50:
        handle_alert(data)
    else:
        handle_normal(data)
def handle_high_priority(data):
    """Print a message whose valence is above 50."""
    print(f"HIGH PRIORITY: {data}")
def handle_alert(data):
    """Print a message whose valence is below -50."""
    print(f"ALERT: {data}")
def handle_normal(data):
    """Print a message whose valence is between -50 and 50 inclusive."""
    print(f"Normal: {data}")
consumer = Consumer(config=config)
consumer.consume(route_message)
Error Handling
import logging
from avenieca.consumer import Consumer
from avenieca.config.broker import Broker
from avenieca.utils.signal import get_state_as_list
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
config = Broker(
url="localhost:9092",
sub_topic="",
pub_topic="data_stream",
group="safe_consumer"
)
def safe_handler(data):
    """Handler with error handling

    Validates the message, converts its state and processes it. Any
    exception raised along the way is logged instead of propagating out
    of the handler.
    """
    try:
        # Validate data
        if "state" not in data:
            logger.error("Missing state field")
            return

        if "valence" not in data:
            # Not fatal: processing continues without a valence
            logger.warning("Missing valence field")

        # Process data
        get_state_as_list(data)

        # Your processing logic
        process_data(data)

        logger.info("Successfully processed message")

    except Exception as e:
        # logger.exception records the traceback along with the message
        logger.exception("Error processing message: %s", e)
def process_data(data):
    """Placeholder for the application-specific processing of a message."""
    # Your actual processing logic
    pass
consumer = Consumer(config=config)
consumer.consume(safe_handler)
Custom Data Types
import numpy as np
from avenieca.consumer import Consumer
from avenieca.config.broker import Broker
from avenieca.utils.signal import get_state_as_array
config = Broker(
url="localhost:9092",
sub_topic="",
pub_topic="typed_data",
group="typed_consumer"
)
def handler_with_dtype(data):
    """Convert state to specific numpy dtype"""
    # Convert to int32
    # NOTE(review): an integer dtype presumably truncates fractional state
    # values such as 0.2 — confirm against get_state_as_array.
    get_state_as_array(data, dtype=np.int32)

    print(f"State as int32: {data['state']}")
    print(f"Dtype: {data['state'].dtype}")
consumer = Consumer(config=config)
consumer.consume(handler_with_dtype)
Handler Function Requirements
Your handler function must:
- Accept a single parameter (the message data dictionary)
- Handle the message data appropriately
- Use utility functions to convert state if needed
Valid handler:
def valid_handler(data):
    """Accept the message dictionary as the single required parameter."""
    print(data["valence"])
    get_state_as_list(data)  # converts data["state"] in place
    print(data["state"])
Invalid handler (no parameter):
def invalid_handler():
    # Error: a handler must accept the message dictionary as its single
    # parameter, so calling this one with a message raises TypeError.
    print("No data access")
Configuration Notes
Consumer Group
The group parameter in the Broker config identifies the consumer group:
- Multiple consumers with the same group share message processing
- Each message is delivered to only one consumer in the group
- Different groups receive all messages independently
Auto Offset Reset
The auto_offset_reset parameter controls where consumption starts:
- "latest" (default): Start from newest messages
- "earliest": Start from oldest available messages
config = Broker(
url="localhost:9092",
sub_topic="",
pub_topic="topic",
group="my_group",
auto_offset_reset="earliest" # Start from beginning
)