Broker Configuration
The Broker class defines the configuration for connecting to Kafka and managing topics for AveniECA streaming.

Class Definition

Location: avenieca/config/broker.py:7
@dataclass
class Broker(Base):
    url: str
    sub_topic: str
    pub_topic: str
    group: str
    auto_offset_reset: str = "latest"

Configuration Fields

url (required)

Type: str The Kafka broker URL with host and port. Examples:
url="localhost:9092"                    # Local Kafka
url="kafka.example.com:9092"            # Remote Kafka
url="broker1:9092,broker2:9092"         # Multiple brokers

sub_topic (required)

Type: str The topic the digital twin subscribes to — producers publish data to this topic for the digital twin to consume. Usage:
  • Used by Stream and Event producers
  • Not used by Consumer
  • Can be empty string for consumer-only configs
Examples:
sub_topic="sensor_data"      # Sensor readings
sub_topic="left_wheel"       # Specific component
sub_topic="user_events"      # User interactions
sub_topic=""                 # Consumer-only config

pub_topic (required)

Type: str The topic the digital twin publishes to — consumers read the digital twin's output from this topic. Usage:
  • Used by Consumer
  • Not used by Stream and Event producers
  • Can be empty string for producer-only configs
Examples:
pub_topic="output"           # Generic output
pub_topic="predictions"      # Model predictions
pub_topic="alerts"           # Alert notifications
pub_topic=""                 # Producer-only config

group (required)

Type: str The consumer group ID for Kafka consumer group management. Purpose:
  • Identifies which consumer group this client belongs to
  • Multiple consumers with the same group share message processing
  • Each message is delivered to only one consumer per group
  • Different groups receive all messages independently
Examples:
group="test"                 # Test/development group
group="production"           # Production consumer group
group="analytics"            # Analytics processing group
group="worker-1"             # Specific worker instance

auto_offset_reset (optional)

Type: str
Default: "latest"
Controls where consumption starts when no previous offset exists. Values:
  • "latest": Start from newest messages (default)
  • "earliest": Start from oldest available messages
Examples:
auto_offset_reset="latest"    # Only new messages
auto_offset_reset="earliest"  # All available messages

Basic Usage

Producer Configuration

import os
from avenieca.config.broker import Broker

config = Broker(
    url=os.environ["KAFKA_URL"],
    sub_topic="sensor_data",  # Where to send data
    pub_topic="",             # Not used for producers
    group="sensors"
)

Consumer Configuration

import os
from avenieca.config.broker import Broker

config = Broker(
    url=os.environ["KAFKA_URL"],
    sub_topic="",             # Not used for consumers
    pub_topic="output",       # Where to read data from
    group="processors",
    auto_offset_reset="latest"
)

Full Configuration

from avenieca.config.broker import Broker

config = Broker(
    url="localhost:9092",
    sub_topic="input_data",
    pub_topic="output_data",
    group="my_application",
    auto_offset_reset="earliest"
)

Environment Variables

Common practice is to use environment variables for configuration:
import os
from avenieca.config.broker import Broker

config = Broker(
    url=os.getenv("KAFKA_URL", "localhost:9092"),
    sub_topic=os.getenv("KAFKA_SUB_TOPIC", "default_input"),
    pub_topic=os.getenv("KAFKA_PUB_TOPIC", "default_output"),
    group=os.getenv("KAFKA_GROUP", "default_group"),
    auto_offset_reset=os.getenv("KAFKA_OFFSET_RESET", "latest")
)
.env file:
KAFKA_URL=kafka.example.com:9092
KAFKA_SUB_TOPIC=sensor_input
KAFKA_PUB_TOPIC=sensor_output
KAFKA_GROUP=sensor_processors
KAFKA_OFFSET_RESET=latest

Usage Examples

Stream Producer

from avenieca.config.broker import Broker
from avenieca.producers import Stream
from avenieca.data import Signal

config = Broker(
    url="localhost:9092",
    sub_topic="temperature_sensor",
    pub_topic="",  # Not needed for producer
    group="sensors"
)

def handler():
    return Signal(
        valence=10,
        state=[20.5, 21.0, 19.8]
    )

stream = Stream(config=config, sync_rate=1.0)
stream.publish(handler)

Event Producer

from avenieca.config.broker import Broker
from avenieca.producers import Event
from avenieca.data import Signal

config = Broker(
    url="localhost:9092",
    sub_topic="user_clicks",
    pub_topic="",  # Not needed for producer
    group="webapp"
)

signal = Signal(
    valence=1.0,
    state=[1.0, 0.0, 0.0]
)

event = Event(config=config)
event.publish(signal)

Consumer

from avenieca.config.broker import Broker
from avenieca.consumer import Consumer
from avenieca.utils.signal import get_state_as_list

