Consumer class receives and processes messages (signals) from your digital twin via Kafka topics. It provides handler-based message processing and utilities for state data conversion.
Class Definition
Location:avenieca/consumer.py:7
config(Broker): Broker configuration object
Methods
consume
Signature:avenieca/consumer.py:24
func: Handler function to process received messagessync_once(optional): If True, processes only one message and exits (default: False)
Basic Usage
Simple Consumer
Consume Once
Process a single message and exit:Message Format
Messages received by the handler are dictionaries with the following structure:state field is received as a JSON string and needs to be converted to use as a list or numpy array.
Signal Utilities
AveniECA provides utility functions to convert the state signal from its string representation.get_state_as_list
Location:avenieca/utils/signal.py:16
get_state_as_array
Location:avenieca/utils/signal.py:8
Complete Example from README
Advanced Examples
Processing with Multiple Fields
Data Aggregation
Filtering and Routing
Error Handling
Custom Data Types
Handler Function Requirements
Your handler function must:- Accept a single parameter (the message data dictionary)
- Handle the message data appropriately
- Use utility functions to convert state if needed
Configuration Notes
Consumer Group
Thegroup parameter in the Broker config identifies the consumer group:
- Multiple consumers with the same group share message processing
- Each message is delivered to only one consumer in the group
- Different groups receive all messages independently
Auto Offset Reset
Theauto_offset_reset parameter controls where consumption starts:
"latest"(default): Start from newest messages"earliest": Start from oldest available messages
Related Classes
- Stream Producer - For producing continuous streams
- Event Producer - For producing individual events
- Configuration - Broker configuration options