MQTT telemetry enables real-time monitoring and logging of your Donkeycar’s performance metrics over the network. Stream sensor data, control outputs, and diagnostic logs to a remote dashboard or monitoring system.
Overview
MQTT telemetry provides:
- Real-time monitoring - Stream metrics to remote dashboards
- Remote logging - Capture Python logs over the network
- Data streaming - Push sensor readings to IoT platforms
- Performance analysis - Monitor throttle, steering, and velocity
- Flexible topics - Publish to custom MQTT topics
Architecture
The telemetry system publishes data over the MQTT protocol:
Donkeycar → MQTT Client → MQTT Broker → Subscribers → Dashboard/Database
Configuration
Enable MQTT telemetry in myconfig.py:
# Enable telemetry
HAVE_MQTT_TELEMETRY = True
# Robot identification
TELEMETRY_DONKEY_NAME = 'my_robot_01'
# MQTT broker settings
TELEMETRY_MQTT_BROKER_HOST = 'broker.hivemq.com'
TELEMETRY_MQTT_BROKER_PORT = 1883
# Topic template
TELEMETRY_MQTT_TOPIC_TEMPLATE = 'donkey/%s/telemetry'
# Expands to: donkey/my_robot_01/telemetry
# Publishing settings
TELEMETRY_PUBLISH_PERIOD = 1 # seconds between publishes
# Data format
TELEMETRY_MQTT_JSON_ENABLE = False # True for JSON, False for individual topics
# Default metrics to publish
TELEMETRY_DEFAULT_INPUTS = 'pilot/angle,pilot/throttle,recording'
TELEMETRY_DEFAULT_TYPES = 'float,float,bool'
# Logging integration
TELEMETRY_LOGGING_ENABLE = True
TELEMETRY_LOGGING_LEVEL = 'INFO'
TELEMETRY_LOGGING_FORMAT = '%(message)s'
From donkeycar/templates/cfg_path_follow.py:481-494.
Environment Variables
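Override the robot name and broker host with environment variables. When set, DONKEY_NAME and DONKEY_MQTT_BROKER take precedence over the values in myconfig.py: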
export DONKEY_NAME="my_robot_01"
export DONKEY_MQTT_BROKER="mqtt.example.com"
python manage.py drive
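The override order can be sketched with the standard library alone. This mirrors the `os.environ.get` calls in the MqttTelemetry constructor quoted in the Implementation section; `resolve_telemetry_settings` and the `SimpleNamespace` stand-in for `cfg` are illustrative names, not part of Donkeycar.

```python
import os
from types import SimpleNamespace


def resolve_telemetry_settings(cfg, environ=None):
    """Return (name, broker, topic), letting environment variables win over cfg."""
    environ = os.environ if environ is None else environ
    name = environ.get('DONKEY_NAME', cfg.TELEMETRY_DONKEY_NAME)
    broker = environ.get('DONKEY_MQTT_BROKER', cfg.TELEMETRY_MQTT_BROKER_HOST)
    topic = cfg.TELEMETRY_MQTT_TOPIC_TEMPLATE % name
    return name, broker, topic


# Stand-in for the values in myconfig.py
cfg = SimpleNamespace(
    TELEMETRY_DONKEY_NAME='my_robot_01',
    TELEMETRY_MQTT_BROKER_HOST='broker.hivemq.com',
    TELEMETRY_MQTT_TOPIC_TEMPLATE='donkey/%s/telemetry',
)

# Only the broker is overridden here; the name falls back to cfg
settings = resolve_telemetry_settings(cfg, {'DONKEY_MQTT_BROKER': 'mqtt.example.com'})
print(settings)
```

Passing the environment as a plain dict keeps the sketch testable without touching the real process environment.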
MqttTelemetry Part
Basic Usage
from donkeycar.parts.telemetry import MqttTelemetry
# Create telemetry part
tel = MqttTelemetry(cfg)
# Add to vehicle
V.add(tel,
      inputs=['pilot/angle', 'pilot/throttle', 'recording'],
      outputs=['telem/queue_size'],
      threaded=True)
From donkeycar/templates/path_follow.py:102-103.
Implementation
The telemetry part from donkeycar/parts/telemetry.py:24-51:
class MqttTelemetry(StreamHandler):
    def __init__(self, cfg):
        StreamHandler.__init__(self)
        self.PUBLISH_PERIOD = cfg.TELEMETRY_PUBLISH_PERIOD
        self._last_publish = time.time()
        self._telem_q = queue.Queue()
        self._step_inputs = cfg.TELEMETRY_DEFAULT_INPUTS.split(',')
        self._step_types = cfg.TELEMETRY_DEFAULT_TYPES.split(',')
        self._donkey_name = os.environ.get('DONKEY_NAME',
                                           cfg.TELEMETRY_DONKEY_NAME)
        self._mqtt_broker = os.environ.get('DONKEY_MQTT_BROKER',
                                           cfg.TELEMETRY_MQTT_BROKER_HOST)
        self._topic = cfg.TELEMETRY_MQTT_TOPIC_TEMPLATE % self._donkey_name
        # Connect to MQTT broker
        self._mqtt_client = MQTTClient(callback_api_version=CallbackAPIVersion.VERSION2)
        self._mqtt_client.connect(self._mqtt_broker, cfg.TELEMETRY_MQTT_BROKER_PORT)
        self._mqtt_client.loop_start()
Individual Topics (Default)
Each metric publishes to its own topic:
TELEMETRY_MQTT_JSON_ENABLE = False
Topics:
donkey/my_robot_01/telemetry/pilot/angle
donkey/my_robot_01/telemetry/pilot/throttle
donkey/my_robot_01/telemetry/recording
donkey/my_robot_01/telemetry/log/default
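As a rough illustration of the topic layout above, the following sketch appends each metric name to the expanded topic template. `per_metric_messages` is a hypothetical helper, and how the real part encodes each value on the wire is not shown here.

```python
def per_metric_messages(base_topic, metrics):
    """Map a metrics dict to (topic, value) pairs, one per metric."""
    return [(f"{base_topic}/{name}", value) for name, value in metrics.items()]


# Expand the topic template the same way the configuration comment describes
base_topic = 'donkey/%s/telemetry' % 'my_robot_01'
messages = per_metric_messages(base_topic, {'pilot/angle': 0.15, 'pilot/throttle': 0.5})
for topic, value in messages:
    print(topic, value)
```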
JSON Format
Publish all metrics in a single JSON message:
TELEMETRY_MQTT_JSON_ENABLE = True
Payload:
[
  {
    "ts": 1704067200,
    "values": {
      "pilot/angle": 0.15,
      "pilot/throttle": 0.5,
      "recording": true
    }
  }
]
From donkeycar/parts/telemetry.py:109-111:
if self._use_json_format:
    packet = [{'ts': k, 'values': v} for k, v in packet.items()]
    payload = json.dumps(packet)
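The list comprehension above implies that `packet` maps timestamps to value dictionaries. A minimal sketch of that shape follows, assuming that samples sharing a timestamp are merged before serialization; `to_json_payload` is an illustrative name, not a Donkeycar function.

```python
import json


def to_json_payload(samples):
    """Group (timestamp, metrics) samples by timestamp and serialize them."""
    packet = {}
    for ts, metrics in samples:
        # Assumption: samples sharing a timestamp are merged; later values win
        packet.setdefault(ts, {}).update(metrics)
    return json.dumps([{'ts': ts, 'values': values} for ts, values in packet.items()])


samples = [
    (1704067200, {'pilot/angle': 0.15}),
    (1704067200, {'pilot/throttle': 0.5}),
    (1704067201, {'pilot/angle': 0.2}),
]
payload = to_json_payload(samples)
print(payload)
```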
Custom Metrics
Add custom metrics to telemetry:
# In your vehicle script
if cfg.HAVE_MQTT_TELEMETRY:
    from donkeycar.parts.telemetry import MqttTelemetry
    tel = MqttTelemetry(cfg)
    # Add custom inputs
    custom_inputs = ['pos/x', 'pos/y', 'vel/linear', 'vel/angular']
    custom_types = ['float', 'float', 'float', 'float']
    tel.add_step_inputs(custom_inputs, custom_types)
    V.add(tel,
          inputs=cfg.TELEMETRY_DEFAULT_INPUTS.split(',') + custom_inputs,
          threaded=True)
From donkeycar/parts/telemetry.py:53-61:
def add_step_inputs(self, inputs, types):
    # Add inputs if supported and not yet registered
    for ind in range(0, len(inputs or [])):
        if types[ind] in ['float', 'str', 'int'] and inputs[ind] not in self._step_inputs:
            self._step_inputs.append(inputs[ind])
            self._step_types.append(types[ind])
    return self._step_inputs, self._step_types
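To see the effect of the two conditions, here is a standalone re-creation of the same logic as a plain function operating on lists (a sketch, not the method itself). Already-registered names and unsupported types are skipped; the `cam/image_array` entry is a hypothetical input chosen to trigger the type check.

```python
SUPPORTED_TYPES = ('float', 'str', 'int')


def add_step_inputs(step_inputs, step_types, inputs, types):
    """Append new (input, type) pairs in place, skipping unsupported or duplicate ones."""
    for name, type_name in zip(inputs or [], types or []):
        if type_name in SUPPORTED_TYPES and name not in step_inputs:
            step_inputs.append(name)
            step_types.append(type_name)
    return step_inputs, step_types


step_inputs = ['pilot/angle', 'pilot/throttle']
step_types = ['float', 'float']
add_step_inputs(
    step_inputs, step_types,
    ['pos/x', 'pilot/angle', 'cam/image_array'],   # new, duplicate, unsupported
    ['float', 'float', 'image_array'],
)
print(step_inputs)
```

Only `pos/x` is added, so the two lists stay the same length and aligned by index.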
Logging Integration
Publish Python logs via MQTT:
import logging
logger = logging.getLogger(__name__)
# Logs automatically publish to MQTT
logger.info("Starting autopilot mode")
logger.warning("Low battery detected")
logger.error("Sensor read timeout")
Logs publish to:
donkey/my_robot_01/telemetry/log/default
From donkeycar/parts/telemetry.py:48-51:
if cfg.TELEMETRY_LOGGING_ENABLE:
    self.setLevel(logging.getLevelName(cfg.TELEMETRY_LOGGING_LEVEL))
    self.setFormatter(logging.Formatter(cfg.TELEMETRY_LOGGING_FORMAT))
    logger.addHandler(self)
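The handler pattern can be tried without a broker. The sketch below is a hypothetical `QueueLogHandler` that formats each record and queues it under a `log/default` key, matching the topic suffix above; how MqttTelemetry itself implements `emit` is not shown in this page, so treat the body as an assumption.

```python
import logging
import queue


class QueueLogHandler(logging.Handler):
    """Hypothetical handler that queues formatted records under a 'log/default' key."""

    def __init__(self, level='INFO', fmt='%(message)s'):
        super().__init__()
        self.records = queue.Queue()
        # Same level/formatter setup as the snippet above
        self.setLevel(logging.getLevelName(level))
        self.setFormatter(logging.Formatter(fmt))

    def emit(self, record):
        self.records.put({'log/default': self.format(record)})


demo_logger = logging.getLogger('telemetry_demo')
demo_logger.setLevel(logging.DEBUG)
demo_logger.propagate = False  # keep the demo output off the root logger
handler = QueueLogHandler(level='INFO')
demo_logger.addHandler(handler)

demo_logger.debug("Below the handler level, so not queued")
demo_logger.warning("Low battery detected")
print(handler.records.get_nowait())
```

Records below the handler level never reach `emit`, which is how TELEMETRY_LOGGING_LEVEL limits what is sent over the network.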
Manual Reporting
Publish arbitrary metrics:
from donkeycar.parts.telemetry import MqttTelemetry
tel = MqttTelemetry(cfg)
# Report custom metrics
tel.report({
    'battery/voltage': 12.3,
    'battery/current': 2.1,
    'temperature/cpu': 65.0
})
From donkeycar/parts/telemetry.py:73-85:
def report(self, metrics):
    """Basic reporting - gets arbitrary dictionary with values"""
    curr_time = int(time.time())
    # Store sample with time rounded to second
    try:
        self._telem_q.put((curr_time, metrics), block=False)
    except queue.Full:
        pass
    return curr_time
Threaded Operation
The telemetry part runs in a background thread:
def update(self):
    logger.info(f"Telemetry MQTT server connected (publishing: {', '.join(self._step_inputs)})")
    while self._on:
        self.publish()
        time.sleep(self.PUBLISH_PERIOD)
From donkeycar/parts/telemetry.py:175-179.
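The same publish-then-wait loop can be sketched in isolation. This variant is not the Donkeycar code: it swaps `time.sleep` and the `_on` flag for a `threading.Event`, so shutdown does not have to wait out the rest of a period. `PeriodicPublisher` and its method names are hypothetical.

```python
import threading
import time


class PeriodicPublisher:
    """Hypothetical stand-in: calls publish_fn every `period` seconds until shutdown."""

    def __init__(self, publish_fn, period):
        self._publish_fn = publish_fn
        self._period = period
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self.update, daemon=True)

    def start(self):
        self._thread.start()

    def update(self):
        # Event.wait returns True once shutdown() is called, which ends the
        # loop immediately instead of sleeping through the remaining period
        while not self._stop.wait(self._period):
            self._publish_fn()

    def shutdown(self):
        self._stop.set()
        self._thread.join()


calls = []
publisher = PeriodicPublisher(lambda: calls.append(time.time()), period=0.01)
publisher.start()
time.sleep(0.2)
publisher.shutdown()
print(f"published {len(calls)} times")
```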
MQTT Brokers
Public Brokers
Test with public MQTT brokers. Anyone can subscribe to topics on these servers, so use them for testing only:
# HiveMQ (default)
TELEMETRY_MQTT_BROKER_HOST = 'broker.hivemq.com'
# Eclipse
TELEMETRY_MQTT_BROKER_HOST = 'mqtt.eclipseprojects.io'
# Mosquitto test server
TELEMETRY_MQTT_BROKER_HOST = 'test.mosquitto.org'
Local Broker
Install Mosquitto locally:
# Ubuntu/Debian
sudo apt-get install mosquitto mosquitto-clients
# Start broker
sudo systemctl start mosquitto
# Test
mosquitto_sub -h localhost -t 'donkey/#' -v
# Connect to local broker
TELEMETRY_MQTT_BROKER_HOST = 'localhost'
TELEMETRY_MQTT_BROKER_PORT = 1883
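Cloud Platforms
Connect to IoT platforms. These services generally require TLS and device credentials, which the plain host and port connect call shown in the Implementation section does not configure: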
# AWS IoT Core
TELEMETRY_MQTT_BROKER_HOST = 'xxxxxx.iot.us-east-1.amazonaws.com'
TELEMETRY_MQTT_BROKER_PORT = 8883 # TLS
# Azure IoT Hub
TELEMETRY_MQTT_BROKER_HOST = 'xxx.azure-devices.net'
# Google Cloud IoT Core (service retired by Google in August 2023)
TELEMETRY_MQTT_BROKER_HOST = 'mqtt.googleapis.com'
Monitoring
Command Line
Subscribe to telemetry topics:
# Subscribe to all topics
mosquitto_sub -h broker.hivemq.com -t 'donkey/my_robot_01/telemetry/#' -v
# Subscribe to specific metric
mosquitto_sub -h broker.hivemq.com -t 'donkey/my_robot_01/telemetry/pilot/throttle'
Python Subscriber
import paho.mqtt.client as mqtt
def on_message(client, userdata, message):
    print(f"{message.topic}: {message.payload.decode()}")
# paho-mqtt 2.x takes a callback API version; VERSION2 matches the one MqttTelemetry uses
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client.on_message = on_message
client.connect('broker.hivemq.com', 1883)
client.subscribe('donkey/my_robot_01/telemetry/#')
client.loop_forever()
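A subscriber usually needs to turn the raw topic and payload back into named values. The sketch below handles both formats described earlier. It assumes that JSON payloads arrive on the base topic and that individual values are published as text on `<base topic>/<metric>`; neither detail is confirmed by the excerpts in this page, and `parse_telemetry` is a hypothetical helper.

```python
import json


def parse_telemetry(topic, payload, base_topic):
    """Return a dict of metric name -> value from one MQTT message."""
    text = payload.decode()
    if topic == base_topic:
        # JSON mode (assumed topic): merge the "values" of every sample in the list
        merged = {}
        for sample in json.loads(text):
            merged.update(sample['values'])
        return merged
    prefix = base_topic + '/'
    if not topic.startswith(prefix):
        raise ValueError(f"unexpected topic: {topic}")
    # Individual-topic mode: the metric name is the topic suffix
    metric = topic[len(prefix):]
    try:
        return {metric: float(text)}
    except ValueError:
        return {metric: text}  # non-numeric payloads such as log lines


base = 'donkey/my_robot_01/telemetry'
print(parse_telemetry(base + '/pilot/throttle', b'0.5', base))
print(parse_telemetry(base, b'[{"ts": 1704067200, "values": {"pilot/angle": 0.15}}]', base))
```

Inside `on_message`, this would be called as `parse_telemetry(message.topic, message.payload, base)`.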
Web Dashboard
Use MQTT dashboards:
- Node-RED - Visual flow-based dashboard
- Grafana - Time-series visualization
- Home Assistant - IoT dashboard
- HiveMQ Web Client - Browser-based monitoring
Data Types
Supported telemetry types:
TELEMETRY_DEFAULT_TYPES = 'float,str,int'
From donkeycar/parts/telemetry.py:64-71:
@staticmethod
def filter_supported_metrics(inputs, types):
    supported_inputs = []
    supported_types = []
    for ind in range(0, len(inputs or [])):
        if types[ind] in ['float', 'str', 'int']:
            supported_inputs.append(inputs[ind])
            supported_types.append(types[ind])
    return supported_inputs, supported_types
Any type outside 'float', 'str', and 'int' is filtered out, so complex types (arrays, images) are not supported.
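Applying the same rule to the default values from the Configuration section shows one consequence: an input declared as `bool` does not pass the filter. The function below is a standalone re-creation of the filter for illustration; whether and where the real part applies it to the defaults is not shown in this page.

```python
def filter_supported_metrics(inputs, types):
    """Keep only the (input, type) pairs whose type is float, str or int."""
    pairs = [
        (name, type_name)
        for name, type_name in zip(inputs or [], types or [])
        if type_name in ('float', 'str', 'int')
    ]
    return [name for name, _ in pairs], [type_name for _, type_name in pairs]


# Default values from the Configuration section
inputs = 'pilot/angle,pilot/throttle,recording'.split(',')
types = 'float,float,bool'.split(',')
kept_inputs, kept_types = filter_supported_metrics(inputs, types)
print(kept_inputs, kept_types)
```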
Queue Monitoring
Monitor the telemetry queue:
# Queue size output
V.add(tel,
      inputs=['pilot/angle', 'pilot/throttle'],
      outputs=['telem/queue_size'])
# Log queue size
class QueueMonitor:
    def run(self, queue_size):
        # queue_size may be None until the telemetry part has written a value
        if queue_size is not None and queue_size > 100:
            print(f"Warning: telemetry queue backed up: {queue_size}")
V.add(QueueMonitor(), inputs=['telem/queue_size'])
From donkeycar/parts/telemetry.py:95-96:
@property
def qsize(self):
    return self._telem_q.qsize()
Best Practices
- Use local broker - Reduce latency and network dependencies
- Limit publish rate - Balance resolution with bandwidth
- Monitor queue size - Detect slow network or broker issues
- Filter metrics - Only publish necessary data
- Use JSON for dashboards - Easier to parse in web UIs
Troubleshooting
Connection Failed
# Check broker reachability
ping broker.hivemq.com
# Test with mosquitto client
mosquitto_pub -h broker.hivemq.com -t 'test' -m 'hello'
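`ping` only shows that the host answers, not that the MQTT port is open. A TCP-level check from Python can narrow this down. This is a minimal sketch with a hypothetical helper name, `broker_reachable`; it confirms only that the port accepts connections, not that an MQTT handshake would succeed.

```python
import socket


def broker_reachable(host, port=1883, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections and timeouts
        return False


# Check a local broker; replace the host with your TELEMETRY_MQTT_BROKER_HOST
print(broker_reachable('localhost', 1883, timeout=1.0))
```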
No Data Published
# Verify telemetry is enabled
HAVE_MQTT_TELEMETRY = True
# Check inputs match vehicle outputs
TELEMETRY_DEFAULT_INPUTS = 'pilot/angle,pilot/throttle'
# Enable debug logging
import logging
logging.basicConfig(level=logging.DEBUG)
High Queue Size
- Increase TELEMETRY_PUBLISH_PERIOD
- Reduce number of metrics
- Check network bandwidth
- Use local MQTT broker