Documentation Index
Fetch the complete documentation index at: https://mintlify.com/mofa-org/mofa/llms.txt
Use this file to discover all available pages before exploring further.
Monitoring and Observability
MoFA provides comprehensive monitoring capabilities through a web-based dashboard, metrics collection, WebSocket updates, and distributed tracing integration.
Dashboard Server
Quick Start
use mofa_monitoring::{DashboardServer, DashboardConfig};
use std::time::Duration;
/// Quick-start entry point.
///
/// Builds a `DashboardConfig` for host `127.0.0.1`, port `8080`, passes `true`
/// to `with_cors`, uses a one-second WebSocket interval, and then starts a
/// `DashboardServer` with it. Any error returned by `start` is propagated.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let ws_interval = Duration::from_secs(1);
    let dashboard_config = DashboardConfig::new()
        .with_host("127.0.0.1")
        .with_port(8080)
        .with_cors(true)
        .with_ws_interval(ws_interval);

    let dashboard = DashboardServer::new(dashboard_config);
    dashboard.start().await?;
    Ok(())
}
Access the dashboard at http://127.0.0.1:8080
Configuration
// Build a dashboard configuration with each builder option set explicitly,
// then create the server from it.
// NOTE(review): `token_auth`, `Arc` and `Duration` are not defined or imported in
// this snippet — bring `std::sync::Arc` / `std::time::Duration` into scope and
// construct an auth provider first (see the Authentication section).
let config = DashboardConfig::new()
.with_host("0.0.0.0") // Bind address
.with_port(3000) // Port number
.with_cors(true) // Enable CORS
.with_ws_interval(Duration::from_secs(2)) // WebSocket update interval
.with_auth(Arc::new(token_auth)); // Optional authentication
let server = DashboardServer::new(config);
Metrics Collection
Agent Metrics
Track agent state and performance:
use mofa_monitoring::{MetricsCollector, AgentMetrics};
use std::sync::Arc;
// Create a collector wrapped in `Arc`, then publish one agent's metrics to it.
// NOTE(review): `MetricsConfig`, `SystemTime` and `UNIX_EPOCH` are used below but
// are not imported by this snippet's `use` lines — add them before compiling.
let collector = Arc::new(MetricsCollector::new(
MetricsConfig::default()
));
let metrics = AgentMetrics {
agent_id: "agent-001".to_string(),
name: "Research Agent".to_string(),
state: "running".to_string(),
tasks_completed: 42,
tasks_failed: 2,
tasks_in_progress: 3,
messages_sent: 158,
messages_received: 201,
// Current time as whole seconds since the Unix epoch.
last_activity: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs(),
avg_task_duration_ms: 1250.5,
};
collector.update_agent(metrics).await;
Workflow Metrics
Monitor workflow execution:
use mofa_monitoring::WorkflowMetrics;
// Publish metrics for a single workflow.
// Assumes `collector` is the collector created in the Agent Metrics example.
let metrics = WorkflowMetrics {
workflow_id: "wf-001".to_string(),
name: "Content Pipeline".to_string(),
status: "running".to_string(),
// In this sample, successful (145) + failed (5) executions add up to the total (150).
total_executions: 150,
successful_executions: 145,
failed_executions: 5,
running_instances: 2,
avg_execution_time_ms: 3400.0,
node_count: 7,
};
collector.update_workflow(metrics).await;
LLM Metrics
Track LLM API usage and performance:
use mofa_monitoring::LLMMetrics;
// Publish usage and latency metrics for a single LLM provider/model.
// Assumes `collector` is the collector created in the Agent Metrics example.
// NOTE(review): `SystemTime` and `UNIX_EPOCH` are not imported by this snippet.
let metrics = LLMMetrics {
plugin_id: "openai-gpt4".to_string(),
provider_name: "OpenAI".to_string(),
model_name: "gpt-4".to_string(),
state: "running".to_string(),
total_requests: 1250,
successful_requests: 1190,
failed_requests: 60,
// In this sample, prompt (120000) + completion (125000) tokens add up to the total.
total_tokens: 245000,
prompt_tokens: 120000,
completion_tokens: 125000,
avg_latency_ms: 1850.5,
tokens_per_second: Some(42.3),
time_to_first_token_ms: Some(320.0),
requests_per_minute: 25.5,
// Presumably a percentage (60 failed of 1250 requests = 4.8%) — confirm against the API.
error_rate: 4.8,
// Current time as whole seconds since the Unix epoch.
last_request_timestamp: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs(),
};
collector.update_llm(metrics).await;
Plugin Metrics
Monitor plugin health:
use mofa_monitoring::PluginMetrics;
// Publish health metrics for a single plugin.
// Assumes `collector` is the collector created in the Agent Metrics example.
// NOTE(review): `SystemTime` and `UNIX_EPOCH` are not imported by this snippet.
let metrics = PluginMetrics {
plugin_id: "plugin-001".to_string(),
name: "Vector DB".to_string(),
version: "2.0.0".to_string(),
state: "running".to_string(),
call_count: 5420,
error_count: 12,
avg_response_time_ms: 45.2,
// Optional field; here set to the current time as whole seconds since the Unix epoch.
last_reload: Some(SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()),
reload_count: 3,
};
collector.update_plugin(metrics).await;
System Metrics
Track system resources:
use mofa_monitoring::SystemMetrics;
// Publish a snapshot of host resource usage.
// Assumes `collector` is the collector created in the Agent Metrics example.
let metrics = SystemMetrics {
cpu_usage: 45.2, // Percentage
memory_usage: 2048, // MB
memory_total: 16384, // MB
disk_usage: 50000, // MB
disk_total: 256000, // MB
network_rx_bytes: 1024000,
network_tx_bytes: 512000,
uptime_seconds: 86400,
goroutines: 150, // Active tasks/threads
};
collector.update_system(metrics).await;
REST API
The dashboard server exposes the following REST endpoints:
Endpoints
# Dashboard overview
GET /api/overview
# Current metrics snapshot
GET /api/metrics
# List all agents
GET /api/agents
# Get specific agent
GET /api/agents/:id
# List all workflows
GET /api/workflows
# List all plugins
GET /api/plugins
# System status
GET /api/system
# Health check
GET /api/health
Example Response
{
"agents": [
{
"agent_id": "agent-001",
"name": "Research Agent",
"state": "running",
"tasks_completed": 42,
"tasks_failed": 2,
"tasks_in_progress": 3,
"avg_task_duration_ms": 1250.5
}
],
"workflows": [...],
"llm_providers": [...],
"system": {...}
}
WebSocket Updates
The dashboard pushes real-time updates to connected clients over WebSocket:
Client Connection
// Open a WebSocket connection to the dashboard's `/ws` endpoint.
const ws = new WebSocket('ws://localhost:8080/ws');

