The Part Interface
A Donkeycar part is a Python class that implements a simple interface:

class MyPart:
    def run(self, input1, input2):
        """Called every loop iteration, returns outputs"""
        output1 = self.process(input1, input2)
        return output1

    def shutdown(self):
        """Cleanup when vehicle stops"""
        pass
Method Reference
- run(*args) - Synchronous execution in the main loop
- run_threaded(*args) - Returns cached values from the background thread
- update() - Runs continuously in a background thread
- shutdown() - Cleanup (close files, stop threads, release hardware)
Basic Synchronous Part
Simple parts that execute in the main vehicle loop.
Example: Throttle Limiter
class ThrottleLimiter:
    """Limits throttle based on steering angle and a configured ceiling.

    Args:
        max_throttle: Upper bound applied to every throttle value returned
            by ``run``.
        steering_threshold: Absolute steering angle above which the throttle
            is scaled down.
    """

    def __init__(self, max_throttle=1.0, steering_threshold=0.5):
        self.max_throttle = max_throttle
        self.steering_threshold = steering_threshold

    def run(self, angle, throttle):
        """Reduce throttle when turning sharply, then cap it.

        Args:
            angle: Steering angle; only its magnitude is used.
            throttle: Requested throttle.

        Returns:
            The throttle scaled by ``1 - 0.5 * min(|angle|, 1)`` when
            ``|angle|`` exceeds the steering threshold (unchanged otherwise),
            and never larger than ``max_throttle``.
        """
        steering = abs(angle)
        if steering > self.steering_threshold:
            # Clamp the reduction to 1.0: without it an out-of-range angle
            # (|angle| > 2) makes the scale factor negative and silently
            # reverses the throttle direction.
            reduction = min(steering, 1.0)
            limited_throttle = throttle * (1.0 - reduction * 0.5)
        else:
            limited_throttle = throttle

        # Enforce maximum
        return min(limited_throttle, self.max_throttle)

    def shutdown(self):
        """Nothing to release."""
        pass
# Usage in manage.py: the limiter reads the pilot's angle and throttle and
# overwrites the pilot throttle with the limited value.
limiter = ThrottleLimiter(max_throttle=0.8, steering_threshold=0.3)
V.add(
    limiter,
    inputs=['pilot/angle', 'pilot/throttle'],
    outputs=['pilot/throttle'],
)
Example: Data Logger
import csv
import datetime
class CSVLogger:
    """Appends one CSV row per loop iteration and passes its inputs through."""

    def __init__(self, filename=None):
        """Open the log file and write the header row.

        Args:
            filename: Path of the CSV file. When ``None``, a name of the form
                ``log_YYYYMMDD_HHMMSS.csv`` is built from the current time.
        """
        if filename is None:
            filename = f"log_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"

        self.filename = filename
        self.file = open(filename, 'w', newline='')
        self.writer = csv.writer(self.file)

        header = ['timestamp', 'angle', 'throttle', 'mode']
        self.writer.writerow(header)

    def run(self, angle, throttle, mode):
        """Write a timestamped row and return the inputs unchanged."""
        row = [datetime.datetime.now().isoformat(), angle, throttle, mode]
        self.writer.writerow(row)
        # Pass through so downstream parts still see the same values.
        return angle, throttle, mode

    def shutdown(self):
        """Close the file and report where the log was written."""
        self.file.close()
        print(f"Log saved to {self.filename}")
