Skip to main content
MQTT telemetry enables real-time monitoring and logging of your Donkeycar’s performance metrics over the network. Stream sensor data, control outputs, and diagnostic logs to a remote dashboard or monitoring system.

Overview

MQTT telemetry provides:
  • Real-time monitoring - Stream metrics to remote dashboards
  • Remote logging - Capture Python logs over the network
  • Data streaming - Push sensor readings to IoT platforms
  • Performance analysis - Monitor throttle, steering, and velocity
  • Flexible topics - Publish to custom MQTT topics

Architecture

The telemetry system uses MQTT protocol to publish data:
Donkeycar → MQTT Client → MQTT Broker → Subscribers

                            Dashboard/Database

Configuration

Enable MQTT telemetry in myconfig.py:
# Enable telemetry
HAVE_MQTT_TELEMETRY = True

# Robot identification
TELEMETRY_DONKEY_NAME = 'my_robot_01'

# MQTT broker settings
TELEMETRY_MQTT_BROKER_HOST = 'broker.hivemq.com'
TELEMETRY_MQTT_BROKER_PORT = 1883

# Topic template
TELEMETRY_MQTT_TOPIC_TEMPLATE = 'donkey/%s/telemetry'
# Expands to: donkey/my_robot_01/telemetry

# Publishing settings
TELEMETRY_PUBLISH_PERIOD = 1  # seconds between publishes

# Data format
TELEMETRY_MQTT_JSON_ENABLE = False  # True for JSON, False for individual topics

# Default metrics to publish
TELEMETRY_DEFAULT_INPUTS = 'pilot/angle,pilot/throttle,recording'
TELEMETRY_DEFAULT_TYPES = 'float,float,bool'

# Logging integration
TELEMETRY_LOGGING_ENABLE = True
TELEMETRY_LOGGING_LEVEL = 'INFO'
TELEMETRY_LOGGING_FORMAT = '%(message)s'
From donkeycar/templates/cfg_path_follow.py:481-494.

Environment Variables

Override settings with environment variables:
export DONKEY_NAME="my_robot_01"
export DONKEY_MQTT_BROKER="mqtt.example.com"

python manage.py drive

MqttTelemetry Part

Basic Usage

from donkeycar.parts.telemetry import MqttTelemetry

# Create telemetry part
tel = MqttTelemetry(cfg)

# Add to vehicle
V.add(tel,
      inputs=['pilot/angle', 'pilot/throttle', 'recording'],
      outputs=['telem/queue_size'],
      threaded=True)
From donkeycar/templates/path_follow.py:102-103.

Implementation

The telemetry part from donkeycar/parts/telemetry.py:24-51:
class MqttTelemetry(StreamHandler):
    def __init__(self, cfg):
        StreamHandler.__init__(self)

        self.PUBLISH_PERIOD = cfg.TELEMETRY_PUBLISH_PERIOD
        self._last_publish = time.time()
        self._telem_q = queue.Queue()
        self._step_inputs = cfg.TELEMETRY_DEFAULT_INPUTS.split(',')
        self._step_types = cfg.TELEMETRY_DEFAULT_TYPES.split(',')
        self._donkey_name = os.environ.get('DONKEY_NAME', 
                                           cfg.TELEMETRY_DONKEY_NAME)
        self._mqtt_broker = os.environ.get('DONKEY_MQTT_BROKER',
                                           cfg.TELEMETRY_MQTT_BROKER_HOST)
        self._topic = cfg.TELEMETRY_MQTT_TOPIC_TEMPLATE % self._donkey_name
        
        # Connect to MQTT broker
        self._mqtt_client = MQTTClient(callback_api_version=CallbackAPIVersion.VERSION2)
        self._mqtt_client.connect(self._mqtt_broker, cfg.TELEMETRY_MQTT_BROKER_PORT)
        self._mqtt_client.loop_start()

Publishing Formats

Individual Topics (Default)

