Skip to main content

Overview

The Consumer class is used for consuming messages (signals) from a digital twin. It connects to a Kafka topic and processes incoming messages through a handler function.

Constructor

Consumer(config: Broker)
Creates a new Consumer instance.
config
Broker
required
Configuration object containing broker connection details. Must include:
  • url: Broker URL
  • sub_topic: Subscription topic
  • pub_topic: Publishing topic (used as the consumption topic)
  • group: Consumer group
  • auto_offset_reset: Offset reset policy (default: “latest”)

Methods

consume()

consume(func, sync_once=False)
Consumes messages from the configured topic and processes them with the provided handler function.
func
callable
required
Handler function that processes received messages. The function receives a single parameter:
  • data (dict): Deserialized JSON message data containing the signal information
Handler Signature:
def handler(data: dict) -> None:
    # Process the message data
    pass
sync_once
bool
default:"False"
If True, processes only one message and then stops. If False, continues consuming messages indefinitely.
Returns: None

Properties

config

The broker configuration object.
config = consumer.config

topic

The Kafka topic being consumed from (set from config.pub_topic).
topic = consumer.topic

client

The underlying KafkaConsumer client instance.
kafka_client = consumer.client

Handler Function Signature

Your handler function should accept a single parameter containing the deserialized message data:
def message_handler(data: dict):
    """
    Process incoming signal data.
    
    Args:
        data: Dictionary containing signal fields:
            - state: List of float values
            - valence: Optional float
            - score: Optional int
            - emb_inp: Optional int
    """
    state = data['state']
    valence = data.get('valence')
    score = data.get('score')
    
    # Your processing logic here
    print(f"Received signal with state: {state}")

Example

from avenieca import Consumer, Broker

# Configure broker connection
config = Broker(
    url="localhost:9092",
    sub_topic="sensor-input",
    pub_topic="sensor-output",  # This is the topic consumed from
    group="analytics-group"
)

# Create consumer
consumer = Consumer(config=config)

# Define message handler
def process_signal(data):
    """
    Process incoming sensor signals.
    """
    print(f"Received signal: {data}")
    
    state = data['state']
    valence = data.get('valence', 0.0)
    score = data.get('score', 0)
    
    # Analyze the signal
    avg_value = sum(state) / len(state)
    print(f"Average state value: {avg_value}")
    print(f"Valence: {valence}, Score: {score}")
    
    # Store to database, trigger alerts, etc.
    if avg_value > 100:
        print("Alert: High reading detected!")

# Start consuming (runs indefinitely)
consumer.consume(process_signal)

# Or consume just one message
consumer.consume(process_signal, sync_once=True)

Advanced Example

import logging
from avenieca import Consumer, Broker

logger = logging.getLogger(__name__)

# Configure consumer
config = Broker(
    url="kafka.example.com:9092",
    sub_topic="device-events",
    pub_topic="processed-events",
    group="monitoring-service",
    auto_offset_reset="earliest"  # Start from beginning
)

consumer = Consumer(config=config)

class SignalProcessor:
    def __init__(self):
        self.count = 0
        self.total_valence = 0.0
    
    def handle_message(self, data):
        """
        Stateful message processing with analytics.
        """
        self.count += 1
        
        state = data['state']
        valence = data.get('valence', 0.0)
        score = data.get('score', 0)
        
        self.total_valence += valence
        avg_valence = self.total_valence / self.count
        
        logger.info(f"Processed {self.count} messages")
        logger.info(f"Current signal - State: {state}, Score: {score}")
        logger.info(f"Running average valence: {avg_valence:.3f}")
        
        # Conditional logic
        if score > 90:
            self.handle_high_quality_signal(data)
        
        # Persist data
        self.save_to_database(data)
    
    def handle_high_quality_signal(self, data):
        logger.warning(f"High quality signal detected: {data}")
        # Trigger notification, update dashboard, etc.
    
    def save_to_database(self, data):
        # Database persistence logic
        pass

# Use class method as handler
processor = SignalProcessor()
consumer.consume(processor.handle_message)

Build docs developers (and LLMs) love