What Data Streams Monitoring tracks
End-to-end latency
Measures how long it takes for a message to travel from the point it was produced to where it is consumed, including any intermediate queues.
Consumer lag
Tracks how far behind consumers are relative to the latest message on each topic or queue.
Throughput
Records message production and consumption rates per service and pipeline.
Pipeline topology
Builds an automatically discovered map of your streaming pipeline showing how services are connected through brokers.
Enabling Data Streams Monitoring
Automatic instrumentation
With DSM enabled, dd-trace automatically instruments the following messaging libraries and injects DSM pathway context into message headers:| Library | Transport |
|---|---|
kafkajs | Kafka |
@confluentinc/kafka-javascript | Kafka |
amqplib | RabbitMQ / AMQP 0-9-1 |
amqp10 | AMQP 1.0 |
aws-sdk (SQS, SNS, Kinesis, EventBridge) | AWS messaging |
azure-service-bus | Azure Service Bus |
azure-event-hubs | Azure Event Hubs |
google-cloud-pubsub | Google Cloud Pub/Sub |
rhea | AMQP |
Manual instrumentation
For messaging libraries not automatically instrumented, or for custom pipeline steps, use thetracer.dataStreamsCheckpointer API to manually record produce and consume checkpoints.
tracer.dataStreamsCheckpointer
The checkpointer is available astracer.dataStreamsCheckpointer once DSM is enabled.
setProduceCheckpoint(type, target, carrier)
Records a produce checkpoint and injects DSM pathway context into the provided carrier object. Call this when your service publishes a message.| Parameter | Type | Description |
|---|---|---|
type | string | The streaming technology (e.g., kafka, kinesis, sns, rabbitmq) |
target | string | The target topic, queue, or stream name |
carrier | object | Object to inject DSM context into (e.g., message headers) |
setConsumeCheckpoint(type, source, carrier, manualCheckpoint?)
Records a consume checkpoint and extracts DSM pathway context from the provided carrier. Call this when your service receives a message.| Parameter | Type | Default | Description |
|---|---|---|---|
type | string | — | The streaming technology |
source | string | — | The source topic, queue, or stream name |
carrier | object | — | Object to extract DSM context from |
manualCheckpoint | boolean | true | Set to true for manual instrumentation. Manual checkpoints override automatic ones if both fire for the same message. |
trackTransaction(transactionId, checkpointName, span?)
Records a transaction ID at a named checkpoint without pathway propagation. Useful for correlating a message with a business-level transaction across multiple pipeline hops.transactionId is truncated to 255 UTF-8 bytes. The checkpointName is assigned a stable 1-byte ID per process lifetime for efficiency.
End-to-end example
When using auto-instrumented libraries (such as
kafkajs), you do not need to call the checkpointer methods manually — the instrumentation handles it for you. Use the manual API only for libraries or custom transports that dd-trace does not automatically instrument.