Each metric publishes to its own topic:
TELEMETRY_MQTT_JSON_ENABLE = False
Topics:
donkey/my_robot_01/telemetry/pilot/angle
donkey/my_robot_01/telemetry/pilot/throttle
donkey/my_robot_01/telemetry/recording
donkey/my_robot_01/telemetry/log/default

JSON Format

Publish all metrics in a single JSON message:
TELEMETRY_MQTT_JSON_ENABLE = True
Payload:
[
  {
    "ts": 1704067200,
    "values": {
      "pilot/angle": 0.15,
      "pilot/throttle": 0.5,
      "recording": true
    }
  }
]
From donkeycar/parts/telemetry.py:109-111:
if self._use_json_format:
    packet = [{'ts': k, 'values': v} for k, v in packet.items()]
    payload = json.dumps(packet)

Custom Metrics

Add custom metrics to telemetry:
# In your vehicle script
if cfg.HAVE_MQTT_TELEMETRY:
    from donkeycar.parts.telemetry import MqttTelemetry
    tel = MqttTelemetry(cfg)
    
    # Add custom inputs
    custom_inputs = ['pos/x', 'pos/y', 'vel/linear', 'vel/angular']
    custom_types = ['float', 'float', 'float', 'float']
    
    tel.add_step_inputs(custom_inputs, custom_types)
    
    V.add(tel,
          inputs=cfg.TELEMETRY_DEFAULT_INPUTS.split(',') + custom_inputs,
          threaded=True)
From donkeycar/parts/telemetry.py:53-61:
def add_step_inputs(self, inputs, types):
    # Add inputs if supported and not yet registered
    for ind in range(0, len(inputs or [])):
        if types[ind] in ['float', 'str', 'int'] and inputs[ind] not in self._step_inputs:
            self._step_inputs.append(inputs[ind])
            self._step_types.append(types[ind])
            
    return self._step_inputs, self._step_types

Logging Integration

Publish Python logs via MQTT:
import logging

logger = logging.getLogger(__name__)

# Logs automatically publish to MQTT
logger.info("Starting autopilot mode")
logger.warning("Low battery detected")
logger.error("Sensor read timeout")
Logs publish to:
donkey/my_robot_01/telemetry/log/default
From donkeycar/parts/telemetry.py:48-51:
if cfg.TELEMETRY_LOGGING_ENABLE:
    self.setLevel(logging.getLevelName(cfg.TELEMETRY_LOGGING_LEVEL))
    self.setFormatter(logging.Formatter(cfg.TELEMETRY_LOGGING_FORMAT))
    logger.addHandler(self)

Manual Reporting

Publish arbitrary metrics:
from donkeycar.parts.telemetry import MqttTelemetry

tel = MqttTelemetry(cfg)

# Report custom metrics
tel.report({
    'battery/voltage': 12.3,
    'battery/current': 2.1,
    'temperature/cpu': 65.0
})
From donkeycar/parts/telemetry.py:73-85:
def report(self, metrics):
    """Basic reporting - gets arbitrary dictionary with values"""
    curr_time = int(time.time())

    # Store sample with time rounded to second
    try:
        self._telem_q.put((curr_time, metrics), block=False)
    except queue.Full:
        pass

    return curr_time

Threaded Operation

The telemetry part runs in a background thread:
def update(self):
    logger.info(f"Telemetry MQTT server connected (publishing: {', '.join(self._step_inputs)})")
    while self._on:
        self.publish()
        time.sleep(self.PUBLISH_PERIOD)
From donkeycar/parts/telemetry.py:175-179.

MQTT Brokers

Public Brokers

Test with public MQTT brokers:
# HiveMQ (default)
TELEMETRY_MQTT_BROKER_HOST = 'broker.hivemq.com'

# Eclipse  
TELEMETRY_MQTT_BROKER_HOST = 'mqtt.eclipseprojects.io'

# Mosquitto test server
TELEMETRY_MQTT_BROKER_HOST = 'test.mosquitto.org'

Local Broker

