Overview
The Consumer class is used for consuming messages (signals) from a digital twin. It connects to a Kafka topic and processes incoming messages through a handler function.
Constructor

Consumer(config)

Creates a new Consumer instance.
config (Broker): Configuration object containing broker connection details. Must include:
- url: Broker URL
- sub_topic: Subscription topic
- pub_topic: Publishing topic (used as the consumption topic)
- group: Consumer group
- auto_offset_reset: Offset reset policy (default: "latest")
Methods
consume()
consume(func, sync_once=False)
Consumes messages from the configured topic and processes them with the provided handler function.
func (callable): Handler function that processes received messages. The function receives a single parameter:
- data (dict): Deserialized JSON message data containing the signal information

Handler signature:

```python
def handler(data: dict) -> None:
    # Process the message data
    pass
```
sync_once (bool): If True, processes only one message and then stops. If False (the default), continues consuming messages indefinitely.
Returns: None
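The effect of sync_once can be pictured with a plain-Python stand-in for the consume loop (consume_sim and the sample payloads below are illustrative only, not the library's actual implementation):

```python
import json

def consume_sim(raw_messages, func, sync_once=False):
    # Stand-in for Consumer.consume: deserialize each JSON payload,
    # pass the resulting dict to the handler, and stop after the
    # first message when sync_once is True.
    for raw in raw_messages:
        func(json.loads(raw))
        if sync_once:
            break

received = []
payloads = ['{"state": [1.0, 2.0]}', '{"state": [3.0]}']
consume_sim(payloads, received.append, sync_once=True)
print(received)  # only the first payload is handled
```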
Properties
config
The broker configuration object.
topic
The Kafka topic being consumed from (set from config.pub_topic).
client
The underlying KafkaConsumer client instance.
```python
kafka_client = consumer.client
```
Handler Function Signature
Your handler function should accept a single parameter containing the deserialized message data:
```python
def message_handler(data: dict):
    """
    Process incoming signal data.

    Args:
        data: Dictionary containing signal fields:
            - state: List of float values
            - valence: Optional float
            - score: Optional int
            - emb_inp: Optional int
    """
    state = data['state']
    valence = data.get('valence')
    score = data.get('score')
    # Your processing logic here
    print(f"Received signal with state: {state}")
```
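Because the handler is an ordinary callable, it can be unit-tested by invoking it directly with a sample payload; the values below are made up for illustration:

```python
def average_state(data: dict) -> float:
    # Mean of the signal's state vector; raises KeyError if 'state'
    # is absent, mirroring the data['state'] access above.
    state = data['state']
    return sum(state) / len(state)

sample = {"state": [2.0, 4.0, 6.0], "valence": 0.5, "score": 88}
print(average_state(sample))  # 4.0
```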
Example
```python
from avenieca import Consumer, Broker

# Configure broker connection
config = Broker(
    url="localhost:9092",
    sub_topic="sensor-input",
    pub_topic="sensor-output",  # This is the topic consumed from
    group="analytics-group"
)

# Create consumer
consumer = Consumer(config=config)

# Define message handler
def process_signal(data):
    """
    Process incoming sensor signals.
    """
    print(f"Received signal: {data}")
    state = data['state']
    valence = data.get('valence', 0.0)
    score = data.get('score', 0)

    # Analyze the signal
    avg_value = sum(state) / len(state)
    print(f"Average state value: {avg_value}")
    print(f"Valence: {valence}, Score: {score}")

    # Store to database, trigger alerts, etc.
    if avg_value > 100:
        print("Alert: High reading detected!")

# Start consuming (runs indefinitely)
consumer.consume(process_signal)

# Or consume just one message
consumer.consume(process_signal, sync_once=True)
```
Advanced Example
```python
import logging

from avenieca import Consumer, Broker

logger = logging.getLogger(__name__)

# Configure consumer
config = Broker(
    url="kafka.example.com:9092",
    sub_topic="device-events",
    pub_topic="processed-events",
    group="monitoring-service",
    auto_offset_reset="earliest"  # Start from beginning
)

consumer = Consumer(config=config)

class SignalProcessor:
    def __init__(self):
        self.count = 0
        self.total_valence = 0.0

    def handle_message(self, data):
        """
        Stateful message processing with analytics.
        """
        self.count += 1
        state = data['state']
        valence = data.get('valence', 0.0)
        score = data.get('score', 0)

        self.total_valence += valence
        avg_valence = self.total_valence / self.count

        logger.info(f"Processed {self.count} messages")
        logger.info(f"Current signal - State: {state}, Score: {score}")
        logger.info(f"Running average valence: {avg_valence:.3f}")

        # Conditional logic
        if score > 90:
            self.handle_high_quality_signal(data)

        # Persist data
        self.save_to_database(data)

    def handle_high_quality_signal(self, data):
        logger.warning(f"High quality signal detected: {data}")
        # Trigger notification, update dashboard, etc.

    def save_to_database(self, data):
        # Database persistence logic
        pass

# Use class method as handler
processor = SignalProcessor()
consumer.consume(processor.handle_message)
```
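consume() does not document an error-handling hook, so an unhandled exception in the handler will propagate out of the loop. One defensive pattern (plain Python, not part of avenieca) is to wrap the handler so a malformed message is logged instead of aborting consumption:

```python
import logging

logger = logging.getLogger(__name__)

def safe_handler(func):
    # Wrap a handler so exceptions are logged and swallowed, keeping
    # a long-running consume loop alive when a bad message arrives.
    def wrapper(data):
        try:
            func(data)
        except Exception:
            logger.exception("Handler failed for message: %r", data)
    return wrapper

# consumer.consume(safe_handler(processor.handle_message))
```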