dd-trace instruments messaging and queue libraries to create producer and consumer spans. When Data Streams Monitoring is enabled, these plugins also propagate DSM context through message headers for end-to-end pipeline latency tracking.

Supported integrations

- KafkaJS: kafkajs
- Confluent Kafka: @confluentinc/kafka-javascript
- amqplib (RabbitMQ): amqplib
- amqp10: amqp10
- BullMQ: bullmq
- Rhea (AMQP): rhea
- Google Cloud Pub/Sub: @google-cloud/pubsub
- AWS SDK (SQS/SNS): aws-sdk / @aws-sdk/smithy-client
- Azure Event Hubs: @azure/event-hubs
- Azure Service Bus: @azure/service-bus

Enabling Data Streams Monitoring

Data Streams Monitoring (DSM) provides end-to-end latency and throughput metrics for your data pipelines. Enable it in tracer.init() or via the environment variable:
const tracer = require('dd-trace').init({
  dsmEnabled: true
})

DD_DATA_STREAMS_ENABLED=true node app.js
Once enabled, the messaging plugins automatically inject and extract DSM context in message headers.

Context propagation in message headers

Producer spans inject trace context into message headers. Consumer spans extract that context and continue the trace. This enables end-to-end distributed tracing across service boundaries. For AWS SQS and similar services, context is injected into the MessageAttributes of each message. You can control batch propagation with the batchPropagationEnabled option.
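
To observe the continuation in a handler, you can read the active span. A minimal sketch, assuming the KafkaJS consumer set up in the example below; the logged trace ID is only for illustration:

// Inside an instrumented consumer handler, the active span belongs to the
// same trace the producer started
await consumer.run({
  eachMessage: async ({ message }) => {
    const span = tracer.scope().active()
    console.log(span?.context().toTraceId()) // matches the producer's trace ID
  }
})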

KafkaJS

const tracer = require('dd-trace').init()

tracer.use('kafkajs', {
  service: 'my-kafka'
})

const { Kafka } = require('kafkajs')
const kafka = new Kafka({ brokers: ['localhost:9092'] })

// Producer spans are created automatically
const producer = kafka.producer()
await producer.connect()
await producer.send({
  topic: 'orders',
  messages: [{ value: JSON.stringify(order) }]
})

// Consumer spans are created automatically
const consumer = kafka.consumer({ groupId: 'order-processor' })
await consumer.connect()
await consumer.subscribe({ topics: ['orders'] })
await consumer.run({
  eachMessage: async ({ message }) => {
    // Trace context from the producer is automatically extracted
  }
})

amqplib (RabbitMQ)

const tracer = require('dd-trace').init()

tracer.use('amqplib', {
  service: 'rabbitmq'
})

const amqplib = require('amqplib')
const conn = await amqplib.connect('amqp://localhost')
const ch = await conn.createChannel()

// Spans are created automatically for publish and consume
await ch.assertQueue('tasks')
ch.sendToQueue('tasks', Buffer.from(JSON.stringify(task)))

// The consumer span continues the trace from the publisher
await ch.consume('tasks', msg => {
  if (msg) {
    // process msg.content, then acknowledge
    ch.ack(msg)
  }
})

AWS SDK (SQS, SNS, Kinesis)

const tracer = require('dd-trace').init()

tracer.use('aws-sdk', {
  service: 'aws',
  // Inject trace context into all messages in batch sends
  batchPropagationEnabled: true,
  hooks: {
    request: (span, response) => {
      span.setTag('aws.region', response.request.httpRequest.region)
    }
  },
  // Disable specific services
  S3: false
})
The aws-sdk plugin handles both v2 (aws-sdk) and v3 (@aws-sdk/smithy-client, @smithy/smithy-client) SDK versions.
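
For example, with the v3 SQS client, sends are traced without any code beyond tracer.init(). A minimal sketch; the region and queue URL are placeholders:

const tracer = require('dd-trace').init()

const { SQSClient, SendMessageCommand } = require('@aws-sdk/client-sqs')

const sqs = new SQSClient({ region: 'us-east-1' })

// dd-trace patches the v3 client; trace (and DSM) context is injected
// into the MessageAttributes of the outgoing message
await sqs.send(new SendMessageCommand({
  QueueUrl: process.env.QUEUE_URL, // placeholder: your queue URL
  MessageBody: JSON.stringify(order)
}))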

BullMQ

const tracer = require('dd-trace').init()

tracer.use('bullmq', {
  service: 'job-queue'
})

const { Queue, Worker } = require('bullmq')

// Producer spans are created automatically when jobs are added
const queue = new Queue('emails')
await queue.add('send-welcome', { userId: '123' })

// Consumer spans are created automatically when jobs are processed
const worker = new Worker('emails', async job => {
  await sendWelcomeEmail(job.data.userId)
})

Google Cloud Pub/Sub

const tracer = require('dd-trace').init()

tracer.use('google-cloud-pubsub', {
  service: 'pubsub'
})

const { PubSub } = require('@google-cloud/pubsub')
const pubsub = new PubSub()

// Publish spans are created automatically
const topic = pubsub.topic('my-topic')
await topic.publishMessage({ data: Buffer.from(JSON.stringify(event)) })
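
On the receiving side, message handlers are traced as well. A minimal sketch, assuming a subscription named my-subscription:

// Consumer spans are created automatically; the publisher's context is
// extracted from the message attributes
const subscription = pubsub.subscription('my-subscription')
subscription.on('message', message => {
  message.ack()
})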

Azure Event Hubs and Service Bus

const tracer = require('dd-trace').init()

tracer.use('azure-event-hubs', { service: 'event-hubs' })
tracer.use('azure-service-bus', { service: 'service-bus' })
Both plugins are enabled by default when the corresponding npm packages are installed.
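
A minimal Event Hubs producer sketch, assuming a connection string and hub name are available in environment variables:

const { EventHubProducerClient } = require('@azure/event-hubs')

const producer = new EventHubProducerClient(
  process.env.EVENTHUB_CONNECTION_STRING, // assumption: set in your environment
  process.env.EVENTHUB_NAME
)

// With the plugin enabled, this batch send is traced automatically
const batch = await producer.createBatch()
batch.tryAdd({ body: { orderId: '123' } })
await producer.sendBatch(batch)
await producer.close()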

Manual Data Streams checkpoints

For messaging systems not covered by automatic plugins, use the manual checkpointer API:
const tracer = require('dd-trace').init({ dsmEnabled: true })

// On the producer side
const carrier = {}
tracer.dataStreamsCheckpointer.setProduceCheckpoint('kafka', 'orders', carrier)
// Inject carrier into your message headers

// On the consumer side, rebuild the carrier from the received message headers
const ctx = tracer.dataStreamsCheckpointer.setConsumeCheckpoint('kafka', 'orders', carrier)
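
The carrier is a plain object whose entries carry the DSM context, so propagating it amounts to copying those entries into whatever header mechanism your transport offers. A sketch with a hypothetical send function:

const carrier = {}
tracer.dataStreamsCheckpointer.setProduceCheckpoint('custom-queue', 'orders', carrier)

// sendToMyQueue is hypothetical; the point is that the carrier's entries
// travel with the message as headers
await sendToMyQueue('orders', {
  body: JSON.stringify(order),
  headers: { ...carrier }
})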