// Once the socket is open, log the connection and subscribe to the
// agent, workflow and LLM topics.
ws.onopen = () => {
  console.log('Connected to MoFA dashboard');
  // Subscribe to topics
  const subscription = {
    type: 'subscribe',
    topics: ['agents', 'workflows', 'llm'],
  };
  ws.send(JSON.stringify(subscription));
};

// Route each incoming message by its `type`; any other type is ignored.
ws.onmessage = (event) => {
  const message = JSON.parse(event.data);
  if (message.type === 'metrics_update') {
    updateDashboard(message.payload);
  } else if (message.type === 'alert') {
    showAlert(message.level, message.message);
  }
};
Server-side Alerts
use mofa_monitoring::WebSocketHandler;
// Send an alert through the dashboard's WebSocket handler, if the server has one.
// The three arguments are presumably severity level, message text and alert
// source, in that order — confirm against the `send_alert` signature.
if let Some(ws_handler) = server.ws_handler() {
ws_handler.send_alert(
"warning",
"High error rate detected on agent-003",
"monitoring-system"
).await;
}
Dashboard Integration
Integrate monitoring into your application:
use mofa_monitoring::{DashboardServer, DashboardConfig};
use std::sync::Arc;
/// Integration entry point: wires the dashboard into an application.
///
/// Creates the dashboard server, spawns a background task that feeds the
/// collector via `generate_metrics`, builds the HTTP router, starts metrics
/// collection and WebSocket updates, and finally serves the router on
/// `127.0.0.1:8080`.
///
/// # Errors
///
/// Returns an error if the listen address cannot be parsed, the TCP listener
/// cannot be bound, or `axum::serve` fails.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Create dashboard
    let config = DashboardConfig::new().with_port(8080);
    let mut server = DashboardServer::new(config);

    // Get collector for metrics updates
    let collector = server.collector();

    // Start background data generator
    let collector_clone = collector.clone();
    tokio::spawn(async move {
        generate_metrics(collector_clone).await;
    });

    // Build the router; the HTTP server itself is started by `axum::serve` below.
    let router = server.build_router();

    // Start metrics collection
    collector.start_collection();

    // Start WebSocket updates
    // NOTE(review): if `start_updates` is an `async fn`, its future is dropped here
    // without being awaited — confirm against the `WebSocketHandler` API.
    if let Some(ws_handler) = server.ws_handler() {
        tokio::spawn(async move {
            ws_handler.start_updates();
        });
    }

    // Run server
    // `TcpListener::bind` is generic over `ToSocketAddrs`, so `parse()` cannot infer
    // its target type on its own; name `SocketAddr` explicitly.
    let addr: std::net::SocketAddr = "127.0.0.1:8080".parse()?;
    let listener = tokio::net::TcpListener::bind(addr).await?;
    axum::serve(listener, router).await?;
    Ok(())
}
/// Pushes fresh agent, workflow and LLM metrics into `collector` on every tick
/// of a one-second interval.
///
/// The loop has no exit condition, so this future only stops when the task
/// running it is cancelled or dropped.
async fn generate_metrics(collector: Arc<MetricsCollector>) {
    let period = Duration::from_secs(1);
    let mut ticker = tokio::time::interval(period);

    loop {
        ticker.tick().await;

        // Agent, workflow and LLM metrics are refreshed in that order.
        let agent_metrics = get_agent_metrics();
        collector.update_agent(agent_metrics).await;

        let workflow_metrics = get_workflow_metrics();
        collector.update_workflow(workflow_metrics).await;

        let llm_metrics = get_llm_metrics();
        collector.update_llm(llm_metrics).await;
    }
}
Distributed Tracing
OpenTelemetry Integration
use mofa_monitoring::tracing::{init_tracing, TracingConfig};
// Configure tracing for the `mofa-agent` service and initialise it.
let config = TracingConfig {
service_name: "mofa-agent".to_string(),
// OTLP endpoint; 4317 is the conventional OTLP/gRPC port.
otlp_endpoint: Some("http://localhost:4317".to_string()),
sample_ratio: 0.1, // 10% sampling
};
// `?` propagates an initialisation error, so this must run inside a function
// that returns a compatible `Result`.
init_tracing(config)?;
Span Instrumentation
use tracing::{info, warn, instrument};
/// Executes `task` on `agent` and returns the output of the execution.
///
/// The function is wrapped in a tracing span by `#[instrument]`; `agent` is
/// excluded from the span's recorded arguments via `skip(agent)`. An event is
/// emitted before execution and another, carrying the reported duration,
/// after it succeeds.
///
/// # Errors
///
/// Propagates any error returned by `agent.execute`.
#[instrument(skip(agent))]
async fn process_task(agent: &Agent, task: Task) -> Result<Output> {
    info!("Processing task: {}", task.id);

    let outcome = agent.execute(task).await?;
    warn!("Task completed in {}ms", outcome.duration_ms);

    let output = outcome.output;
    Ok(output)
}
Context Propagation
use mofa_monitoring::tracing::propagate_context;
// Derive a trace id from the incoming request headers and attach it to the log event.
// NOTE(review): `request_headers` and the `info!` macro are not defined or imported
// in this snippet; `?` requires an enclosing function that returns a compatible `Result`.
let trace_id = propagate_context(&request_headers)?;
info!(trace_id = %trace_id, "Request received");
Metrics Export
use mofa_monitoring::MetricsRegistry;
// Create a registry, declare one gauge and one histogram, record a value for
// each, and render everything in Prometheus format.
let registry = MetricsRegistry::new();
// Register metrics
registry.register_gauge("agent_tasks_completed", "Total tasks completed by agent");
registry.register_histogram("llm_latency_ms", "LLM API latency in milliseconds");
// Update values
registry.set_gauge("agent_tasks_completed", 42.0);
registry.observe_histogram("llm_latency_ms", 1250.5);
// Export Prometheus format
let metrics = registry.export_prometheus();
JSON Export
// Take a snapshot of the collector's current metrics and print it as pretty-printed JSON.
// Assumes `collector` is an existing `MetricsCollector`; `?` propagates a
// serialisation error to the enclosing function.
let snapshot = collector.get_snapshot().await;
let json = serde_json::to_string_pretty(&snapshot)?;
println!("{}", json);
Authentication
Token-based Auth
use mofa_monitoring::TokenAuthProvider;
use std::sync::Arc;
// Create a token-based auth provider from a list of tokens and attach it
// to the dashboard configuration.
// The literal tokens below are placeholders; in a real deployment load them
// from configuration or a secret store rather than hard-coding them.
let auth = Arc::new(TokenAuthProvider::new(vec![
"secret-token-1".to_string(),
"secret-token-2".to_string(),
]));
let config = DashboardConfig::new()
.with_auth(auth);
Custom Auth Provider
use mofa_monitoring::{AuthProvider, AuthInfo};
use async_trait::async_trait;
/// Example authentication provider that delegates token checks to a
/// user-supplied `validate_token` function.
struct CustomAuth;