# Usage: the logger records the user channels and writes the same values back.
logger = CSVLogger()
V.add(
    logger,
    inputs=['user/angle', 'user/throttle', 'user/mode'],
    outputs=['user/angle', 'user/throttle', 'user/mode'],
)
Threaded Parts
Parts that run continuously in the background for I/O operations.
Example: Distance Sensor
import time
import RPi.GPIO as GPIO
class UltrasonicSensor:
    """HC-SR04 ultrasonic distance sensor polled from a background thread.

    Args:
        trigger_pin: BCM pin number driving the sensor's trigger input.
        echo_pin: BCM pin number reading the sensor's echo output.
        poll_delay: Seconds to sleep between measurements.
    """

    def __init__(self, trigger_pin=23, echo_pin=24, poll_delay=0.05):
        self.trigger = trigger_pin
        self.echo = echo_pin
        self.poll_delay = poll_delay
        self.distance = 0.0
        self.on = True

        # Setup GPIO
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(self.trigger, GPIO.OUT)
        GPIO.setup(self.echo, GPIO.IN)

    def update(self):
        """Measure distance repeatedly until ``shutdown`` clears ``self.on``.

        ``self.distance`` is only overwritten when a complete echo pulse
        (rising and falling edge) was observed within the 100 ms budget;
        otherwise the previous value is kept.
        """
        while self.on:
            # Trigger pulse (10 microseconds)
            GPIO.output(self.trigger, GPIO.HIGH)
            time.sleep(0.00001)
            GPIO.output(self.trigger, GPIO.LOW)

            deadline = time.time() + 0.1  # 100ms budget for the whole echo
            # Reset per measurement so a timestamp left over from an earlier
            # cycle can never be paired with one from this cycle.
            start_time = None
            end_time = None
            timed_out = False

            # Wait for the echo line to go high; the last sample taken while
            # it is still low approximates the rising edge.
            while GPIO.input(self.echo) == GPIO.LOW:
                start_time = time.time()
                if start_time > deadline:
                    timed_out = True
                    break

            # Wait for the echo line to drop again; the last sample taken
            # while it is still high approximates the falling edge.
            while not timed_out and GPIO.input(self.echo) == GPIO.HIGH:
                end_time = time.time()
                if end_time > deadline:
                    timed_out = True
                    break

            # Calculate distance only from a complete, in-time pulse.
            if not timed_out and start_time is not None and end_time is not None:
                duration = end_time - start_time
                self.distance = duration * 34300 / 2  # cm

            time.sleep(self.poll_delay)

    def run_threaded(self):
        """Return the most recent cached distance value."""
        return self.distance

    def shutdown(self):
        """Ask the polling loop to stop and release the GPIO pins."""
        self.on = False
        # NOTE(review): the background thread may still be inside a
        # measurement when cleanup() runs; this class does not own the thread
        # handle, so it cannot join it first.
        GPIO.cleanup()
# Usage: threaded=True makes the vehicle run update() in a background thread
# and read the cached value through run_threaded().
sensor = UltrasonicSensor(trigger_pin=23, echo_pin=24)
V.add(
    sensor,
    outputs=['obstacle/distance'],
    threaded=True,
)
Example: Web API Client
import time
import requests
import json
class WeatherAPI:
    """Fetches weather data in a background thread and caches the result.

    Args:
        api_key: API key sent as the ``appid`` query parameter.
        location: Location sent as the ``q`` query parameter.
        poll_interval: Seconds between requests (default 300, i.e. 5 minutes).
    """

    def __init__(self, api_key, location, poll_interval=300):
        self.api_key = api_key
        self.location = location
        self.poll_interval = poll_interval  # 5 minutes
        self.temperature = 0.0
        self.humidity = 0.0
        self.on = True

    def update(self):
        """Poll the weather endpoint until ``shutdown`` clears ``self.on``.

        Request failures and malformed responses are reported and the
        previously cached values are kept.
        """
        url = "https://api.openweathermap.org/data/2.5/weather"
        params = {
            'q': self.location,
            'appid': self.api_key,
            'units': 'metric',
        }
        while self.on:
            try:
                response = requests.get(url, params=params, timeout=10)
                if response.status_code == 200:
                    main = response.json()['main']
                    # Read both fields before storing either, so a missing
                    # key cannot leave the pair from two different responses.
                    temperature = main['temp']
                    humidity = main['humidity']
                    self.temperature = temperature
                    self.humidity = humidity
            except Exception as e:
                print(f"Weather API error: {e}")
            self._wait_for_next_poll()

    def _wait_for_next_poll(self):
        """Sleep for ``poll_interval`` seconds, returning early once stopped."""
        # One long sleep would keep the thread (and any join() on it) blocked
        # for the full interval after shutdown(); sleep in short slices.
        deadline = time.monotonic() + self.poll_interval
        while self.on:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(remaining, 0.5))

    def run_threaded(self):
        """Return the cached ``(temperature, humidity)`` pair."""
        return self.temperature, self.humidity

    def shutdown(self):
        """Signal the polling loop to stop."""
        self.on = False
