Async Workflow Queues enable asynchronous workflow starts via Kafka, providing rate limiting, buffering, and decoupling of workflow initiation from execution.
Overview
Async Workflow Queues provide:
- Rate Limiting: Control workflow start rate per domain
- Buffering: Queue workflow starts during high load
- Decoupling: Separate request acceptance from execution
- Backpressure: Prevent overload from workflow start storms
- Multi-Tenancy: Per-domain queue configuration
Architecture
Client → Frontend → Kafka Topic → Async Worker → History Service
                      (Queue)      (Consumer)
- Client submits workflow start request
- Frontend validates and publishes to Kafka
- Request acknowledged immediately
- Async worker consumes from Kafka
- Worker starts workflow via normal path
- Rate limiting and retries handled by worker
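The core of this flow — acknowledge as soon as the request is buffered, perform the actual start later from a consumer — can be sketched in Go with a channel standing in for the Kafka topic (all names here are illustrative, not Cadence APIs):

```go
import "fmt"

// submit buffers a start request and acknowledges immediately,
// mirroring how the frontend acks after publishing to Kafka.
func submit(queue chan<- string, workflowID string) error {
	select {
	case queue <- workflowID:
		return nil // acknowledged; the actual start happens later
	default:
		return fmt.Errorf("queue full (backpressure)")
	}
}

// runConsumer drains the queue and performs the actual starts,
// playing the role of the async worker.
func runConsumer(queue <-chan string, start func(id string)) {
	for id := range queue {
		start(id)
	}
}
```

Note how `submit` returns before `start` ever runs: the caller only learns that the request was accepted, not that the workflow exists.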
Setup
Prerequisites
- Kafka cluster (version 2.0+)
- Cadence server with async workflow feature enabled
- Worker service for consuming queue
Kafka Configuration
Create Kafka topics for async workflow queue:
# Create topic
kafka-topics.sh --create \
  --bootstrap-server kafka:9092 \
  --topic cadence-async-workflow-queue \
  --partitions 10 \
  --replication-factor 3 \
  --config retention.ms=86400000 \
  --config max.message.bytes=4194304

# Verify topic
kafka-topics.sh --describe \
  --bootstrap-server kafka:9092 \
  --topic cadence-async-workflow-queue
- Partitions: More partitions = higher throughput, but more consumers needed
- Retention: How long requests stay in the queue (24h = 86400000 ms)
Server Configuration
Enable async workflow queues in config.yaml:
publicclient:
  hostPort: "127.0.0.1:7833"

kafka:
  clusters:
    test:
      brokers:
        - kafka1.example.com:9092
        - kafka2.example.com:9092
        - kafka3.example.com:9092
  topics:
    cadence-async-workflow-queue:
      cluster: test
  applications:
    async-wf-consumer:
      topic: cadence-async-workflow-queue
      cluster: test
      dlq-topic: cadence-async-workflow-dlq

services:
  worker:
    asyncWorkflowQueueProvider:
      enabled: true
      kafkaConfig:
        cluster: test
        topic: cadence-async-workflow-queue
        consumerGroup: cadence-async-wf-consumer
        concurrency: 100
Domain Configuration
Enable async workflow queue per domain:
# Get current configuration
cadence admin domain get-async-wf-config --domain my-domain

# Update configuration
cadence admin domain update-async-wf-config \
  --domain my-domain \
  --json '{
    "enabled": true,
    "predefinedQueueName": "cadence-async-workflow-queue",
    "queueType": "kafka",
    "queueConfig": {
      "encodingType": "JSON",
      "version": 0
    }
  }'
Configuration Fields:
- enabled: Enable/disable the async queue for the domain
- predefinedQueueName: Kafka topic name
- queueType: Queue backend (currently only "kafka")
- queueConfig: Queue-specific configuration
Worker Service Setup
Async workers are part of the Cadence worker service:
# Start worker service with async queue consumer
./cadence-server --zone async_wf_kafka_queue start
Or via Docker:
services:
  cadence-worker:
    image: ubercadence/server:master-auto-setup
    command:
      - /bin/sh
      - -c
      - |
        /cadence-server --zone async_wf_kafka_queue start
    environment:
      - KAFKA_SEEDS=kafka:9092
    depends_on:
      - kafka
Usage
Starting Async Workflows
Go SDK:
import (
	"context"
	"fmt"
	"time"

	"go.uber.org/cadence/client"
)

func startAsyncWorkflow(c client.Client) error {
	options := client.StartWorkflowOptions{
		ID:                           "workflow-1",
		TaskList:                     "my-task-list",
		ExecutionStartToCloseTimeout: time.Hour,
	}

	// Start asynchronously.
	// Returns immediately after queueing to Kafka.
	we, err := c.StartWorkflow(context.Background(), options, MyWorkflow)
	if err != nil {
		return err
	}

	// The WorkflowExecution is returned immediately;
	// the actual workflow start happens asynchronously.
	fmt.Printf("Queued workflow: %s\n", we.ID)
	return nil
}
Java SDK:
WorkflowOptions options = WorkflowOptions.newBuilder()
    .setWorkflowId("workflow-1")
    .setTaskList("my-task-list")
    .setExecutionStartToCloseTimeout(Duration.ofHours(1))
    .build();

MyWorkflow workflow = client.newWorkflowStub(MyWorkflow.class, options);

// Start asynchronously
WorkflowExecution execution = WorkflowClient.start(workflow::processOrder, order);

// Returns immediately after queueing
System.out.println("Queued workflow: " + execution.getWorkflowId());
CLI:
# Start workflow (async if domain configured)
cadence --do my-domain workflow start \
--tl my-task-list \
--wt MyWorkflow \
--et 3600 \
--input '{"orderId": "12345"}'
No special flags are needed; async behavior is automatic once the domain is configured.
Checking Workflow Status
Because the start is asynchronous, the workflow may not be visible immediately:
# Check if workflow has started
cadence --do my-domain workflow describe -w workflow-1
# List workflows
cadence --do my-domain workflow list
# Observe workflow (waits for start)
cadence --do my-domain workflow observe -w workflow-1
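Since a describe call can race the asynchronous start, client code sometimes polls until the workflow becomes visible. A minimal polling helper, with `describe` standing in for whatever lookup you use (hypothetical names, not an SDK API):

```go
import "time"

// waitForStart polls until the workflow is visible or attempts are exhausted.
// describe should return true once the workflow has actually started.
func waitForStart(describe func() bool, attempts int, delay time.Duration) bool {
	for i := 0; i < attempts; i++ {
		if describe() {
			return true
		}
		time.Sleep(delay)
	}
	return false
}
```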
Rate Limiting
Consumer Rate Limiting
Control workflow start rate:
# Dynamic config
worker.asyncWorkflowConsumerStartRPS:
  - value: 1000
    constraints:
      domainName: "my-domain"

worker.asyncWorkflowConsumerConcurrency:
  - value: 100
    constraints:
      domainName: "my-domain"
Backpressure Handling
Kafka provides natural backpressure:
- Queue Full: Messages that outlive the retention period are dropped by Kafka
- Slow Consumption: Messages accumulate in Kafka
- Consumer Lag: Monitor the consumer_lag metric
Monitoring
Key Metrics
Queue Depth:
# Messages waiting in queue
sum(kafka_consumer_lag{topic="cadence-async-workflow-queue"})
Consumption Rate:
# Workflows started per second
sum(rate(cadence_async_workflow_consumed_total[5m]))
Failure Rate:
# Failed workflow starts
sum(rate(cadence_async_workflow_failed_total[5m]))
End-to-End Latency:
# Time from queue to workflow start
histogram_quantile(0.95,
rate(cadence_async_workflow_e2e_latency_bucket[5m])
)
CLI Monitoring
# Check domain configuration
cadence admin domain get-async-wf-config --domain my-domain
# Monitor Kafka consumer lag
kafka-consumer-groups.sh \
--bootstrap-server kafka:9092 \
--describe \
--group cadence-async-wf-consumer
Alerting
High Consumer Lag:
alert: AsyncWorkflowHighLag
expr: kafka_consumer_lag{topic="cadence-async-workflow-queue"} > 10000
for: 5m
annotations:
  summary: "Async workflow queue backed up"
High Failure Rate:
alert: AsyncWorkflowHighFailures
expr: rate(cadence_async_workflow_failed_total[5m]) > 10
for: 5m
annotations:
  summary: "High async workflow failure rate"
Best Practices
Queue Configuration
- Partition Count: Start with num_workers * concurrency / 1000 partitions
- Retention: Set based on acceptable delay (24h typical)
- Replication: Use RF=3 for durability
- Max Message Size: Match Cadence payload limits
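The partition-count rule of thumb above, written out as code (the divisor 1000 comes from the heuristic itself, not from any Cadence setting):

```go
// suggestedPartitions applies the rule of thumb
// num_workers * concurrency / 1000, with a floor of one partition.
func suggestedPartitions(numWorkers, concurrency int) int {
	p := numWorkers * concurrency / 1000
	if p < 1 {
		return 1
	}
	return p
}
```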
Consumer Tuning
- Concurrency: Balance throughput vs. resource usage
- Rate Limiting: Prevent overload of downstream services
- Batch Size: Tune the Kafka consumer fetch.min.bytes setting
- Commit Interval: Balance consistency vs. throughput
Operational
- Monitor Lag: Alert if lag exceeds threshold
- DLQ: Configure dead-letter queue for failed starts
- Capacity Planning: Size workers for peak load + headroom
- Testing: Test queue behavior under load
Use Cases
Rate-Limited API
Protect backend from workflow start storms:
// API endpoint
func createOrder(w http.ResponseWriter, r *http.Request) {
	// Parse request
	var order Order
	if err := json.NewDecoder(r.Body).Decode(&order); err != nil {
		http.Error(w, err.Error(), 400)
		return
	}

	// Start workflow asynchronously.
	// Returns immediately; rate limited by the queue consumer.
	we, err := cadenceClient.StartWorkflow(
		context.Background(),
		client.StartWorkflowOptions{
			ID:       fmt.Sprintf("order-%s", order.ID),
			TaskList: "order-processing",
		},
		ProcessOrderWorkflow,
		order,
	)
	if err != nil {
		http.Error(w, err.Error(), 500)
		return
	}

	// Respond immediately
	json.NewEncoder(w).Encode(map[string]string{
		"workflowId": we.ID,
		"status":     "queued",
	})
}
Batch Processing
Queue large batches of workflow starts:
func processBatch(items []Item) error {
	for _, item := range items {
		// Each workflow start goes to the queue;
		// no blocking on workflow execution.
		_, err := cadenceClient.StartWorkflow(
			context.Background(),
			client.StartWorkflowOptions{
				ID: fmt.Sprintf("batch-item-%s", item.ID),
			},
			ProcessItemWorkflow,
			item,
		)
		if err != nil {
			log.Printf("Failed to queue: %v", err)
			// Continue with other items
		}
	}
	return nil
}
Decoupled Services
Decouple request service from workflow execution:
Request Service → Kafka   → Async Worker   → Workflow
    (Fast)        (Buffer)   (Rate Limited)   (Execution)
Request service can scale independently of workflow capacity.
Troubleshooting
High Consumer Lag
Problem: Kafka consumer lag increasing
Solution:
- Scale up worker instances
- Increase consumer concurrency
- Check for slow workflow starts
- Review rate limiting settings
- Verify workers are healthy
Workflows Not Starting
Problem: Workflows queued but not executing
Solution:
# Check async worker status
cadence admin cluster health
# Verify Kafka connectivity
kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--topic cadence-async-workflow-queue \
--from-beginning
# Check worker logs
grep "async.*workflow" /var/log/cadence/worker.log
# Verify domain configuration
cadence admin domain get-async-wf-config --domain my-domain
Failed Workflow Starts
Problem: High failure rate for async workflow starts
Solution:
- Check DLQ for failed messages
- Review error logs in worker
- Verify workflow registration
- Check task list worker availability
- Validate workflow input payloads
Advanced Topics
Dead Letter Queue
Handle failed workflow starts:
kafka:
  topics:
    cadence-async-workflow-dlq:
      cluster: test
  applications:
    async-wf-consumer:
      topic: cadence-async-workflow-queue
      dlq-topic: cadence-async-workflow-dlq
Failed starts go to DLQ after max retries. Process DLQ separately:
# Monitor DLQ
kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--topic cadence-async-workflow-dlq
# Replay from DLQ (custom tool)
./replay-dlq --topic cadence-async-workflow-dlq
Custom Queue Implementation
Implement custom queue backend:
type Queue interface {
	// Enqueue workflow start request
	Enqueue(ctx context.Context, req *StartRequest) error

	// Consume workflow start requests
	Consume(ctx context.Context) (<-chan *StartRequest, error)
}
Register in queue provider and configure per domain.