The handler should return a Signal dataclass with your state data:
def handler():
    """Return a Signal carrying the current state vector."""
    # Your state vector (e.g., sensor readings, coordinates, etc.)
    signal = Signal(
        state=[0.2, 0.3, 0.8],  # State vector
        valence=10,             # Evaluative value (-100 to +100)
    )
    return signal
The state field must be a list of floats. The valence field is optional but recommended for learning evaluative patterns.
3
Configure the broker connection
# Configure the broker connection for a producer
config = Broker(
    url=os.environ["KAFKA_URL"],
    sub_topic="left_wheel",  # Your twin's subscriber topic
    group="test",            # Consumer group name
    pub_topic="",            # Leave empty for producers
)
4
Start streaming
# Initialize stream with 1-second sync rate
stream = Stream(config=config, sync_rate=1)

# Start publishing (this will run continuously)
stream.publish(handler)
stream.publish() runs in an infinite loop. Use Ctrl+C to stop, or run in a separate thread for production applications.
import os

from avenieca.config.broker import Broker
from avenieca.data import Signal
from avenieca.producers import Stream


def handler():
    """Generate state signal from your data source"""
    signal = Signal(
        state=[0.2, 0.3, 0.8],
        valence=10,
    )
    return signal


# Configure broker
config = Broker(
    url=os.environ["KAFKA_URL"],
    sub_topic="left_wheel",
    group="test",
    pub_topic="",
)

# Stream at 1 Hz
stream = Stream(config=config, sync_rate=1)
stream.publish(handler)
Expected Output: The stream will continuously publish signals to Kafka. You won’t see output unless you add logging, but your AveniECA instance will receive the state updates.
Receive and process messages from your twin’s output topic:
consumer_example.py
import os

from avenieca.config.broker import Broker
from avenieca.consumer import Consumer
from avenieca.utils.signal import get_state_as_list

config = Broker(
    url=os.environ["KAFKA_URL"],
    sub_topic="left_wheel",
    group="test",
    pub_topic="",
)


# Define handler to process incoming messages
def handler(data):
    valence = data["valence"]
    state = data["state"]  # Raw state as string

    # Convert state to a Python list (mutates data in place)
    get_state_as_list(data)
    state_list = data["state"]  # Now a list of floats

    print(f"Received - Valence: {valence}, State: {state_list}")


client = Consumer(config=config)
client.consume(handler, True)  # True = auto-commit offsets
Use the Cortex API to predict future states based on learned sequences:
predictions.py
from avenieca.api.model import NextStateRequest

# Request next-state predictions
nsr = NextStateRequest(
    module_id="aggregate001",
    recall=20,   # Number of past states to consider
    range=20,    # Search range for similar patterns
    n=1,         # Number of predictions to return
    status="e",  # Sequence status filter
)

res, status = eca.cortex.predictions(data=nsr)
print(f"Current state: {res.current_state}")
print(f"Predicted next states: {res.next_state}")
Expected Output:
Current state: [Twin(aggregate_id=5, ess_id=42, module_id='temperature', state='[25.0]')]
Predicted next states: [[Twin(...), Twin(...)]]
Store documents and query them with natural language:
from avenieca.api.model import DocumentInsert

# Create and embed a document
document = DocumentInsert(
    doc_id="001",
    text="The temperature was 28°C on May 3rd at 1pm",
    embed=True,  # Generate embeddings for retrieval
)

res, status = eca.document.create(data=document)
print(f"Document created: {res.id}")
Expected Output:
Document created: 1
Answer: The temperature was 28°C on May 3rd at 1pm
from avenieca.api.model import SequenceInsert

# Create a sequence
sequence = SequenceInsert(
    module_id="air_conditioner",
    instance_id=10,
    status="e",  # 'e' = ended, 'n' = new, 'sk' = skipped
    context=None,
)

res, status = eca.sequence.create(data=sequence)
print(f"Sequence created: {res.id}")

# Get all sequences for a module
res, status = eca.sequence.get_all(module_id="air_conditioner")
for seq in res:
    print(f"Sequence {seq.id}: instance={seq.instance_id}, status={seq.status}")