# Usage: the part has no inputs; it publishes the cached readings each loop.
weather = WeatherAPI(api_key='YOUR_KEY', location='San Francisco')
V.add(
    weather,
    outputs=['weather/temp', 'weather/humidity'],
    threaded=True,
)
Advanced Patterns
State Machine Part
from enum import Enum
class State(Enum):
    """States of the obstacle-avoidance state machine."""

    IDLE = 0
    DRIVING = 1
    OBSTACLE_DETECTED = 2
    AVOIDING = 3


class ObstacleAvoidance:
    """State machine that overrides steering/throttle near an obstacle.

    Args:
        detection_distance: Distance below which an obstacle is considered
            detected while driving, and above which avoidance may end.
    """

    def __init__(self, detection_distance=30.0):
        self.state = State.IDLE
        self.detection_distance = detection_distance
        self.avoid_angle = 0.5
        self.counter = 0

    def run(self, angle, throttle, distance):
        """Advance the state machine one step.

        Returns:
            An ``(angle, throttle)`` pair: the inputs unchanged while idle or
            driving, ``(0.0, 0.0)`` on the step after detection, and an
            avoidance command while avoiding.
        """
        current = self.state

        if current is State.OBSTACLE_DETECTED:
            # Stop for one step, then start avoiding.
            self.state = State.AVOIDING
            return 0.0, 0.0

        if current is State.AVOIDING:
            self.counter += 1
            if self.counter < 10:
                # Turn away from the obstacle at low throttle.
                return self.avoid_angle, 0.2
            if distance > self.detection_distance:
                self.state = State.DRIVING
                return angle, throttle
            # Still blocked after the turn: hold the angle and stop.
            return self.avoid_angle, 0.0

        if current is State.IDLE:
            if throttle > 0:
                self.state = State.DRIVING
        elif current is State.DRIVING and distance < self.detection_distance:
            self.state = State.OBSTACLE_DETECTED
            self.counter = 0

        # Idle and normal driving both pass the inputs through.
        return angle, throttle

    def shutdown(self):
        """Nothing to release."""
        pass
# Usage: the avoider rewrites the pilot's angle/throttle using the distance
# reading published by the obstacle sensor.
avoider = ObstacleAvoidance(detection_distance=30.0)
V.add(
    avoider,
    inputs=['pilot/angle', 'pilot/throttle', 'obstacle/distance'],
    outputs=['pilot/angle', 'pilot/throttle'],
)
Filter Part
from collections import deque
import numpy as np
class MovingAverageFilter:
    """Smooths a signal by averaging the most recent samples.

    Args:
        window_size: Maximum number of samples kept in the window.
    """

    def __init__(self, window_size=5):
        self.window_size = window_size
        # A bounded deque drops the oldest sample automatically once full.
        self.buffer = deque(maxlen=window_size)

    def run(self, value):
        """Add ``value`` to the window and return the mean of the window."""
        window = self.buffer
        window.append(value)
        return np.mean(window)

    def shutdown(self):
        """Nothing to release."""
        pass