#[async_trait]
impl AuthProvider for CustomAuth {
    /// Always reports authentication as enabled.
    fn is_enabled(&self) -> bool {
        true
    }

    /// Authenticates `token`.
    ///
    /// Returns a fixed `AuthInfo` (user `user-123` with the `read` permission)
    /// when `validate_token` accepts the token, and `Err("Invalid token")`
    /// otherwise.
    async fn authenticate(&self, token: &str) -> Result<AuthInfo, String> {
        // Custom authentication logic
        if !validate_token(token) {
            return Err("Invalid token".to_string());
        }

        let info = AuthInfo {
            user_id: "user-123".to_string(),
            permissions: vec!["read".to_string()],
        };
        Ok(info)
    }
}
// Attach the custom provider to the dashboard configuration.
// NOTE(review): `Arc` is not imported by this snippet's `use` lines — add `use std::sync::Arc;`.
let config = DashboardConfig::new()
.with_auth(Arc::new(CustomAuth));
Best Practices
- Sample high-frequency events: Use sampling for high-throughput metrics
- Aggregate metrics: Compute averages and percentiles before export
- Set retention policies: Automatically clean old metrics data
- Monitor the monitor: Track dashboard performance and health
- Secure access: Use authentication in production environments
- Rate limit updates: Avoid overwhelming WebSocket clients
- Use structured logging: Enable correlation with tracing
- Alert on anomalies: Set up automated alerting for critical metrics
- Metrics buffering: Metrics are buffered before updates
- Async collection: Non-blocking metrics collection
- Connection pooling: WebSocket connections are pooled
- Lazy evaluation: Expensive metrics computed on-demand
Troubleshooting
High Memory Usage
// Override the history size and aggregation window; every other field keeps
// its `Default` value via the struct-update syntax.
let config = MetricsConfig {
max_history_size: 1000, // Limit history
aggregation_window_secs: 60, // Aggregate more frequently
..Default::default()
};
Slow Dashboard
// Use a longer WebSocket interval (5 s, versus the 1–2 s used in the earlier examples).
let config = DashboardConfig::new()
.with_ws_interval(Duration::from_secs(5)); // Reduce update frequency
WebSocket Disconnects
Implement reconnection logic:
// Opens a WebSocket to the dashboard and returns it. When that socket closes,
// a new call to `connectWebSocket` is scheduled five seconds later.
// Note: the socket created by a retry is not handed back to the original
// caller, which still holds the first (closed) socket.
function connectWebSocket() {
  const RETRY_DELAY_MS = 5000;
  const socket = new WebSocket('ws://localhost:8080/ws');

  socket.onclose = function () {
    setTimeout(connectWebSocket, RETRY_DELAY_MS); // Retry after 5s
  };

  return socket;
}
See Also