The Broker class defines the configuration for connecting to Kafka and managing topics for AveniECA streaming.
Class Definition
Location: avenieca/config/broker.py:7
@dataclass
class Broker(Base):
    url: str
    sub_topic: str
    pub_topic: str
    group: str
    auto_offset_reset: str = "latest"
Configuration Fields
url (required)
Type: str
The Kafka broker URL with host and port.
Examples:
url="localhost:9092" # Local Kafka
url="kafka.example.com:9092" # Remote Kafka
url="broker1:9092,broker2:9092" # Multiple brokers
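Because `url` may carry several comma-separated brokers, it can help to check the value before passing it to `Broker`. The helper below is a minimal sketch: `parse_broker_url` is a hypothetical name and not part of avenieca. It splits the string into host/port pairs and rejects malformed entries.

```python
def parse_broker_url(url: str) -> list[tuple[str, int]]:
    """Split 'host:port[,host:port...]' into (host, port) pairs.

    Hypothetical helper for illustration; not part of avenieca.
    """
    pairs = []
    for entry in url.split(","):
        entry = entry.strip()
        # rpartition keeps everything before the last ':' as the host.
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        pairs.append((host, int(port)))
    return pairs


print(parse_broker_url("broker1:9092,broker2:9092"))
```

Running a check like this at startup turns a typo in the URL into an immediate error rather than a connection timeout later.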
sub_topic (required)
Type: str
The topic the digital twin subscribes to, which is where producers send their data. The name reflects the twin's point of view, not the client's.
Usage:
- Used by Stream and Event producers
- Not used by Consumer
- Can be empty string for consumer-only configs
Examples:
sub_topic="sensor_data" # Sensor readings
sub_topic="left_wheel" # Specific component
sub_topic="user_events" # User interactions
sub_topic="" # Consumer-only config
pub_topic (required)
Type: str
The topic the digital twin publishes to, which is where consumers read its output. The name reflects the twin's point of view, not the client's.
Usage:
- Used by Consumer
- Not used by Stream and Event producers
- Can be empty string for producer-only configs
Examples:
pub_topic="output" # Generic output
pub_topic="predictions" # Model predictions
pub_topic="alerts" # Alert notifications
pub_topic="" # Producer-only config
group (required)
Type: str
The consumer group ID for Kafka consumer group management.
Purpose:
- Identifies which consumer group this client belongs to
- Multiple consumers with the same group share message processing
- Each message is delivered to only one consumer per group
- Different groups receive all messages independently
Examples:
group="test" # Test/development group
group="production" # Production consumer group
group="analytics" # Analytics processing group
group="worker-1" # Specific worker instance
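The delivery rules above can be illustrated with a small pure-Python model. This is a simplified sketch, not Kafka's actual behavior: it hands messages out round-robin inside a group, whereas Kafka assigns whole partitions to group members. The function name `deliver` and the consumer names are made up for the example.

```python
from collections import defaultdict


def deliver(messages, consumers):
    """Model group delivery; consumers is a list of (name, group) pairs.

    Simplified: within a group, messages are handed out round-robin.
    Real Kafka assigns whole partitions to group members instead.
    """
    groups = defaultdict(list)
    for name, group in consumers:
        groups[group].append(name)

    received = {name: [] for name, _ in consumers}
    for index, message in enumerate(messages):
        for members in groups.values():
            # Exactly one member of each group gets this message.
            receiver = members[index % len(members)]
            received[receiver].append(message)
    return received


result = deliver(
    ["m0", "m1", "m2", "m3"],
    [("a1", "analytics"), ("a2", "analytics"), ("p1", "production")],
)
print(result)
```

In this model the two `analytics` consumers share the four messages between them, while the lone `production` consumer sees all four.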
auto_offset_reset (optional)
Type: str
Default: "latest"
Controls where consumption starts when no previous offset exists.
Values:
"latest": Start from newest messages (default)
"earliest": Start from oldest available messages
Examples:
auto_offset_reset="latest" # Only new messages
auto_offset_reset="earliest" # All available messages
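The rule can be sketched as a small decision function. This is a simplified model written for illustration: `starting_offset` and its parameters are hypothetical names, and it ignores edge cases such as a stored offset that is no longer available on the broker.

```python
from typing import Optional


def starting_offset(
    committed: Optional[int],
    earliest: int,
    latest: int,
    auto_offset_reset: str = "latest",
) -> int:
    """Return the offset a consumer would start from (simplified model)."""
    if committed is not None:
        # A stored offset always wins; auto_offset_reset is not consulted.
        return committed
    if auto_offset_reset == "earliest":
        return earliest
    if auto_offset_reset == "latest":
        return latest
    raise ValueError(f"unsupported auto_offset_reset: {auto_offset_reset!r}")


# New group, topic currently holds offsets 0..99 (next write goes to 100).
print(starting_offset(None, earliest=0, latest=100, auto_offset_reset="earliest"))
print(starting_offset(None, earliest=0, latest=100, auto_offset_reset="latest"))
# Existing group that previously stopped at offset 42.
print(starting_offset(42, earliest=0, latest=100, auto_offset_reset="earliest"))
```

The setting only matters the first time a group reads a topic; once an offset is stored, both values behave the same.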
Basic Usage
Producer Configuration
import os
from avenieca.config.broker import Broker
config = Broker(
url=os.environ["KAFKA_URL"],
sub_topic="sensor_data", # Where to send data
pub_topic="", # Not used for producers
group="sensors"
)
Consumer Configuration
import os
from avenieca.config.broker import Broker
config = Broker(
url=os.environ["KAFKA_URL"],
sub_topic="", # Not used for consumers
pub_topic="output", # Where to read data from
group="processors",
auto_offset_reset="latest"
)
Full Configuration
from avenieca.config.broker import Broker
config = Broker(
url="localhost:9092",
sub_topic="input_data",
pub_topic="output_data",
group="my_application",
auto_offset_reset="earliest"
)
Environment Variables
Common practice is to use environment variables for configuration:
import os
from avenieca.config.broker import Broker
config = Broker(
url=os.getenv("KAFKA_URL", "localhost:9092"),
sub_topic=os.getenv("KAFKA_SUB_TOPIC", "default_input"),
pub_topic=os.getenv("KAFKA_PUB_TOPIC", "default_output"),
group=os.getenv("KAFKA_GROUP", "default_group"),
auto_offset_reset=os.getenv("KAFKA_OFFSET_RESET", "latest")
)
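.env file (os.getenv does not read .env files on its own; load the file into the process environment first, for example with a tool such as python-dotenv or your process manager):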
KAFKA_URL=kafka.example.com:9092
KAFKA_SUB_TOPIC=sensor_input
KAFKA_PUB_TOPIC=sensor_output
KAFKA_GROUP=sensor_processors
KAFKA_OFFSET_RESET=latest
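A value read from the environment is just a string, so a typo in `KAFKA_OFFSET_RESET` would otherwise pass through unnoticed. The sketch below gathers the same variables into keyword arguments and rejects anything other than the two values this page documents. `broker_kwargs_from_env` is a hypothetical helper, not part of avenieca.

```python
import os
from typing import Mapping

VALID_OFFSET_RESETS = ("latest", "earliest")


def broker_kwargs_from_env(env: Mapping[str, str] = os.environ) -> dict:
    """Collect Broker keyword arguments from environment-style variables.

    Hypothetical helper; the variable names and defaults follow the
    example above.
    """
    offset_reset = env.get("KAFKA_OFFSET_RESET", "latest")
    if offset_reset not in VALID_OFFSET_RESETS:
        raise ValueError(
            f"KAFKA_OFFSET_RESET must be one of {VALID_OFFSET_RESETS}, "
            f"got {offset_reset!r}"
        )
    return {
        "url": env.get("KAFKA_URL", "localhost:9092"),
        "sub_topic": env.get("KAFKA_SUB_TOPIC", "default_input"),
        "pub_topic": env.get("KAFKA_PUB_TOPIC", "default_output"),
        "group": env.get("KAFKA_GROUP", "default_group"),
        "auto_offset_reset": offset_reset,
    }


print(broker_kwargs_from_env({"KAFKA_URL": "kafka.example.com:9092"}))
```

The resulting dictionary could then be unpacked into the constructor, as in `Broker(**broker_kwargs_from_env())`. Accepting a mapping instead of reading `os.environ` directly also makes the helper easy to test.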
Usage Examples
Stream Producer
from avenieca.config.broker import Broker
from avenieca.producers import Stream
from avenieca.data import Signal
config = Broker(
url="localhost:9092",
sub_topic="temperature_sensor",
pub_topic="", # Not needed for producer
group="sensors"
)
def handler():
    return Signal(
        valence=10,
        state=[20.5, 21.0, 19.8]
    )
stream = Stream(config=config, sync_rate=1.0)
stream.publish(handler)
Event Producer
from avenieca.config.broker import Broker
from avenieca.producers import Event
from avenieca.data import Signal
config = Broker(
url="localhost:9092",
sub_topic="user_clicks",
pub_topic="", # Not needed for producer
group="webapp"
)
signal = Signal(
valence=1.0,
state=[1.0, 0.0, 0.0]
)
event = Event(config=config)
event.publish(signal)
Consumer
from avenieca.config.broker import Broker
from avenieca.consumer import Consumer
from avenieca.utils.signal import get_state_as_list
config = Broker(
url="localhost:9092",
sub_topic="", # Not needed for consumer
pub_topic="predictions",
group="prediction_processors",
auto_offset_reset="earliest" # Start from beginning
)
def handler(data):
    get_state_as_list(data)
    print(f"Received prediction: {data}")
consumer = Consumer(config=config)
consumer.consume(handler)
Advanced Configurations
Multiple Brokers
from avenieca.config.broker import Broker
config = Broker(
url="broker1.example.com:9092,broker2.example.com:9092,broker3.example.com:9092",
sub_topic="high_availability",
pub_topic="",
group="ha_group"
)
Development vs Production
import os
from avenieca.config.broker import Broker
ENV = os.getenv("ENVIRONMENT", "development")
if ENV == "production":
    config = Broker(
        url=os.environ["PROD_KAFKA_URL"],
        sub_topic="prod_input",
        pub_topic="prod_output",
        group="prod_workers",
        auto_offset_reset="latest"
    )
else:
    config = Broker(
        url="localhost:9092",
        sub_topic="dev_input",
        pub_topic="dev_output",
        group="dev_workers",
        auto_offset_reset="earliest"  # See all messages in dev
    )
Topic Naming Conventions
from avenieca.config.broker import Broker
# By environment
config = Broker(
url="localhost:9092",
sub_topic="prod.sensors.temperature",
pub_topic="prod.predictions.temperature",
group="prod.temperature.processors"
)
# By data type
config = Broker(
url="localhost:9092",
sub_topic="metrics.system.cpu",
pub_topic="alerts.system.cpu",
group="system.monitors"
)
# By component
config = Broker(
url="localhost:9092",
sub_topic="robot.left_wheel.telemetry",
pub_topic="robot.left_wheel.commands",
group="robot.controllers"
)
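Dotted names like these are easier to keep consistent when they are built in one place. The sketch below joins the parts and checks the result against Kafka's topic naming rules (ASCII letters, digits, `.`, `_` and `-`, at most 249 characters). `topic_name` is a hypothetical helper, not part of avenieca.

```python
import re

# Kafka topic names may contain ASCII letters, digits, '.', '_' and '-',
# and may be at most 249 characters long.
_LEGAL_TOPIC = re.compile(r"[A-Za-z0-9._-]{1,249}")


def topic_name(*parts: str) -> str:
    """Join name parts with '.' and validate the result.

    Hypothetical helper for illustration; not part of avenieca.
    """
    if not parts or any(not part for part in parts):
        raise ValueError("every topic name part must be non-empty")
    name = ".".join(parts)
    if not _LEGAL_TOPIC.fullmatch(name):
        raise ValueError(f"illegal topic name: {name!r}")
    return name


print(topic_name("prod", "sensors", "temperature"))
print(topic_name("robot", "left_wheel", "telemetry"))
```

A value such as `topic_name("robot", "left_wheel", "telemetry")` can then be passed as `sub_topic` or `pub_topic`.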
Consumer Group Behavior
Single Consumer
from avenieca.config.broker import Broker
from avenieca.consumer import Consumer
config = Broker(
url="localhost:9092",
sub_topic="",
pub_topic="messages",
group="solo_consumer"
)
consumer = Consumer(config=config)
# Receives all messages
Multiple Consumers, Same Group
from avenieca.config.broker import Broker
from avenieca.consumer import Consumer
# Consumer 1
config1 = Broker(
url="localhost:9092",
sub_topic="",
pub_topic="messages",
group="shared_group" # Same group
)
consumer1 = Consumer(config=config1)
# Consumer 2
config2 = Broker(
url="localhost:9092",
sub_topic="",
pub_topic="messages",
group="shared_group" # Same group
)
consumer2 = Consumer(config=config2)
# Messages are split between consumer1 and consumer2.
# Kafka splits by partition, so a single-partition topic feeds only one of them.
Multiple Consumers, Different Groups
from avenieca.config.broker import Broker
from avenieca.consumer import Consumer
# Consumer 1
config1 = Broker(
url="localhost:9092",
sub_topic="",
pub_topic="messages",
group="group_a" # Different group
)
consumer1 = Consumer(config=config1)
# Consumer 2
config2 = Broker(
url="localhost:9092",
sub_topic="",
pub_topic="messages",
group="group_b" # Different group
)
consumer2 = Consumer(config=config2)
# Both consumers receive ALL messages
Offset Reset Behavior
Latest (Default)
from avenieca.config.broker import Broker
config = Broker(
url="localhost:9092",
sub_topic="",
pub_topic="stream",
group="new_consumer",
auto_offset_reset="latest"
)
# First run: Receives only new messages sent after consumer starts
# Subsequent runs: Continues from last processed message
Earliest
from avenieca.config.broker import Broker
config = Broker(
url="localhost:9092",
sub_topic="",
pub_topic="stream",
group="new_consumer",
auto_offset_reset="earliest"
)
# First run: Receives all available messages from the beginning
# Subsequent runs: Continues from last processed message
Configuration Best Practices
- Use environment variables for sensitive or environment-specific values
- Use descriptive topic names that indicate data type and purpose
- Use consistent group naming across related consumers
- Set auto_offset_reset="earliest" for development to see all messages
- Set auto_offset_reset="latest" for production to avoid reprocessing
- Use empty strings for unused topic fields (sub_topic for consumers, pub_topic for producers)
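Some of these practices can be checked mechanically before a client starts. The sketch below uses a stand-in dataclass that mirrors the fields documented on this page. `BrokerSettings` and `check_settings` are hypothetical names; the real `Broker` class is not imported here and may behave differently.

```python
from dataclasses import dataclass


@dataclass
class BrokerSettings:
    """Stand-in mirroring the Broker fields on this page (not the real class)."""
    url: str
    sub_topic: str
    pub_topic: str
    group: str
    auto_offset_reset: str = "latest"


def check_settings(settings: BrokerSettings, role: str) -> list[str]:
    """Return a list of problems for a 'producer' or 'consumer' role."""
    problems = []
    if not settings.url:
        problems.append("url is empty")
    if role == "producer" and not settings.sub_topic:
        problems.append("producers need sub_topic")
    if role == "consumer" and not settings.pub_topic:
        problems.append("consumers need pub_topic")
    if settings.auto_offset_reset not in ("latest", "earliest"):
        problems.append("auto_offset_reset must be 'latest' or 'earliest'")
    return problems


producer_only = BrokerSettings(
    url="localhost:9092", sub_topic="sensor_data", pub_topic="", group="sensors"
)
print(check_settings(producer_only, "producer"))
print(check_settings(producer_only, "consumer"))
```

A producer-only configuration passes the producer check but is flagged when used as a consumer, which catches the case where the empty-string convention is applied to the wrong field.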
Connection Examples
Local Development
from avenieca.config.broker import Broker
config = Broker(
url="localhost:9092",
sub_topic="dev_input",
pub_topic="dev_output",
group="dev"
)
Docker Compose
from avenieca.config.broker import Broker
config = Broker(
url="kafka:9092", # Service name in docker-compose.yml
sub_topic="app_input",
pub_topic="app_output",
group="app"
)
Cloud Kafka (Confluent, AWS MSK, etc.)
import os
from avenieca.config.broker import Broker
config = Broker(
url=os.environ["KAFKA_BOOTSTRAP_SERVERS"],
sub_topic="prod_sensor_data",
pub_topic="prod_predictions",
group="prod_processors"
)