The Data Streams Checkpointer is available as tracer.dataStreamsCheckpointer. It lets you manually set checkpoints for message producers and consumers when automatic instrumentation is not available or insufficient.
const tracer = require('dd-trace').init({
  dsmEnabled: true, // or DD_DATA_STREAMS_ENABLED=true
})

const { dataStreamsCheckpointer } = tracer
Data Streams Monitoring must be enabled via dsmEnabled: true in TracerOptions or DD_DATA_STREAMS_ENABLED=true for checkpoints to have any effect.
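Equivalently, Data Streams Monitoring can be enabled without touching TracerOptions by setting the environment variable before starting the process:

```shell
# Equivalent to dsmEnabled: true in TracerOptions
export DD_DATA_STREAMS_ENABLED=true
node app.js
```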
Methods
setProduceCheckpoint(type, target, carrier)
Sets a produce checkpoint and injects the DSM propagation context into the provided carrier object.
setProduceCheckpoint(type: string, target: string, carrier: any): void
type: The streaming technology (e.g., kafka, kinesis, sns, sqs, rabbitmq).
target: The target of the data: the topic name, exchange name, or stream name.
carrier: The carrier object to inject the DSM context into. This is typically the message headers object.
// Kafka producer example
async function produceMessage(producer, topic, message) {
  const headers = {}
  tracer.dataStreamsCheckpointer.setProduceCheckpoint('kafka', topic, headers)
  await producer.send({
    topic,
    messages: [{
      value: JSON.stringify(message),
      headers,
    }],
  })
}
// SNS producer example
async function publishToSNS(snsClient, topicArn, message) {
  const carrier = {}
  tracer.dataStreamsCheckpointer.setProduceCheckpoint('sns', topicArn, carrier)
  await snsClient.publish({
    TopicArn: topicArn,
    Message: JSON.stringify(message),
    MessageAttributes: carrier,
  })
}
setConsumeCheckpoint(type, source, carrier, manualCheckpoint?)
Sets a consume checkpoint and extracts the DSM propagation context from the provided carrier.
setConsumeCheckpoint(type: string, source: string, carrier: any, manualCheckpoint?: boolean): any
type: The streaming technology (e.g., kafka, kinesis, sns, sqs).
source: The source of the data: the topic name, queue name, or stream name.
carrier: The carrier object to extract the DSM context from. This is typically the message headers object.
manualCheckpoint: Whether this checkpoint was set manually. Defaults to true.
Returns: The DSM context associated with the current pathway.
// Kafka consumer example
async function processMessages(consumer, topic) {
  await consumer.run({
    eachMessage: async ({ topic, message }) => {
      tracer.dataStreamsCheckpointer.setConsumeCheckpoint(
        'kafka',
        topic,
        message.headers
      )
      await handleMessage(message.value)
    },
  })
}
// SQS consumer example
async function processSQSMessages(sqsClient, queueUrl) {
  const response = await sqsClient.receiveMessage({ QueueUrl: queueUrl })
  for (const message of response.Messages ?? []) {
    // The DSM context injected by the producer travels in the message
    // payload here, so the parsed body serves as the carrier.
    const carrier = JSON.parse(message.Body)
    tracer.dataStreamsCheckpointer.setConsumeCheckpoint(
      'sqs',
      queueUrl,
      carrier
    )
    await processMessage(message)
  }
}
trackTransaction(transactionId, checkpointName, span?)
Records a transaction ID at a named checkpoint without pathway propagation. Tags the active span (or the provided span) with dsm.transaction.id and dsm.transaction.checkpoint.
trackTransaction(transactionId: string, checkpointName: string, span?: Span | null): void
transactionId: A unique transaction identifier (truncated to 255 UTF-8 bytes).
checkpointName: A logical checkpoint name. Should be stable and unique per process lifetime (assigned a 1-byte internal ID).
span: The span to tag. Defaults to the currently active span.
// Track a business transaction through the pipeline
tracer.trace('process.order', (span) => {
  const orderId = generateOrderId()
  tracer.dataStreamsCheckpointer.trackTransaction(
    orderId,
    'order.received',
    span
  )
  // ... process order ...
})
End-to-end example
const tracer = require('dd-trace').init({ dsmEnabled: true })
const { dataStreamsCheckpointer } = tracer

// Producer service
async function publishOrder(order) {
  return tracer.trace('orders.publish', async () => {
    const headers = {}
    dataStreamsCheckpointer.setProduceCheckpoint('kafka', 'orders', headers)
    await kafkaProducer.send({
      topic: 'orders',
      messages: [{ value: JSON.stringify(order), headers }],
    })
  })
}

// Consumer service
kafkaConsumer.run({
  eachMessage: async ({ topic, message }) => {
    // Await the traced async callback so the message is not acknowledged
    // before processing completes.
    await tracer.trace('orders.consume', async () => {
      dataStreamsCheckpointer.setConsumeCheckpoint('kafka', topic, message.headers)
      const order = JSON.parse(message.value.toString())
      await processOrder(order)
    })
  },
})