Skip to main content
The Data Streams Checkpointer is available as tracer.dataStreamsCheckpointer. It lets you manually set checkpoints for message producers and consumers when automatic instrumentation is not available or insufficient.
const tracer = require('dd-trace').init({
  dsmEnabled: true, // or DD_DATA_STREAMS_ENABLED=true
})
const { dataStreamsCheckpointer } = tracer
Data Streams Monitoring must be enabled via dsmEnabled: true in TracerOptions or DD_DATA_STREAMS_ENABLED=true for checkpoints to have any effect.

Methods

setProduceCheckpoint(type, target, carrier)

Sets a produce checkpoint and injects the DSM propagation context into the provided carrier object.
setProduceCheckpoint(type: string, target: string, carrier: any): void
type
string
required
The streaming technology (e.g., kafka, kinesis, sns, sqs, rabbitmq).
target
string
required
The target of the data: the topic name, exchange name, or stream name.
carrier
any
required
The carrier object to inject the DSM context into. This is typically the message headers object.
// Kafka producer example
async function produceMessage(producer, topic, message) {
  const headers = {}

  tracer.dataStreamsCheckpointer.setProduceCheckpoint('kafka', topic, headers)

  await producer.send({
    topic,
    messages: [{
      value: JSON.stringify(message),
      headers,
    }],
  })
}
// SNS producer example
async function publishToSNS(snsClient, topicArn, message) {
  const carrier = {}

  tracer.dataStreamsCheckpointer.setProduceCheckpoint('sns', topicArn, carrier)

  await snsClient.publish({
    TopicArn: topicArn,
    Message: JSON.stringify(message),
    MessageAttributes: carrier,
  })
}

setConsumeCheckpoint(type, source, carrier, manualCheckpoint?)

Sets a consume checkpoint and extracts the DSM propagation context from the provided carrier.
setConsumeCheckpoint(type: string, source: string, carrier: any, manualCheckpoint?: boolean): any
type
string
required
The streaming technology (e.g., kafka, kinesis, sns, sqs).
source
string
required
The source of the data: the topic name, queue name, or stream name.
carrier
any
required
The carrier object to extract the DSM context from. This is typically the message headers object.
manualCheckpoint
boolean
Whether this checkpoint was set manually. Defaults to true.
returns
any
The DSM context associated with the current pathway.
// Kafka consumer example
async function processMessages(consumer, topic) {
  await consumer.run({
    eachMessage: async ({ topic, message }) => {
      tracer.dataStreamsCheckpointer.setConsumeCheckpoint(
        'kafka',
        topic,
        message.headers
      )

      await handleMessage(message.value)
    },
  })
}
// SQS consumer example
async function processSQSMessages(sqsClient, queueUrl) {
  const response = await sqsClient.receiveMessage({ QueueUrl: queueUrl })

  for (const message of response.Messages ?? []) {
    const carrier = JSON.parse(message.Body)

    tracer.dataStreamsCheckpointer.setConsumeCheckpoint(
      'sqs',
      queueUrl,
      carrier
    )

    await processMessage(message)
  }
}

trackTransaction(transactionId, checkpointName, span?)

Records a transaction ID at a named checkpoint without pathway propagation. Tags the active span (or the provided span) with dsm.transaction.id and dsm.transaction.checkpoint.
trackTransaction(transactionId: string, checkpointName: string, span?: Span | null): void
transactionId
string
required
A unique transaction identifier (truncated to 255 UTF-8 bytes).
checkpointName
string
required
A logical checkpoint name. Should be stable and unique per process lifetime (assigned a 1-byte internal ID).
span
Span | null
The span to tag. Defaults to the currently active span.
// Track a business transaction through the pipeline
tracer.trace('process.order', (span) => {
  const orderId = generateOrderId()

  tracer.dataStreamsCheckpointer.trackTransaction(
    orderId,
    'order.received',
    span
  )

  // ... process order ...
})

End-to-end example

const tracer = require('dd-trace').init({ dsmEnabled: true })
const { dataStreamsCheckpointer } = tracer

// Producer service
async function publishOrder(order) {
  return tracer.trace('orders.publish', async (span) => {
    const headers = {}
    dataStreamsCheckpointer.setProduceCheckpoint('kafka', 'orders', headers)

    await kafkaProducer.send({
      topic: 'orders',
      messages: [{ value: JSON.stringify(order), headers }],
    })
  })
}

// Consumer service
kafkaConsumer.run({
  eachMessage: async ({ topic, message }) => {
    tracer.trace('orders.consume', async () => {
      dataStreamsCheckpointer.setConsumeCheckpoint('kafka', topic, message.headers)

      const order = JSON.parse(message.value.toString())
      await processOrder(order)
    })
  },
})

Build docs developers (and LLMs) love