config = Broker(
    url="localhost:9092",
    sub_topic="",  # Not needed for consumer
    pub_topic="predictions",
    group="prediction_processors",
    auto_offset_reset="earliest"  # Start from beginning
)

def handler(data):
    get_state_as_list(data)
    print(f"Received prediction: {data}")

consumer = Consumer(config=config)
consumer.consume(handler)

Advanced Configurations

Multiple Brokers

from avenieca.config.broker import Broker

config = Broker(
    url="broker1.example.com:9092,broker2.example.com:9092,broker3.example.com:9092",
    sub_topic="high_availability",
    pub_topic="",
    group="ha_group"
)

Development vs Production

import os
from avenieca.config.broker import Broker

ENV = os.getenv("ENVIRONMENT", "development")

if ENV == "production":
    config = Broker(
        url=os.environ["PROD_KAFKA_URL"],
        sub_topic="prod_input",
        pub_topic="prod_output",
        group="prod_workers",
        auto_offset_reset="latest"
    )
else:
    config = Broker(
        url="localhost:9092",
        sub_topic="dev_input",
        pub_topic="dev_output",
        group="dev_workers",
        auto_offset_reset="earliest"  # See all messages in dev
    )

Topic Naming Conventions

from avenieca.config.broker import Broker

# By environment
config = Broker(
    url="localhost:9092",
    sub_topic="prod.sensors.temperature",
    pub_topic="prod.predictions.temperature",
    group="prod.temperature.processors"
)

# By data type
config = Broker(
    url="localhost:9092",
    sub_topic="metrics.system.cpu",
    pub_topic="alerts.system.cpu",
    group="system.monitors"
)

# By component
config = Broker(
    url="localhost:9092",
    sub_topic="robot.left_wheel.telemetry",
    pub_topic="robot.left_wheel.commands",
    group="robot.controllers"
)

Consumer Group Behavior

Single Consumer

from avenieca.config.broker import Broker
from avenieca.consumer import Consumer

config = Broker(
    url="localhost:9092",
    sub_topic="",
    pub_topic="messages",
    group="solo_consumer"
)

consumer = Consumer(config=config)
# Receives all messages

Multiple Consumers, Same Group

from avenieca.config.broker import Broker
from avenieca.consumer import Consumer

# Consumer 1
config1 = Broker(
    url="localhost:9092",
    sub_topic="",
    pub_topic="messages",
    group="shared_group"  # Same group
)
consumer1 = Consumer(config=config1)

# Consumer 2
config2 = Broker(
    url="localhost:9092",
    sub_topic="",
    pub_topic="messages",
    group="shared_group"  # Same group
)
consumer2 = Consumer(config=config2)

# Messages are split between consumer1 and consumer2

Multiple Consumers, Different Groups

from avenieca.config.broker import Broker
from avenieca.consumer import Consumer

# Consumer 1
config1 = Broker(
    url="localhost:9092",
    sub_topic="",
    pub_topic="messages",
    group="group_a"  # Different group
)
consumer1 = Consumer(config=config1)

# Consumer 2
config2 = Broker(
    url="localhost:9092",
    sub_topic="",
    pub_topic="messages",
    group="group_b"  # Different group
)
consumer2 = Consumer(config=config2)

# Both consumers receive ALL messages

Offset Reset Behavior

Latest (Default)

config = Broker(
    url="localhost:9092",
    sub_topic="",
    pub_topic="stream",
    group="new_consumer",
    auto_offset_reset="latest"
)

# First run: Receives only new messages sent after consumer starts
# Subsequent runs: Continues from last processed message

Earliest

config = Broker(
    url="localhost:9092",
    sub_topic="",
    pub_topic="stream",
    group="new_consumer",
    auto_offset_reset="earliest"
)

# First run: Receives all available messages from the beginning
# Subsequent runs: Continues from last processed message

Configuration Best Practices

  1. Use environment variables for sensitive or environment-specific values
  2. Use descriptive topic names that indicate data type and purpose
  3. Use consistent group naming across related consumers
  4. Set auto_offset_reset="earliest" for development to see all messages
  5. Set auto_offset_reset="latest" for production to avoid reprocessing
  6. Use empty strings for unused topic fields (sub_topic for consumers, pub_topic for producers)

Connection Examples

Local Development

config = Broker(
    url="localhost:9092",
    sub_topic="dev_input",
    pub_topic="dev_output",
    group="dev"
)

Docker Compose

config = Broker(
    url="kafka:9092",  # Service name in docker-compose.yml
    sub_topic="app_input",
    pub_topic="app_output",
    group="app"
)

Cloud Kafka (Confluent, AWS MSK, etc.)

import os

config = Broker(
    url=os.environ["KAFKA_BOOTSTRAP_SERVERS"],
    sub_topic="prod_sensor_data",
    pub_topic="prod_predictions",
    group="prod_processors"
)

Build docs developers (and LLMs) love