Distributed mode separates Amp into distinct server, worker, and controller components that coordinate via a shared metadata database. This architecture enables production deployments with resource isolation, horizontal scaling, and high availability.
Architecture Overview
Distributed mode consists of three independent components:
Server : Handles query serving via Arrow Flight and JSON Lines interfaces
Controller : Provides the Admin API for job scheduling and management
Worker : Executes extraction jobs and writes Parquet files
All components coordinate through a shared PostgreSQL metadata database and object store.
Component Details
Server Component
The server component provides query interfaces without executing extraction jobs.
Purpose:
Query serving for extracted data
Multiple query interface support (Flight + JSONL)
Read-only access to datasets
Ports:
1602 : Arrow Flight (gRPC) - high-performance binary queries
1603 : JSON Lines (HTTP) - simple query interface
Starting the server:
# Start both query interfaces (default)
ampd server --config config.toml
# Start only Arrow Flight
ampd server --config config.toml --flight-server
# Start only JSON Lines
ampd server --config config.toml --jsonl-server
Configuration:
# config.toml
flight_addr = "0.0.0.0:1602"
jsonl_addr = "0.0.0.0:1603"
data_dir = "data"
providers_dir = "providers"
manifests_dir = "manifests"
[metadata_db]
url = "postgresql://user:password@db-host:5432/amp"
The server requires --config (or the AMP_CONFIG environment variable). Default paths for data, providers, and manifests are resolved relative to the config file’s parent directory.
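The resolution rule above can be sketched in shell. The helper below is hypothetical, not part of the ampd CLI; it only illustrates how relative config paths map onto the config file's parent directory:

```shell
# Hypothetical helper mirroring the path-resolution rule: relative values such
# as data_dir are taken relative to the directory containing the config file,
# while absolute paths are used as-is. (Illustrative only; not part of ampd.)
resolve_amp_path() {
  local config="$1" rel="$2"
  case "$rel" in
    /*) printf '%s\n' "$rel" ;;                            # absolute: pass through
    *)  printf '%s/%s\n' "$(dirname "$config")" "$rel" ;;  # relative to config dir
  esac
}

resolve_amp_path /etc/amp/config.toml data      # -> /etc/amp/data
resolve_amp_path /etc/amp/config.toml /srv/data # -> /srv/data
```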
Controller Component
The controller provides the Admin API for managing Amp operations.
Purpose:
Job scheduling and control
Worker health monitoring
Dataset registry management
File metadata queries
Port:
1610 : Admin API (HTTP) - job and dataset management
Starting the controller:
ampd controller --config config.toml
Admin API Operations:
# List all datasets
curl http://localhost:1610/datasets
# Deploy a dataset (start extraction job)
curl -X POST http://localhost:1610/datasets/my_namespace/eth_mainnet/versions/1.0.0/deploy \
-H "Content-Type: application/json" \
-d '{
"end_block": 20000000
}'
# List all jobs
curl http://localhost:1610/jobs
# Get job status
curl http://localhost:1610/jobs/42
# Stop a running job
curl -X PUT http://localhost:1610/jobs/42/stop
# List registered workers
curl http://localhost:1610/locations
# List files for a dataset
curl http://localhost:1610/files?dataset=my_namespace/eth_mainnet
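For repeated use, the Admin API calls above can be wrapped in a small shell helper. This wrapper is an illustration, not part of ampd; the endpoint paths are the ones shown above, and AMP_ADMIN is an assumed convention for overriding the controller address:

```shell
# Convenience URL builder for the Admin API (illustrative helper, not part
# of ampd). Override AMP_ADMIN to target a remote controller.
AMP_ADMIN="${AMP_ADMIN:-http://localhost:1610}"

amp_admin_url() {
  # Join base and path, tolerating stray slashes on either side.
  printf '%s/%s\n' "${AMP_ADMIN%/}" "${1#/}"
}

amp_admin_url jobs/42   # -> http://localhost:1610/jobs/42
# Usage: curl "$(amp_admin_url jobs/42)"
```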
Worker Component
Workers execute scheduled extraction jobs and can be horizontally scaled.
Purpose:
Execute dump jobs
Pull data from blockchain sources
Write Parquet files to object store
Update job status and metadata
Requirements:
Unique --node-id for each worker
Access to metadata database
Write access to object store
Starting workers:
# Single worker
ampd worker --config config.toml --node-id worker-01
# Multiple workers for parallel extraction
ampd worker --config config.toml --node-id worker-01 &
ampd worker --config config.toml --node-id worker-02 &
ampd worker --config config.toml --node-id worker-03 &
# Geographic distribution
ampd worker --config config.toml --node-id us-east-1a-worker
ampd worker --config config.toml --node-id eu-west-1b-worker
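Since each worker only needs a unique --node-id, any naming convention works. One option is encoding the zone and an index, sketched below (the helper is hypothetical; ampd itself imposes no format):

```shell
# Generate descriptive, unique node IDs from a zone name and index.
# (A naming convention sketch; ampd only requires uniqueness.)
make_node_id() {
  local zone="$1" index="$2"
  printf '%s-worker-%02d\n' "$zone" "$index"
}

make_node_id us-east-1a 1   # -> us-east-1a-worker-01
# ampd worker --config config.toml --node-id "$(make_node_id us-east-1a 1)"
```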
Worker Coordination:
Workers coordinate through the metadata database:
| Mechanism | Interval | Purpose |
|---|---|---|
| Heartbeat | 1 second | Health monitoring |
| LISTEN/NOTIFY | Real-time | Job notifications via PostgreSQL |
| State Reconciliation | 60 seconds | Periodic state sync |
| Graceful Resume | On restart | Jobs resume from last checkpoint |
Complete Deployment Example
Here’s a complete production deployment with all components:
1. Prepare Configuration
# config.toml
# Storage paths
data_dir = "data"
providers_dir = "providers"
manifests_dir = "manifests"
# Service addresses
flight_addr = "0.0.0.0:1602"
jsonl_addr = "0.0.0.0:1603"
admin_addr = "0.0.0.0:1610"
# Database connection (required for all components)
[metadata_db]
url = "postgresql://amp_user:secure_password@db.example.com:5432/amp"
pool_size = 10
auto_migrate = true
# Performance tuning
[writer]
compression = "zstd(1)"
max_row_group_mb = 512
[writer.compactor]
active = true
min_interval = 1.0
[writer.collector]
active = true
min_interval = 30.0
# Observability
[opentelemetry]
trace_url = "http://otel-collector:4318/v1/traces"
metrics_url = "http://otel-collector:4318/v1/metrics"
2. Run Database Migrations
# One-time setup
ampd migrate --config config.toml
3. Start Controller
# Management node
ampd controller --config config.toml
Verify controller is running:
curl http://localhost:1610/datasets
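In startup scripts it helps to retry that check until the controller is actually ready. A generic retry helper (an operational sketch, not part of ampd) looks like this:

```shell
# Retry a command until it succeeds or the attempt budget runs out.
# Returns 0 on success, 1 on timeout; sleeps 1s between attempts.
wait_for() {
  local tries="$1"; shift
  local i=0
  until "$@"; do
    i=$((i + 1))
    [ "$i" -ge "$tries" ] && return 1
    sleep 1
  done
}

# Example: block until the controller answers (up to ~30 attempts):
#   wait_for 30 curl -sf http://localhost:1610/datasets
```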
4. Start Query Server(s)
# Query node 1
ampd server --config config.toml
# Query node 2 (for load balancing)
ampd server --config config.toml
Verify servers are running:
# Test JSON Lines endpoint
curl http://localhost:1603/health
# Test Arrow Flight endpoint (requires grpcurl)
grpcurl -plaintext localhost:1602 list
5. Start Workers
# Worker node 1
ampd worker --config config.toml --node-id worker-01
# Worker node 2
ampd worker --config config.toml --node-id worker-02
# Worker node 3
ampd worker --config config.toml --node-id worker-03
Verify workers registered:
curl http://localhost:1610/locations
6. Deploy a Dataset
# Start extraction job
curl -X POST http://localhost:1610/datasets/my_namespace/eth_mainnet/versions/1.0.0/deploy \
-H "Content-Type: application/json" \
-d '{
"end_block": 20000000
}'
# Monitor job progress
curl http://localhost:1610/jobs
Scaling Considerations
Scaling Query Servers
Horizontal Scaling:
Deploy multiple server instances behind a load balancer:
# Server 1
ampd server --config config.toml
# Server 2
ampd server --config config.toml
# Server 3
ampd server --config config.toml
Load Balancing:
Use TCP load balancer for Arrow Flight (gRPC on port 1602)
Use HTTP load balancer for JSON Lines (port 1603)
Ensure load balancer supports HTTP/2 for Flight
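Those three requirements can be met with any HTTP/2-capable proxy. As one possible sketch, an nginx front-end for two server instances might look like the fragment below (the upstream hostnames `amp-server-1` / `amp-server-2` are placeholders; adapt to your environment):

```nginx
# Hypothetical nginx front-end for two amp server instances.
# Flight (gRPC) requires HTTP/2; JSON Lines is plain HTTP.
upstream amp_flight { server amp-server-1:1602; server amp-server-2:1602; }
upstream amp_jsonl  { server amp-server-1:1603; server amp-server-2:1603; }

server {
    listen 1602 http2;                          # Arrow Flight (gRPC)
    location / { grpc_pass grpc://amp_flight; }
}

server {
    listen 1603;                                # JSON Lines (HTTP)
    location / { proxy_pass http://amp_jsonl; }
}
```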
Scaling Workers
Adding More Workers:
Simply start additional worker processes with unique node IDs:
# Add worker 4
ampd worker --config config.toml --node-id worker-04
# Add worker 5
ampd worker --config config.toml --node-id worker-05
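When adding many workers at once, a small loop keeps node IDs sequential. The function below prints the launch commands as a dry run (a sketch; remove the printf wrapper to actually exec, with ampd on PATH):

```shell
# Dry-run sketch: print the launch commands for N workers with
# sequential, zero-padded node IDs.
spawn_workers() {
  local n="$1" i
  for i in $(seq 1 "$n"); do
    printf 'ampd worker --config config.toml --node-id worker-%02d\n' "$i"
  done
}

spawn_workers 3
```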
Work Distribution:
Jobs are automatically distributed to available workers
Workers with active jobs receive priority for related blocks
Failed workers trigger automatic job reassignment
No manual intervention needed
Scaling Database
PostgreSQL Optimization:
-- Increase connection pool size
max_connections = 200
-- Optimize for concurrent writes
shared_buffers = 4GB
effective_cache_size = 12GB
work_mem = 16MB
maintenance_work_mem = 1GB
-- Enable query performance
random_page_cost = 1.1
effective_io_concurrency = 200
Read Replicas:
Consider PostgreSQL read replicas for query-heavy workloads.
Worker Coordination
Health Monitoring
Workers maintain health through:
Heartbeat : Every 1 second to metadata DB
Registration : Node ID + metadata on startup
Status Updates : Job progress and completion
Job Assignment
Jobs are assigned to workers via:
PostgreSQL LISTEN/NOTIFY : Real-time job notifications
State Reconciliation : Periodic sync every 60 seconds
Failover : Automatic reassignment if worker crashes
Graceful Shutdown
# Send SIGTERM for graceful shutdown
kill -TERM <worker_pid>
# Worker will:
# 1. Stop accepting new jobs
# 2. Complete current job if possible
# 3. Update job status in database
# 4. Deregister from metadata DB
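An operational wrapper around that sequence sends TERM first and only escalates to KILL after a timeout. This is a generic helper sketch, not part of ampd; the worker itself handles SIGTERM as described above:

```shell
# Stop a process gracefully: SIGTERM first, SIGKILL only after `timeout`
# seconds. (Generic operational sketch, not part of ampd.)
graceful_stop() {
  local pid="$1" timeout="${2:-30}" waited=0
  kill -TERM "$pid" 2>/dev/null || return 0     # already gone
  while kill -0 "$pid" 2>/dev/null; do
    waited=$((waited + 1))
    if [ "$waited" -ge "$timeout" ]; then
      kill -KILL "$pid" 2>/dev/null             # last resort
      break
    fi
    sleep 1
  done
}

# Usage: graceful_stop "$worker_pid" 60
```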
High Availability Patterns
Multi-Region Deployment
Region A (US-East) Region B (EU-West)
┌────────────────────┐ ┌────────────────────┐
│ ampd server │ │ ampd server │
│ (load balanced) │ │ (load balanced) │
└────────────────────┘ └────────────────────┘
│ │
┌────────────────────┐ ┌────────────────────┐
│ ampd controller │ │ ampd worker │
│ │ │ (regional) │
└────────────────────┘ └────────────────────┘
│ │
┌────────────────────┐ │
│ ampd worker │ │
│ (regional) │ │
└────────────────────┘ │
│ │
└────────────────────────────────┘
│
┌────────────────────────┐
│ PostgreSQL (primary) │
│ + Read Replicas │
└────────────────────────┘
│
┌────────────────────────┐
│ Object Store (S3) │
└────────────────────────┘
Failure Scenarios
| Component Fails | Impact | Recovery |
|---|---|---|
| Single Worker | Minimal - job reassigned to another worker | Automatic via heartbeat timeout |
| All Workers | No new extraction, queries continue | Restart workers, jobs resume |
| Single Server | Reduced query capacity | Load balancer routes to healthy servers |
| Controller | No new jobs can be scheduled | Restart controller, existing jobs continue |
| Database | All operations stop | Restore from backup, workers/servers reconnect |
Docker Deployment Example
Complete docker-compose setup for distributed mode:
version: '3.8'

services:
  # PostgreSQL metadata database
  postgres:
    image: postgres:alpine
    environment:
      POSTGRES_USER: amp_user
      POSTGRES_PASSWORD: secure_password
      POSTGRES_DB: amp
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

  # Controller (Admin API)
  controller:
    image: amp:latest
    command: controller --config /etc/amp/config.toml
    ports:
      - "1610:1610"
    volumes:
      - ./config.toml:/etc/amp/config.toml
      - ./data:/data
    depends_on:
      - postgres
    environment:
      - AMP_CONFIG=/etc/amp/config.toml

  # Query Server
  server:
    image: amp:latest
    command: server --config /etc/amp/config.toml
    ports:
      - "1602:1602"
      - "1603:1603"
    volumes:
      - ./config.toml:/etc/amp/config.toml
      - ./data:/data
    depends_on:
      - postgres
    deploy:
      replicas: 2  # Scale for load balancing

  # Workers
  # NOTE: replicas of one service share the same WORKER_ID, but every worker
  # needs a unique --node-id; run one service per worker (or template the ID)
  # if you scale beyond a single instance.
  worker:
    image: amp:latest
    command: worker --config /etc/amp/config.toml --node-id worker-${WORKER_ID}
    volumes:
      - ./config.toml:/etc/amp/config.toml
      - ./data:/data
    depends_on:
      - postgres
      - controller
    deploy:
      replicas: 3  # Scale for parallel extraction

  # Observability (optional)
  lgtm:
    image: grafana/otel-lgtm
    ports:
      - "3000:3000"  # Grafana
      - "4317:4317"  # OTLP gRPC
      - "4318:4318"  # OTLP HTTP

volumes:
  postgres_data:
Start the stack:
docker compose up -d
Next Steps
Production Guide Best practices for production deployments
Configuration Detailed configuration options
Monitoring Set up monitoring and observability