The Broker class defines the configuration for connecting to Kafka and managing topics for AveniECA streaming.

Class Definition

Location: avenieca/config/broker.py:7
@dataclass
class Broker(Base):
    url: str
    sub_topic: str
    pub_topic: str
    group: str
    auto_offset_reset: str = "latest"
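To illustrate how the field definitions above behave, here is a small sketch using a stand-in dataclass. `BrokerSketch` and `required_fields` are hypothetical names; the stand-in only mirrors the documented fields and does not subclass the library's `Base`, so it runs without AveniECA installed.

```python
from dataclasses import MISSING, dataclass, fields


@dataclass
class BrokerSketch:
    """Hypothetical stand-in that mirrors the documented Broker fields."""
    url: str
    sub_topic: str
    pub_topic: str
    group: str
    auto_offset_reset: str = "latest"


def required_fields(cls) -> list:
    """Return the names of dataclass fields that have no default value."""
    return [
        f.name
        for f in fields(cls)
        if f.default is MISSING and f.default_factory is MISSING
    ]


config = BrokerSketch(url="localhost:9092", sub_topic="in", pub_topic="", group="demo")
print(required_fields(BrokerSketch))  # the four fields without defaults
print(config.auto_offset_reset)       # the default, since none was passed
```

Omitting any of the four required fields raises a `TypeError` at construction time, which is standard dataclass behavior.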

Configuration Fields

url (required)

Type: str
The Kafka broker address in host:port form. Separate multiple brokers with commas. Examples:
url="localhost:9092"                    # Local Kafka
url="kafka.example.com:9092"            # Remote Kafka
url="broker1:9092,broker2:9092"         # Multiple brokers
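A malformed address usually surfaces only when the client tries to connect, so it can help to check the string up front. The following is a minimal sketch of such a pre-flight check; `parse_bootstrap_servers` is a hypothetical helper, not part of AveniECA, and it assumes plain `host:port` entries (no IPv6 literals or URL schemes).

```python
def parse_bootstrap_servers(url: str) -> list:
    """Split 'host:port,host:port' into (host, port) tuples, validating each entry."""
    servers = []
    for entry in url.split(","):
        entry = entry.strip()
        if not entry:
            raise ValueError("empty broker entry in url")
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        port_number = int(port)
        if not 0 < port_number < 65536:
            raise ValueError(f"port out of range in {entry!r}")
        servers.append((host, port_number))
    return servers


print(parse_bootstrap_servers("broker1:9092,broker2:9092"))
```

The original string is still what gets passed to `Broker(url=...)`; the parsed tuples are only used for validation.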

sub_topic (required)

Type: str
The topic the digital twin subscribes to, which is where producers send their data. Usage:
  • Used by Stream and Event producers
  • Not used by Consumer
  • Can be empty string for consumer-only configs
Examples:
sub_topic="sensor_data"      # Sensor readings
sub_topic="left_wheel"       # Specific component
sub_topic="user_events"      # User interactions
sub_topic=""                 # Consumer-only config

pub_topic (required)

Type: str
The topic the digital twin publishes to, which is where consumers read its data. Usage:
  • Used by Consumer
  • Not used by Stream and Event producers
  • Can be empty string for producer-only configs
Examples:
pub_topic="output"           # Generic output
pub_topic="predictions"      # Model predictions
pub_topic="alerts"           # Alert notifications
pub_topic=""                 # Producer-only config

group (required)

Type: str
The consumer group ID for Kafka consumer group management. Purpose:
  • Identifies which consumer group this client belongs to
  • Multiple consumers with the same group share message processing
  • Each message is delivered to only one consumer per group
  • Different groups receive all messages independently
Examples:
group="test"                 # Test/development group
group="production"           # Production consumer group
group="analytics"            # Analytics processing group
group="worker-1"             # Group dedicated to a single worker instance

auto_offset_reset (optional)

Type: str
Default: "latest"
Controls where consumption starts when the consumer group has no committed offset for the topic. Values:
  • "latest": Start from newest messages (default)
  • "earliest": Start from oldest available messages
Examples:
auto_offset_reset="latest"    # Only new messages
auto_offset_reset="earliest"  # All available messages

Basic Usage

Producer Configuration

import os
from avenieca.config.broker import Broker

config = Broker(
    url=os.environ["KAFKA_URL"],
    sub_topic="sensor_data",  # Where to send data
    pub_topic="",             # Not used for producers
    group="sensors"
)

Consumer Configuration

import os
from avenieca.config.broker import Broker

config = Broker(
    url=os.environ["KAFKA_URL"],
    sub_topic="",             # Not used for consumers
    pub_topic="output",       # Where to read data from
    group="processors",
    auto_offset_reset="latest"
)

Full Configuration

from avenieca.config.broker import Broker

config = Broker(
    url="localhost:9092",
    sub_topic="input_data",
    pub_topic="output_data",
    group="my_application",
    auto_offset_reset="earliest"
)

Environment Variables

Common practice is to use environment variables for configuration:
import os
from avenieca.config.broker import Broker

config = Broker(
    url=os.getenv("KAFKA_URL", "localhost:9092"),
    sub_topic=os.getenv("KAFKA_SUB_TOPIC", "default_input"),
    pub_topic=os.getenv("KAFKA_PUB_TOPIC", "default_output"),
    group=os.getenv("KAFKA_GROUP", "default_group"),
    auto_offset_reset=os.getenv("KAFKA_OFFSET_RESET", "latest")
)
.env file (os.getenv reads only the process environment, so load this file before startup):
KAFKA_URL=kafka.example.com:9092
KAFKA_SUB_TOPIC=sensor_input
KAFKA_PUB_TOPIC=sensor_output
KAFKA_GROUP=sensor_processors
KAFKA_OFFSET_RESET=latest
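A minimal sketch of loading a file like this using only the standard library is shown below. `load_env_text` is a hypothetical helper that handles only simple `KEY=VALUE` lines (no quoting or variable expansion); in practice a loader library or the deployment platform usually does this job.

```python
import os


def load_env_text(text: str, environ=None) -> dict:
    """Parse simple KEY=VALUE lines and set them without overriding existing values."""
    environ = os.environ if environ is None else environ
    loaded = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # skip blanks, comments, and malformed lines
        key, _, value = line.partition("=")
        key, value = key.strip(), value.strip()
        environ.setdefault(key, value)  # a value already in the environment wins
        loaded[key] = environ[key]
    return loaded


sample = """
KAFKA_URL=kafka.example.com:9092
KAFKA_GROUP=sensor_processors
KAFKA_OFFSET_RESET=latest
"""
# A plain dict is passed here so the demo does not modify os.environ.
settings = load_env_text(sample, environ={})
print(settings["KAFKA_URL"])
```

Calling `load_env_text(open(".env").read())` before building the `Broker` would make the `os.getenv` lookups above pick up the file's values.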

Usage Examples

Stream Producer

from avenieca.config.broker import Broker
from avenieca.producers import Stream
from avenieca.data import Signal

config = Broker(
    url="localhost:9092",
    sub_topic="temperature_sensor",
    pub_topic="",  # Not needed for producer
    group="sensors"
)

def handler():
    return Signal(
        valence=10,
        state=[20.5, 21.0, 19.8]
    )

stream = Stream(config=config, sync_rate=1.0)
stream.publish(handler)

Event Producer

from avenieca.config.broker import Broker
from avenieca.producers import Event
from avenieca.data import Signal

config = Broker(
    url="localhost:9092",
    sub_topic="user_clicks",
    pub_topic="",  # Not needed for producer
    group="webapp"
)

signal = Signal(
    valence=1.0,
    state=[1.0, 0.0, 0.0]
)

event = Event(config=config)
event.publish(signal)

Consumer

from avenieca.config.broker import Broker
from avenieca.consumer import Consumer
from avenieca.utils.signal import get_state_as_list

config = Broker(
    url="localhost:9092",
    sub_topic="",  # Not needed for consumer
    pub_topic="predictions",
    group="prediction_processors",
    auto_offset_reset="earliest"  # Start from the beginning if the group has no committed offset
)

def handler(data):
    # Called for its effect on data; the return value is not used here
    get_state_as_list(data)
    print(f"Received prediction: {data}")

consumer = Consumer(config=config)
consumer.consume(handler)

Advanced Configurations

Multiple Brokers

from avenieca.config.broker import Broker

config = Broker(
    url="broker1.example.com:9092,broker2.example.com:9092,broker3.example.com:9092",
    sub_topic="high_availability",
    pub_topic="",
    group="ha_group"
)

Development vs Production

import os
from avenieca.config.broker import Broker

ENV = os.getenv("ENVIRONMENT", "development")

if ENV == "production":
    config = Broker(
        url=os.environ["PROD_KAFKA_URL"],
        sub_topic="prod_input",
        pub_topic="prod_output",
        group="prod_workers",
        auto_offset_reset="latest"
    )
else:
    config = Broker(
        url="localhost:9092",
        sub_topic="dev_input",
        pub_topic="dev_output",
        group="dev_workers",
        auto_offset_reset="earliest"  # See all messages in dev
    )

Topic Naming Conventions

from avenieca.config.broker import Broker

# By environment
config = Broker(
    url="localhost:9092",
    sub_topic="prod.sensors.temperature",
    pub_topic="prod.predictions.temperature",
    group="prod.temperature.processors"
)

# By data type
config = Broker(
    url="localhost:9092",
    sub_topic="metrics.system.cpu",
    pub_topic="alerts.system.cpu",
    group="system.monitors"
)

# By component
config = Broker(
    url="localhost:9092",
    sub_topic="robot.left_wheel.telemetry",
    pub_topic="robot.left_wheel.commands",
    group="robot.controllers"
)
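Dotted names like these can be built and checked programmatically. The sketch below uses a hypothetical helper, `topic_name`, that joins the parts and validates the result against Kafka's topic naming rules (ASCII letters, digits, `.`, `_` and `-`, at most 249 characters, and not `.` or `..`).

```python
import re

_LEGAL_TOPIC = re.compile(r"[A-Za-z0-9._-]+")
_MAX_TOPIC_LENGTH = 249


def topic_name(*parts: str) -> str:
    """Join parts with dots and check the result against Kafka's topic naming rules."""
    name = ".".join(parts)
    if name in (".", "..") or not _LEGAL_TOPIC.fullmatch(name):
        raise ValueError(f"illegal topic name: {name!r}")
    if len(name) > _MAX_TOPIC_LENGTH:
        raise ValueError(f"topic name longer than {_MAX_TOPIC_LENGTH} characters")
    return name


print(topic_name("prod", "sensors", "temperature"))  # prod.sensors.temperature
```

Kafka accepts names that mix `.` and `_`, but it warns that such names can collide in metric names, so a convention that sticks to one separator is the safer choice.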

Consumer Group Behavior

Single Consumer

from avenieca.config.broker import Broker
from avenieca.consumer import Consumer

config = Broker(
    url="localhost:9092",
    sub_topic="",
    pub_topic="messages",
    group="solo_consumer"
)

consumer = Consumer(config=config)
# Receives all messages

Multiple Consumers, Same Group

from avenieca.config.broker import Broker
from avenieca.consumer import Consumer

# Consumer 1
config1 = Broker(
    url="localhost:9092",
    sub_topic="",
    pub_topic="messages",
    group="shared_group"  # Same group
)
consumer1 = Consumer(config=config1)

# Consumer 2
config2 = Broker(
    url="localhost:9092",
    sub_topic="",
    pub_topic="messages",
    group="shared_group"  # Same group
)
consumer2 = Consumer(config=config2)

# Messages are split between consumer1 and consumer2

Multiple Consumers, Different Groups

from avenieca.config.broker import Broker
from avenieca.consumer import Consumer

# Consumer 1
config1 = Broker(
    url="localhost:9092",
    sub_topic="",
    pub_topic="messages",
    group="group_a"  # Different group
)
consumer1 = Consumer(config=config1)

# Consumer 2
config2 = Broker(
    url="localhost:9092",
    sub_topic="",
    pub_topic="messages",
    group="group_b"  # Different group
)
consumer2 = Consumer(config=config2)

# Both consumers receive ALL messages
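The scenarios in this section can be illustrated without a broker. The sketch below is a toy in-memory model, not Kafka and not the AveniECA client: `deliver` is a hypothetical function that hands each message to exactly one consumer per group, with round-robin standing in for Kafka's partition-based assignment.

```python
from collections import defaultdict
from itertools import cycle


def deliver(messages, consumers):
    """Toy model: `consumers` maps consumer name -> group; returns name -> messages received."""
    members = defaultdict(list)
    for name, group in consumers.items():
        members[group].append(name)

    received = {name: [] for name in consumers}
    # One rotating iterator per group stands in for partition assignment.
    rotation = {group: cycle(names) for group, names in members.items()}
    for message in messages:
        for group in members:
            received[next(rotation[group])].append(message)
    return received


messages = ["m1", "m2", "m3", "m4"]
same_group = deliver(messages, {"consumer1": "shared_group", "consumer2": "shared_group"})
different_groups = deliver(messages, {"consumer1": "group_a", "consumer2": "group_b"})
print(same_group)        # messages split between the two consumers
print(different_groups)  # each consumer receives every message
```

In real Kafka the split within a group follows partitions rather than individual messages, so a topic with a single partition is read by only one consumer in the group at a time.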

Offset Reset Behavior

Latest (Default)

from avenieca.config.broker import Broker

config = Broker(
    url="localhost:9092",
    sub_topic="",
    pub_topic="stream",
    group="new_consumer",
    auto_offset_reset="latest"
)

# First run (no committed offset for this group): receives only messages sent after the consumer starts
# Subsequent runs: resumes from the group's last committed offset; auto_offset_reset is not consulted

Earliest

from avenieca.config.broker import Broker

config = Broker(
    url="localhost:9092",
    sub_topic="",
    pub_topic="stream",
    group="new_consumer",
    auto_offset_reset="earliest"
)

# First run (no committed offset for this group): receives all retained messages from the beginning
# Subsequent runs: resumes from the group's last committed offset; auto_offset_reset is not consulted
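The first-run versus subsequent-run behavior comes down to one decision per partition. The sketch below models that decision; `starting_offset` is a hypothetical function written for illustration, not a call in Kafka or AveniECA.

```python
def starting_offset(committed, earliest, latest, auto_offset_reset="latest"):
    """Toy model of where a consumer group starts reading one partition.

    committed: the group's last committed offset, or None if there is none.
    earliest, latest: the partition's first retained offset and its end offset.
    """
    if committed is not None and earliest <= committed <= latest:
        return committed  # resume; the reset policy is not consulted
    if auto_offset_reset == "earliest":
        return earliest
    if auto_offset_reset == "latest":
        return latest
    raise ValueError(f"unsupported auto_offset_reset: {auto_offset_reset!r}")


print(starting_offset(None, 0, 100, "latest"))    # 100: new group, only new messages
print(starting_offset(None, 0, 100, "earliest"))  # 0: new group, full replay
print(starting_offset(42, 0, 100, "earliest"))    # 42: existing group resumes
```

The policy also applies when a committed offset is no longer valid, for example because retention has already deleted the messages it pointed to.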

Configuration Best Practices

  1. Use environment variables for sensitive or environment-specific values
  2. Use descriptive topic names that indicate data type and purpose
  3. Use consistent group naming across related consumers
  4. Set auto_offset_reset="earliest" for development so a new consumer group sees all retained messages
  5. Set auto_offset_reset="latest" for production so a new consumer group starts at new messages instead of replaying the backlog
  6. Use empty strings for unused topic fields (sub_topic for consumers, pub_topic for producers)
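Some of these practices can be checked mechanically before a client starts. The sketch below is one possible approach, with assumptions: `check_config` is a hypothetical helper that works on any object exposing the documented field names, and `SimpleNamespace` stands in for a real `Broker` so the example runs on its own.

```python
from types import SimpleNamespace

ALLOWED_OFFSET_RESET = ("latest", "earliest")


def check_config(config) -> list:
    """Return a list of human-readable problems; an empty list means none were found."""
    problems = []
    if not config.url:
        problems.append("url is empty")
    if not config.sub_topic and not config.pub_topic:
        problems.append("both sub_topic and pub_topic are empty")
    if config.auto_offset_reset not in ALLOWED_OFFSET_RESET:
        problems.append(f"unexpected auto_offset_reset: {config.auto_offset_reset!r}")
    return problems


# Stand-in for a producer-only Broker configuration.
producer_only = SimpleNamespace(
    url="localhost:9092",
    sub_topic="sensor_data",
    pub_topic="",
    auto_offset_reset="latest",
)
print(check_config(producer_only))  # []
```

Returning a list rather than raising on the first problem lets a startup script report every issue at once.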

Connection Examples

Local Development

from avenieca.config.broker import Broker

config = Broker(
    url="localhost:9092",
    sub_topic="dev_input",
    pub_topic="dev_output",
    group="dev"
)

Docker Compose

from avenieca.config.broker import Broker

config = Broker(
    url="kafka:9092",  # Service name in docker-compose.yml
    sub_topic="app_input",
    pub_topic="app_output",
    group="app"
)

Cloud Kafka (Confluent, AWS MSK, etc.)

import os
from avenieca.config.broker import Broker

config = Broker(
    url=os.environ["KAFKA_BOOTSTRAP_SERVERS"],
    sub_topic="prod_sensor_data",
    pub_topic="prod_predictions",
    group="prod_processors"
)