# Usage: smooth the raw IMU x-acceleration into a separate filtered channel.
# The variable is deliberately not called ``filter`` so the builtin of that
# name stays usable in the rest of the module.
accel_filter = MovingAverageFilter(window_size=10)
V.add(
    accel_filter,
    inputs=['imu/accel_x'],
    outputs=['imu/accel_x_filtered'],
)
Conditional Execution
class ConditionalPilot:
    """Delegates to a wrapped pilot only when the path ahead is clear.

    Args:
        pilot: Object exposing ``run(img_arr)`` and ``shutdown()``.
        min_distance: The pilot is used only when the measured distance is
            strictly greater than this value.
    """

    def __init__(self, pilot, min_distance=50.0):
        self.pilot = pilot
        self.min_distance = min_distance

    def run(self, img_arr, distance, user_angle, user_throttle):
        """Return the pilot's output when clear, else the manual controls."""
        if not distance > self.min_distance:
            # Obstacle within range: fall back to manual control.
            return user_angle, user_throttle
        return self.pilot.run(img_arr)

    def shutdown(self):
        """Shut down the wrapped pilot."""
        self.pilot.shutdown()
# Usage: wrap a trained Keras pilot so it only drives when no obstacle is
# closer than min_distance; otherwise the user's controls are forwarded.
from donkeycar.parts.keras import KerasLinear

pilot = KerasLinear()
pilot.load('./models/my_model.h5')

conditional = ConditionalPilot(pilot, min_distance=50.0)
V.add(
    conditional,
    inputs=['cam/image_array', 'obstacle/distance', 'user/angle', 'user/throttle'],
    outputs=['pilot/angle', 'pilot/throttle'],
)
Multi-Input/Output Parts
Example: Sensor Fusion
import numpy as np
class IMUCameraFusion:
    """Blends a camera steering prediction with a gyroscope-based correction.

    Args:
        camera_weight: Weight applied to the camera steering angle.
        imu_weight: Weight applied to the gyro-derived correction.
    """

    def __init__(self, camera_weight=0.7, imu_weight=0.3):
        self.camera_weight = camera_weight
        self.imu_weight = imu_weight

    def run(self, cam_angle, cam_throttle, gyro_z):
        """Return ``(fused_angle, cam_throttle)``.

        The correction is the negated gyro reading scaled by 0.1; the fused
        angle is the weighted sum of the camera angle and that correction.
        The throttle is passed through untouched.
        """
        imu_correction = -gyro_z * 0.1
        camera_term = self.camera_weight * cam_angle
        imu_term = self.imu_weight * imu_correction
        return camera_term + imu_term, cam_throttle

    def shutdown(self):
        """Nothing to release."""
        pass
# Usage: the fused angle replaces the pilot angle; throttle is unchanged.
fusion = IMUCameraFusion(camera_weight=0.8, imu_weight=0.2)
V.add(
    fusion,
    inputs=['pilot/angle', 'pilot/throttle', 'imu/gyro_z'],
    outputs=['pilot/angle', 'pilot/throttle'],
)
Testing Your Part
Unit Test Example
import unittest
class TestThrottleLimiter(unittest.TestCase):
    """Unit tests for ThrottleLimiter.run() with a 0.8 throttle ceiling."""

    def setUp(self):
        self.limiter = ThrottleLimiter(max_throttle=0.8)

    def test_straight_no_limit(self):
        """Driving straight below the ceiling leaves the throttle untouched."""
        result = self.limiter.run(angle=0.0, throttle=0.7)
        self.assertEqual(result, 0.7)

    def test_turn_reduces_throttle(self):
        """A sharp turn yields less throttle than was requested."""
        result = self.limiter.run(angle=0.8, throttle=0.7)
        self.assertLess(result, 0.7)

    def test_max_throttle_enforced(self):
        """A request above the ceiling is capped at the ceiling."""
        result = self.limiter.run(angle=0.0, throttle=1.0)
        self.assertEqual(result, 0.8)


if __name__ == '__main__':
    unittest.main()