Install Mosquitto locally:
# Ubuntu/Debian
sudo apt-get install mosquitto mosquitto-clients

# Start broker
sudo systemctl start mosquitto

# Test
mosquitto_sub -h localhost -t 'donkey/#' -v
# Connect to local broker
TELEMETRY_MQTT_BROKER_HOST = 'localhost'
TELEMETRY_MQTT_BROKER_PORT = 1883

Cloud Platforms

Connect to IoT platforms:
# AWS IoT Core
TELEMETRY_MQTT_BROKER_HOST = 'xxxxxx.iot.us-east-1.amazonaws.com'
TELEMETRY_MQTT_BROKER_PORT = 8883  # TLS

# Azure IoT Hub
TELEMETRY_MQTT_BROKER_HOST = 'xxx.azure-devices.net'

# Google Cloud IoT Core
TELEMETRY_MQTT_BROKER_HOST = 'mqtt.googleapis.com'

Monitoring

Command Line

Subscribe to telemetry topics:
# Subscribe to all topics
mosquitto_sub -h broker.hivemq.com -t 'donkey/my_robot_01/telemetry/#' -v

# Subscribe to specific metric
mosquitto_sub -h broker.hivemq.com -t 'donkey/my_robot_01/telemetry/pilot/throttle'

Python Subscriber

import paho.mqtt.client as mqtt

def on_message(client, userdata, message):
    print(f"{message.topic}: {message.payload.decode()}")

client = mqtt.Client()
client.on_message = on_message
client.connect('broker.hivemq.com', 1883)
client.subscribe('donkey/my_robot_01/telemetry/#')
client.loop_forever()

Web Dashboard

Use MQTT dashboards:
  • Node-RED - Visual flow-based dashboard
  • Grafana - Time-series visualization
  • Home Assistant - IoT dashboard
  • HiveMQ Web Client - Browser-based monitoring

Data Types

Supported telemetry types:
TELEMETRY_DEFAULT_TYPES = 'float,str,int'
From donkeycar/parts/telemetry.py:64-71:
@staticmethod
def filter_supported_metrics(inputs, types):
    supported_inputs = []
    supported_types = []
    for ind in range(0, len(inputs or [])):
        if types[ind] in ['float', 'str', 'int']:
            supported_inputs.append(inputs[ind])
            supported_types.append(types[ind])
    return supported_inputs, supported_types
Complex types (arrays, images) are not supported.

Performance

Monitor telemetry queue:
# Queue size output
V.add(tel,
      inputs=['pilot/angle', 'pilot/throttle'],
      outputs=['telem/queue_size'])

# Log queue size
class QueueMonitor:
    def run(self, queue_size):
        if queue_size > 100:
            print(f"Warning: telemetry queue backed up: {queue_size}")

V.add(QueueMonitor(), inputs=['telem/queue_size'])
From donkeycar/parts/telemetry.py:95-96:
@property
def qsize(self):
    return self._telem_q.qsize()

Best Practices

  1. Use local broker - Reduce latency and network dependencies
  2. Limit publish rate - Balance resolution with bandwidth
  3. Monitor queue size - Detect slow network or broker issues
  4. Filter metrics - Only publish necessary data
  5. Use JSON for dashboards - Easier to parse in web UIs

Troubleshooting

Connection Failed

# Check broker reachability
ping broker.hivemq.com

# Test with mosquitto client
mosquitto_pub -h broker.hivemq.com -t 'test' -m 'hello'

No Data Published

# Verify telemetry is enabled
HAVE_MQTT_TELEMETRY = True

# Check inputs match vehicle outputs
TELEMETRY_DEFAULT_INPUTS = 'pilot/angle,pilot/throttle'

# Enable debug logging
import logging
logging.basicConfig(level=logging.DEBUG)

High Queue Size

  • Increase TELEMETRY_PUBLISH_PERIOD
  • Reduce number of metrics
  • Check network bandwidth
  • Use local MQTT broker

Next Steps

Build docs developers (and LLMs) love