Datadog Data Streams Monitoring (DSM) gives you end-to-end visibility into the health and performance of your message-based pipelines. It tracks the pathway a message takes from producer to consumer, measuring queue depth, consumer lag, and end-to-end latency at each hop.

What Data Streams Monitoring tracks

End-to-end latency

Measures how long it takes for a message to travel from the point it was produced to where it is consumed, including any intermediate queues.

Consumer lag

Tracks how far behind consumers are relative to the latest message on each topic or queue.

Throughput

Records message production and consumption rates per service and pipeline.

Pipeline topology

Automatically discovers and maps your streaming pipeline, showing how services are connected through brokers.

Enabling Data Streams Monitoring

1. Set the environment variable:

DD_DATA_STREAMS_ENABLED=true node server.js

2. Or enable it programmatically:

const tracer = require('dd-trace').init({
  dsmEnabled: true,
})

Automatic instrumentation

With DSM enabled, dd-trace automatically instruments the following messaging libraries and injects DSM pathway context into message headers:
Library                                    Transport
kafkajs                                    Kafka
@confluentinc/kafka-javascript             Kafka
amqplib                                    RabbitMQ / AMQP 0-9-1
amqp10                                     AMQP 1.0
aws-sdk (SQS, SNS, Kinesis, EventBridge)   AWS messaging
azure-service-bus                          Azure Service Bus
azure-event-hubs                           Azure Event Hubs
google-cloud-pubsub                        Google Cloud Pub/Sub
rhea                                       AMQP
No code changes are required for auto-instrumented libraries, provided dd-trace is initialized before the messaging library is imported.

Manual instrumentation

For messaging libraries not automatically instrumented, or for custom pipeline steps, use the tracer.dataStreamsCheckpointer API to manually record produce and consume checkpoints.

tracer.dataStreamsCheckpointer

The checkpointer is available as tracer.dataStreamsCheckpointer once DSM is enabled.

setProduceCheckpoint(type, target, carrier)

Records a produce checkpoint and injects DSM pathway context into the provided carrier object. Call this when your service publishes a message.
const tracer = require('dd-trace').init({ dsmEnabled: true })

async function publishMessage(topic, payload) {
  const message = {
    key: payload.id,
    value: JSON.stringify(payload),
    headers: {}, // carrier object
  }

  tracer.dataStreamsCheckpointer.setProduceCheckpoint(
    'kafka',   // type: the streaming technology
    topic,     // target: topic, queue, or stream name
    message.headers // carrier: object to inject DSM context into
  )

  await producer.send({ topic, messages: [message] })
}
Parameter   Type     Description
type        string   The streaming technology (e.g., kafka, kinesis, sns, rabbitmq)
target      string   The target topic, queue, or stream name
carrier     object   Object to inject DSM context into (e.g., message headers)
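A thin wrapper can keep the checkpoint call next to message construction. The sketch below assumes a kafkajs-style message shape; buildCheckpointedMessage is an illustrative name, not part of dd-trace, and the checkpointer is passed in as an argument so the pattern is easy to exercise with a stub in tests. In a real service you would pass tracer.dataStreamsCheckpointer.

```javascript
// Hypothetical helper (not part of dd-trace): builds a message,
// records the produce checkpoint so DSM pathway context is injected
// into the headers, and returns the message ready to send.
function buildCheckpointedMessage(checkpointer, topic, payload) {
  const message = {
    key: payload.id,
    value: JSON.stringify(payload),
    headers: {}, // carrier: DSM context is injected here
  }
  checkpointer.setProduceCheckpoint('kafka', topic, message.headers)
  return message
}
```

Usage would look like `producer.send({ topic, messages: [buildCheckpointedMessage(tracer.dataStreamsCheckpointer, topic, payload)] })`.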

setConsumeCheckpoint(type, source, carrier, manualCheckpoint?)

Records a consume checkpoint and extracts DSM pathway context from the provided carrier. Call this when your service receives a message.
async function processMessage(message) {
  tracer.dataStreamsCheckpointer.setConsumeCheckpoint(
    'kafka',          // type
    message.topic,    // source
    message.headers,  // carrier: extract DSM context from headers
    true              // manualCheckpoint (default: true)
  )

  await handlePayload(JSON.parse(message.value))
}
Parameter         Type      Default   Description
type              string              The streaming technology
source            string              The source topic, queue, or stream name
carrier           object              Object to extract DSM context from
manualCheckpoint  boolean   true      Set to true for manual instrumentation. Manual checkpoints override automatic ones if both fire for the same message.
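The consume side can be wrapped the same way, recording the checkpoint before the payload reaches business logic. consumeWithCheckpoint is an illustrative helper, not part of dd-trace; the checkpointer argument stands in for tracer.dataStreamsCheckpointer so the pattern can be verified with a stub.

```javascript
// Hypothetical helper (not part of dd-trace): records the consume
// checkpoint, then decodes the payload and passes it to the handler.
async function consumeWithCheckpoint(checkpointer, message, handler) {
  checkpointer.setConsumeCheckpoint(
    'kafka',
    message.topic,
    message.headers, // carrier: DSM context is extracted from here
    true             // manualCheckpoint
  )
  return handler(JSON.parse(message.value))
}
```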

trackTransaction(transactionId, checkpointName, span?)

Records a transaction ID at a named checkpoint without pathway propagation. Useful for correlating a message with a business-level transaction across multiple pipeline hops.
tracer.dataStreamsCheckpointer.trackTransaction(
  'order-12345',         // transactionId: the business transaction ID
  'payment-validation',  // checkpointName: a logical name for this checkpoint
  // span: optional, defaults to the active span
)
The transactionId is truncated to 255 UTF-8 bytes. The checkpointName is assigned a stable 1-byte ID per process lifetime for efficiency.
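The 255-byte limit applies to UTF-8 bytes, not characters, so IDs containing multi-byte characters can be cut mid-string. A sketch of equivalent truncation logic (truncateToUtf8Bytes is an illustrative helper, not part of dd-trace) that avoids leaving a partial code point at the cut:

```javascript
// Illustrative helper (not part of dd-trace): truncate a string so
// its UTF-8 encoding fits within maxBytes, backing up past any
// continuation bytes (0b10xxxxxx) so no character is split in half.
function truncateToUtf8Bytes(str, maxBytes) {
  const buf = Buffer.from(str, 'utf8')
  if (buf.length <= maxBytes) return str
  let end = maxBytes
  while (end > 0 && (buf[end] & 0xc0) === 0x80) end--
  return buf.slice(0, end).toString('utf8')
}
```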

End-to-end example

const tracer = require('dd-trace').init({ dsmEnabled: true })
const { Kafka } = require('kafkajs')

const kafka = new Kafka({ brokers: ['localhost:9092'] })
const producer = kafka.producer()
const consumer = kafka.consumer({ groupId: 'my-group' })

// Producer
async function sendOrder(order) {
  const headers = {}
  tracer.dataStreamsCheckpointer.setProduceCheckpoint('kafka', 'orders', headers)

  await producer.send({
    topic: 'orders',
    messages: [{ value: JSON.stringify(order), headers }],
  })
}

// Consumer
async function start() {
  await producer.connect()
  await consumer.connect()
  await consumer.subscribe({ topic: 'orders' })
  await consumer.run({
    eachMessage: async ({ topic, message }) => {
      tracer.dataStreamsCheckpointer.setConsumeCheckpoint('kafka', topic, message.headers)
      await processOrder(JSON.parse(message.value))
    },
  })
}
When using auto-instrumented libraries (such as kafkajs), you do not need to call the checkpointer methods manually — the instrumentation handles it for you. Use the manual API only for libraries or custom transports that dd-trace does not automatically instrument.