Standalone Testing
# test_sensor.py
from my_parts import UltrasonicSensor
import time
def test_sensor():
    """Run the sensor standalone and print ten readings, half a second apart.

    The sensor's update() loop runs in its own thread, mirroring what the
    vehicle does for threaded parts; the sensor is always shut down and the
    thread joined, even if reading fails.
    """
    import threading

    sensor = UltrasonicSensor(trigger_pin=23, echo_pin=24)

    # Start background thread
    worker = threading.Thread(target=sensor.update)
    worker.start()

    try:
        for _ in range(10):
            distance = sensor.run_threaded()
            print(f"Distance: {distance:.1f} cm")
            time.sleep(0.5)
    finally:
        sensor.shutdown()
        worker.join()


if __name__ == '__main__':
    test_sensor()
Packaging Your Part
Directory Structure
my_parts/
    __init__.py
    throttle_limiter.py
    obstacle_avoidance.py
    sensors/
        __init__.py
        ultrasonic.py
    README.md
    setup.py
setup.py
from setuptools import setup, find_packages
# Package metadata first, then what gets installed and what it depends on.
setup(
    name='donkeycar-my-parts',
    version='0.1.0',
    description='Custom Donkeycar parts',
    author='Your Name',
    author_email='[email protected]',
    url='https://github.com/yourusername/donkeycar-my-parts',
    packages=find_packages(),
    install_requires=[
        'donkeycar>=4.3.0',
        'RPi.GPIO>=0.7.0',
    ],
)
Installation
# Development install
pip install -e ./my_parts
# Or from GitHub
pip install git+https://github.com/yourusername/donkeycar-my-parts.git
Best Practices
1. Error Handling
import logging
logger = logging.getLogger(__name__)


class RobustPart:
    """Part skeleton that never lets an exception escape into the vehicle loop.

    Subclasses are expected to provide ``process(value)`` and ``cleanup()``;
    neither is defined here.
    """

    def run(self, value):
        """Return ``self.process(value)``, or ``0.0`` if processing raises.

        Failures are logged at ERROR level together with their traceback.
        """
        try:
            return self.process(value)
        except Exception as e:
            # logger.exception keeps the traceback, which a plain error()
            # call with only str(e) throws away.
            logger.exception("Error in RobustPart: %s", e)
            return 0.0  # Safe default

    def shutdown(self):
        """Run ``self.cleanup()``; log, but do not propagate, any failure."""
        try:
            self.cleanup()
        except Exception as e:
            logger.exception("Error during shutdown: %s", e)
2. Configuration
class ConfigurablePart:
    """Part whose behaviour is driven by a configuration object.

    Args:
        cfg: Object exposing ``THRESHOLD`` and ``ENABLE_FEATURE`` attributes.
    """

    def __init__(self, cfg):
        self.threshold = cfg.THRESHOLD
        self.enabled = cfg.ENABLE_FEATURE

    def run(self, value):
        """Process ``value`` only when enabled and above the threshold.

        In every other case the value is passed through unchanged.
        ``process`` is expected to be supplied by a subclass.
        """
        should_process = self.enabled and value > self.threshold
        if should_process:
            return self.process(value)
        return value
myconfig.py:
# Read by ConfigurablePart as cfg.THRESHOLD: values strictly greater than this
# are handed to process(); others pass through.
THRESHOLD = 0.5
# Read by ConfigurablePart as cfg.ENABLE_FEATURE: when falsy, run() returns
# its input unchanged.
ENABLE_FEATURE = True
3. Documentation
class WellDocumentedPart:
    """
    A well-documented part that does X.

    The part combines inputs A and B according to algorithm Y and produces
    output C. It is useful for Z use case.

    Example:
        >>> part = WellDocumentedPart(param=1.0)
        >>> output = part.run(input1=0.5, input2=0.3)

    Attributes:
        param (float): Description of parameter
        state (int): Current state of the part
    """

    def __init__(self, param=1.0):
        """
        Initialize the part.

        Args:
            param (float): Parameter description; must lie between 0 and 1

        Raises:
            ValueError: If param is out of range
        """
        if param > 1 or param < 0:
            raise ValueError("param must be between 0 and 1")

        self.param = param
        self.state = 0

    def run(self, input1, input2):
        """
        Process inputs and return output.

        Args:
            input1 (float): First input in range [-1, 1]
            input2 (float): Second input in range [0, 1]

        Returns:
            float: ``(input1 + input2) * param`` clamped to the range [0, 1]
        """
        scaled = (input1 + input2) * self.param
        upper_bounded = min(1, scaled)
        return max(0, upper_bounded)

    def shutdown(self):
        """Release resources and cleanup."""
        pass
