Documentation Index
Fetch the complete documentation index at: https://mintlify.com/mofa-org/mofa/llms.txt
Use this file to discover all available pages before exploring further.
Distributed Runtime with Dora-rs
MoFA integrates with dora-rs, a high-performance dataflow runtime for distributed agent systems. This enables scaling agents across processes and machines while maintaining low-latency communication.
Architecture
┌─────────────────────────────────────────────────────────────────┐
│ MoFA + Dora-rs Integration │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ MoFA Layer │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │ │
│ │ │ DoraAgent │ │DoraOperator │ │ DoraDataflow │ │ │
│ │ │ Node │ │ (Plugin) │ │ Builder │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Dora-rs Runtime │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │ │
│ │ │ Dataflow │ │ Nodes │ │ Operators │ │ │
│ │ │ Executor │ │ (Processes)│ │ (Zero-copy) │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Enable Dora Support
Add the dora feature flag:
[dependencies]
mofa-runtime = { version = "*", features = ["dora"] }
Runtime Modes
Embedded Mode
Run dataflows in the current process:
use mofa_runtime::dora_adapter::{
DoraRuntime, RuntimeMode, EmbeddedConfig
};
let config = EmbeddedConfig {
working_directory: "./dataflows".into(),
log_level: "info".to_string(),
};
let runtime = DoraRuntime::new(RuntimeMode::Embedded(config))?;
Distributed Mode
Run dataflows across multiple machines:
use mofa_runtime::dora_adapter::{
DoraRuntime, RuntimeMode, DistributedConfig
};
let config = DistributedConfig {
coordinator_addr: "http://coordinator:4000".to_string(),
machine_id: "worker-01".to_string(),
listen_addr: "0.0.0.0:5000".to_string(),
working_directory: "./dataflows".into(),
log_level: "info".to_string(),
};
let runtime = DoraRuntime::new(RuntimeMode::Distributed(config))?;
Dataflow Builder
Define dataflow graphs declaratively:
use mofa_runtime::dora_adapter::{
DataflowBuilder, NodeConnection, ChannelConfig
};
let dataflow = DataflowBuilder::new("agent_pipeline")
.with_description("Multi-agent processing pipeline")
// Add agent nodes
.add_agent_node("input_agent", input_agent, vec!["raw_data".to_string()])
.add_agent_node("processor", processor_agent, vec!["processed".to_string()])
.add_agent_node("output_agent", output_agent, vec![])
// Connect nodes
.add_connection(NodeConnection {
from_node: "input_agent".to_string(),
from_output: "raw_data".to_string(),
to_node: "processor".to_string(),
to_input: "input".to_string(),
channel_config: ChannelConfig::default(),
})
.add_connection(NodeConnection {
from_node: "processor".to_string(),
from_output: "processed".to_string(),
to_node: "output_agent".to_string(),
to_input: "final".to_string(),
channel_config: ChannelConfig::default(),
})
.build()?;
Agent Nodes
Wrap MoFA agents as Dora nodes:
use mofa_runtime::dora_adapter::{
DoraAgentNode, DoraNodeConfig, NodeEventLoop
};
use mofa_sdk::agent::Agent;
// Create your MoFA agent
let agent = create_my_agent()?;
// Wrap as Dora node
let node_config = DoraNodeConfig {
id: "my_agent".to_string(),
inputs: vec!["input_data".to_string()],
outputs: vec!["result".to_string()],
..Default::default()
};
let dora_node = DoraAgentNode::new(agent, node_config);
// Run event loop
let event_loop = NodeEventLoop::new(dora_node);
event_loop.run().await?;
Custom Node Logic
impl DoraAgentNode {
async fn on_input(&mut self, input_id: &str, data: Vec<u8>) -> Result<()> {
match input_id {
"task_queue" => {
let task: Task = bincode::deserialize(&data)?;
let result = self.agent.process_task(task).await?;
self.send_output("results", &result).await?;
}
"control" => {
let cmd: Command = bincode::deserialize(&data)?;
self.handle_command(cmd).await?;
}
_ => warn!("Unknown input: {}", input_id),
}
Ok(())
}
}
Operators
Use operators for lightweight processing:
use mofa_runtime::dora_adapter::{
MoFAOperator, OperatorInput, OperatorOutput
};
struct FilterOperator {
threshold: f64,
}
impl MoFAOperator for FilterOperator {
fn on_input(
&mut self,
input_id: &str,
data: Vec<u8>,
) -> Result<Vec<OperatorOutput>> {
let value: f64 = bincode::deserialize(&data)?;
if value > self.threshold {
Ok(vec![OperatorOutput {
id: "passed".to_string(),
data: data,
}])
} else {
Ok(vec![OperatorOutput {
id: "filtered".to_string(),
data: vec![],
}])
}
}
}
Plugin Integration
Wrap MoFA plugins as Dora operators:
use mofa_runtime::dora_adapter::{
DoraPluginOperator, PluginOperatorAdapter
};
let plugin = create_my_plugin()?;
let operator = PluginOperatorAdapter::new(
plugin,
vec!["input"],
vec!["output"],
);
let dora_operator = DoraPluginOperator::from_adapter(operator);
Channel Configuration
Control communication between nodes:
use mofa_runtime::dora_adapter::ChannelConfig;
let config = ChannelConfig {
buffer_size: 1000, // Message buffer
shared_memory: true, // Use zero-copy shared memory
qos: QoS::BestEffort, // Quality of service
priority: 5, // Channel priority
};
let connection = NodeConnection {
from_node: "producer".to_string(),
from_output: "data".to_string(),
to_node: "consumer".to_string(),
to_input: "stream".to_string(),
channel_config: config,
};
Running Dataflows
Simple Execution
use mofa_runtime::dora_adapter::run_dataflow;
let result = run_dataflow(dataflow, runtime).await?;
match result {
DataflowResult::Success => println!("Dataflow completed"),
DataflowResult::Failed(err) => eprintln!("Failed: {}", err),
DataflowResult::Cancelled => println!("Cancelled by user"),
}
With Logging
use mofa_runtime::dora_adapter::{
run_dataflow_with_logs, LogDestination
};
let log_dest = LogDestination::File {
path: "./logs/dataflow.log".into(),
rotation: Some(LogRotation::Daily),
};
run_dataflow_with_logs(dataflow, runtime, log_dest).await?;
Background Execution
let runtime = DoraRuntime::new(mode)?;
let handle = runtime.spawn_dataflow(dataflow);
// Do other work...
// Wait for completion
let result = handle.await?;
Message Passing
Zero-copy Messaging
Dora uses shared memory for efficient communication:
use mofa_runtime::dora_adapter::MessageEnvelope;
// Send large data without copying
let large_data = vec![0u8; 10_000_000]; // 10MB
let envelope = MessageEnvelope::new("image_data", large_data);
node.send_output("processed_image", envelope).await?;
Serialization
use mofa_runtime::dora_adapter::DoraChannel;
// Send structured data
let result = ProcessingResult {
status: "completed".to_string(),
metrics: metrics_data,
};
let serialized = bincode::serialize(&result)?;
node.send_output("results", serialized).await?;
Fault Tolerance
Node Restarts
let node_config = DoraNodeConfig {
id: "resilient_agent".to_string(),
restart_policy: RestartPolicy::Always,
max_restarts: 3,
restart_delay_ms: 5000,
..Default::default()
};
Health Checks
impl DoraAgentNode {
async fn health_check(&self) -> Result<HealthStatus> {
// Check agent state
if self.agent.is_healthy().await {
Ok(HealthStatus::Healthy)
} else {
Ok(HealthStatus::Degraded {
reason: "High error rate".to_string()
})
}
}
}
Monitoring
Integrate with MoFA monitoring:
use mofa_monitoring::MetricsCollector;
let collector = Arc::new(MetricsCollector::new(config));
let node = DoraAgentNode::new(agent, node_config)
.with_metrics_collector(collector);
// Metrics are automatically collected
node.run().await?;
Distributed Coordination
Coordinator Setup
# Start coordinator
dora coordinator --addr 0.0.0.0:4000
Worker Registration
let config = DistributedConfig {
coordinator_addr: "http://coordinator:4000".to_string(),
machine_id: "worker-01".to_string(),
listen_addr: "0.0.0.0:5000".to_string(),
capabilities: vec![
"gpu".to_string(),
"large-memory".to_string(),
],
..Default::default()
};
let runtime = DoraRuntime::new(RuntimeMode::Distributed(config))?;
runtime.register_worker().await?;
Best Practices
- Use operators for simple logic: Reserve agent nodes for complex reasoning
- Enable shared memory: Use zero-copy for large data transfers
- Set appropriate buffer sizes: Balance latency and memory usage
- Implement health checks: Enable automatic recovery
- Monitor dataflow performance: Track message rates and latency
- Handle backpressure: Implement flow control for slow consumers
- Use typed messages: Define clear data schemas
- Test locally first: Debug in embedded mode before deploying
Buffer Sizing
let config = ChannelConfig {
buffer_size: 10000, // Larger buffer for bursty traffic
shared_memory: true, // Zero-copy for large messages
..Default::default()
};
Concurrency
let node_config = DoraNodeConfig {
id: "parallel_agent".to_string(),
concurrency: 4, // Process 4 messages concurrently
..Default::default()
};
Backpressure
impl DoraAgentNode {
async fn on_input(&mut self, input_id: &str, data: Vec<u8>) -> Result<()> {
// Check queue depth
if self.pending_tasks.len() > 1000 {
// Signal backpressure
self.send_control_message("slow_down").await?;
}
self.process_input(input_id, data).await
}
}
Deployment
Docker Compose
version: '3'
services:
coordinator:
image: dora-coordinator
ports:
- "4000:4000"
worker-1:
image: mofa-agent
environment:
- DORA_COORDINATOR=http://coordinator:4000
- MACHINE_ID=worker-1
depends_on:
- coordinator
worker-2:
image: mofa-agent
environment:
- DORA_COORDINATOR=http://coordinator:4000
- MACHINE_ID=worker-2
depends_on:
- coordinator
Kubernetes
apiVersion: apps/v1
kind: Deployment
metadata:
name: mofa-worker
spec:
replicas: 3
selector:
matchLabels:
app: mofa-worker
template:
metadata:
labels:
app: mofa-worker
spec:
containers:
- name: worker
image: mofa-agent:latest
env:
- name: DORA_COORDINATOR
value: "http://coordinator-service:4000"
- name: MACHINE_ID
valueFrom:
fieldRef:
fieldPath: metadata.name
Troubleshooting
Connection Issues
// Enable debug logging
let config = DistributedConfig {
log_level: "debug".to_string(),
..config
};
// Check connectivity
runtime.ping_coordinator().await?;
Message Drops
// Monitor channel statistics
let stats = node.get_channel_stats("output").await?;
println!("Dropped: {}", stats.dropped_messages);
println!("Backpressure events: {}", stats.backpressure_count);
// Enable profiling
let node = DoraAgentNode::new(agent, config)
.with_profiling(true);
let profile = node.get_profile_data().await?;
println!("Avg processing time: {}ms", profile.avg_processing_ms);
See Also