4. Type Hints
from typing import Tuple
import numpy as np
class TypedPart:
    """Example part with fully annotated method signatures."""

    def __init__(self, threshold: float = 0.5) -> None:
        self.threshold = threshold

    def run(self, img_arr: np.ndarray, angle: float) -> Tuple[float, bool]:
        """Return the angle scaled by 0.9 and a flag for the image.

        The flag is True when the value computed by ``process_image`` is
        strictly greater than ``threshold``.
        """
        score = self.process_image(img_arr)
        return angle * 0.9, score > self.threshold

    def process_image(self, img: np.ndarray) -> float:
        """Return the mean of all elements of ``img`` as a Python float."""
        mean_value = np.mean(img)
        return float(mean_value)
Common Patterns
Lazy Initialization
class LazyPart:
    """Defers loading a heavy resource until it is first accessed."""

    def __init__(self):
        self._model = None

    @property
    def model(self):
        """Return the model, loading it on first access only."""
        loaded = self._model
        if loaded is None:
            loaded = self._model = self.load_model()
        return loaded

    def load_model(self):
        """Import TensorFlow and load the Keras model from ``./model.h5``."""
        # Imported here so the cost is paid only when a model is needed.
        import tensorflow as tf

        return tf.keras.models.load_model('./model.h5')
Caching Results
import functools
import time
class CachedPart:
    """Reuses the last computed result for a limited time.

    Note that the cache holds a single entry and is not keyed by the input:
    while the entry is fresh, ``run`` returns it regardless of ``value``.

    Args:
        cache_duration: Seconds for which a computed result stays valid.
    """

    def __init__(self, cache_duration=1.0):
        self.cache_duration = cache_duration
        self.cache = {}

    def run(self, value):
        """Return the cached result if still fresh, else recompute and store.

        ``expensive_computation`` is expected to be supplied by a subclass.
        """
        now = time.time()

        entry = self.cache.get('result')
        if entry is not None:
            stored_at, cached_result = entry
            if now - stored_at < self.cache_duration:
                return cached_result

        # Cache miss or stale entry: compute and remember when we did.
        fresh_result = self.expensive_computation(value)
        self.cache['result'] = (now, fresh_result)
        return fresh_result
Debugging
Add Logging
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


class DebugPart:
    """Part that logs its input and output at DEBUG level.

    ``process`` is expected to be supplied by a subclass.
    """

    def run(self, value):
        """Log ``value``, process it, log the result and return it."""
        # %-style arguments are only formatted if a handler actually emits
        # the record, so disabled debug logging costs nothing per loop.
        logger.debug("Input: %s", value)
        result = self.process(value)
        logger.debug("Output: %s", result)
        return result
Print State
class DebuggablePart:
    """Part that can print its input and instance state on every call.

    Args:
        debug: When true, ``run`` prints a ``[DEBUG]`` line before processing.
    """

    def __init__(self, debug=False):
        self.debug = debug

    def run(self, value):
        """Optionally print the input and ``__dict__``, then process ``value``.

        ``process`` is expected to be supplied by a subclass.
        """
        if self.debug:
            state = self.__dict__
            print(f"[DEBUG] Input: {value}, State: {state}")

        return self.process(value)
Next Steps
- Study existing parts in donkeycar/parts/
- Share your parts with the community
- Contribute to Donkeycar on GitHub
- Join the Donkeycar Discord for help
