Skip to main content
Learn how to create custom parts to extend Donkeycar with new sensors, algorithms, and behaviors.

The Part Interface

A Donkeycar part is a Python class that implements a simple interface:
class MyPart:
    """Minimal part skeleton showing the two methods a part provides.

    ``process`` is not defined in this snippet; a real part supplies it.
    """

    def run(self, input1, input2):
        """Handle one loop iteration and hand back the computed output."""
        return self.process(input1, input2)

    def shutdown(self):
        """Cleanup hook for when the vehicle stops; nothing to release here."""

Method Reference

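  • run(*args) - Called synchronously in the main vehicle loop on every iteration (non-threaded parts)
  • run_threaded(*args) - Called in the main loop for threaded parts; should return quickly with the values cached by update()
  • update() - Runs continuously in a background thread when the part is added with threaded=True
  • shutdown() - Cleanup when the vehicle stops (close files, stop threads, release hardware)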

Basic Synchronous Part

Simple parts that execute in the main vehicle loop.

Example: Throttle Limiter

class ThrottleLimiter:
    """Limits maximum throttle based on steering angle.

    Args:
        max_throttle: Largest throttle magnitude the part will output.
            Expected to be non-negative.
        steering_threshold: Steering magnitude above which the throttle
            starts being reduced.
    """

    def __init__(self, max_throttle=1.0, steering_threshold=0.5):
        self.max_throttle = max_throttle
        self.steering_threshold = steering_threshold

    def run(self, angle, throttle):
        """Reduce throttle when turning sharply.

        Args:
            angle: Steering command; magnitudes above 1.0 are treated as 1.0
                when computing the reduction.
            throttle: Requested throttle; negative values mean reverse.

        Returns:
            The throttle scaled down by up to 50% at full steering lock and
            clamped to ``[-max_throttle, max_throttle]``.
        """
        steering = abs(angle)
        if steering > self.steering_threshold:
            # Cap the normalized steering at 1.0 so an out-of-range angle can
            # never scale the throttle by a negative factor and flip its sign.
            reduction = min(steering, 1.0)
            throttle = throttle * (1.0 - reduction * 0.5)

        # Enforce the maximum in both directions so reverse is limited too.
        return max(-self.max_throttle, min(throttle, self.max_throttle))

    def shutdown(self):
        """Nothing to release."""
        pass

# Usage in manage.py: the limiter reads and rewrites 'pilot/throttle'
limiter = ThrottleLimiter(
    max_throttle=0.8,
    steering_threshold=0.3,
)
V.add(
    limiter,
    inputs=['pilot/angle', 'pilot/throttle'],
    outputs=['pilot/throttle'],
)

Example: Data Logger

import csv
import datetime

class CSVLogger:
    """Pass-through part that appends every sample it receives to a CSV file."""

    def __init__(self, filename=None):
        """Open the log file and write the header row.

        Args:
            filename: Destination path. When None, a name of the form
                ``log_YYYYmmdd_HHMMSS.csv`` is built from the current
                local time.
        """
        self.filename = (
            filename
            if filename is not None
            else f"log_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        )
        # newline='' lets the csv module control line endings itself.
        self.file = open(self.filename, 'w', newline='')
        self.writer = csv.writer(self.file)
        self.writer.writerow(['timestamp', 'angle', 'throttle', 'mode'])

    def run(self, angle, throttle, mode):
        """Write one timestamped row and return the inputs unchanged."""
        stamp = datetime.datetime.now().isoformat()
        row = [stamp, angle, throttle, mode]
        self.writer.writerow(row)
        return angle, throttle, mode

    def shutdown(self):
        """Close the file and report where the log was written."""
        self.file.close()
        print(f"Log saved to {self.filename}")

# Usage: outputs mirror the inputs, so the logger passes the values through
logger = CSVLogger()
V.add(
    logger,
    inputs=['user/angle', 'user/throttle', 'user/mode'],
    outputs=['user/angle', 'user/throttle', 'user/mode'],
)

Threaded Parts

Parts that run continuously in the background for I/O operations.

Example: Distance Sensor

import time
import RPi.GPIO as GPIO

class UltrasonicSensor:
    """HC-SR04 ultrasonic distance sensor.

    ``update`` is meant to run in a background thread and keeps
    ``distance`` refreshed; ``run_threaded`` returns the most recent value
    without touching the hardware.

    Args:
        trigger_pin: BCM pin number wired to the sensor's trigger input.
        echo_pin: BCM pin number wired to the sensor's echo output.
        poll_delay: Pause between measurements, in seconds.
    """

    # Longest time to wait for one complete echo pulse, in seconds.
    ECHO_TIMEOUT = 0.1
    # Speed of sound in cm/s; the conversion halves it because the pulse
    # travels to the obstacle and back.
    SPEED_OF_SOUND_CM_S = 34300

    def __init__(self, trigger_pin=23, echo_pin=24, poll_delay=0.05):
        self.trigger = trigger_pin
        self.echo = echo_pin
        self.poll_delay = poll_delay
        self.distance = 0.0
        self.on = True
        self._running = False  # True while update() is inside its loop

        # Setup GPIO
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(self.trigger, GPIO.OUT)
        GPIO.setup(self.echo, GPIO.IN)

    def update(self):
        """Measure distance continuously until ``shutdown`` clears ``on``.

        A measurement that times out leaves the previous ``distance`` in
        place instead of storing a value computed from stale timestamps.
        """
        self._running = True
        try:
            while self.on:
                measured = self._measure_once()
                if measured is not None:
                    self.distance = measured
                time.sleep(self.poll_delay)
        finally:
            self._running = False

    def _measure_once(self):
        """Fire one trigger pulse; return the distance in cm or None on timeout."""
        GPIO.output(self.trigger, GPIO.HIGH)
        time.sleep(0.00001)
        GPIO.output(self.trigger, GPIO.LOW)

        # One deadline covers both edges, so a cycle never exceeds the timeout.
        deadline = time.monotonic() + self.ECHO_TIMEOUT
        pulse_start = self._wait_for_echo(GPIO.HIGH, deadline)
        if pulse_start is None:
            return None
        pulse_end = self._wait_for_echo(GPIO.LOW, deadline)
        if pulse_end is None:
            return None
        return self._duration_to_cm(pulse_end - pulse_start)

    def _wait_for_echo(self, level, deadline):
        """Spin until the echo pin reads ``level``; return that time or None."""
        while GPIO.input(self.echo) != level:
            if time.monotonic() > deadline:
                return None
        return time.monotonic()

    def _duration_to_cm(self, duration):
        """Convert an echo pulse width in seconds to a one-way distance in cm."""
        return duration * self.SPEED_OF_SOUND_CM_S / 2

    def run_threaded(self):
        """Return cached distance value"""
        return self.distance

    def shutdown(self):
        """Stop the measurement loop, then release the GPIO pins."""
        self.on = False
        # Give a running update() loop a bounded amount of time to finish its
        # current cycle so the pins are not released while it still uses them.
        deadline = time.monotonic() + self.ECHO_TIMEOUT + self.poll_delay + 0.1
        while self._running and time.monotonic() < deadline:
            time.sleep(0.005)
        GPIO.cleanup()

# Usage: threaded=True registers the part as a threaded part
sensor = UltrasonicSensor(
    trigger_pin=23,
    echo_pin=24,
)
V.add(
    sensor,
    outputs=['obstacle/distance'],
    threaded=True,
)

Example: Web API Client

import time
import requests
import json

class WeatherAPI:
    """Fetches weather data in background.

    Args:
        api_key: OpenWeatherMap API key sent as the ``appid`` parameter.
        location: Query string sent as the ``q`` parameter.
        poll_interval: Seconds between requests (default 300, i.e. 5 minutes).
    """

    def __init__(self, api_key, location, poll_interval=300):
        self.api_key = api_key
        self.location = location
        self.poll_interval = poll_interval  # 5 minutes
        self.temperature = 0.0
        self.humidity = 0.0
        self.on = True

    def update(self):
        """Poll the weather endpoint until ``shutdown`` clears ``on``."""
        url = "https://api.openweathermap.org/data/2.5/weather"
        params = {
            'q': self.location,
            'appid': self.api_key,
            'units': 'metric'
        }

        while self.on:
            self._fetch(url, params)
            self._wait_for_next_poll()

    def _fetch(self, url, params):
        """Run one request and cache its readings; errors are only reported."""
        try:
            response = requests.get(url, params=params, timeout=10)
            if response.status_code == 200:
                main = response.json()['main']
                # Read both fields before storing either, so a malformed
                # payload cannot leave a new temperature with an old humidity.
                temperature, humidity = main['temp'], main['humidity']
                self.temperature = temperature
                self.humidity = humidity
            else:
                print(f"Weather API error: HTTP {response.status_code}")
        except Exception as e:
            # Best effort: keep the previous values and try again next poll.
            print(f"Weather API error: {e}")

    def _wait_for_next_poll(self):
        """Sleep for ``poll_interval`` in short slices so shutdown is prompt."""
        deadline = time.monotonic() + self.poll_interval
        while self.on:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(remaining, 0.5))

    def run_threaded(self):
        """Return cached weather data"""
        return self.temperature, self.humidity

    def shutdown(self):
        """Ask the polling loop to stop."""
        self.on = False

# Usage: run_threaded() returns (temperature, humidity), matching the two outputs
weather = WeatherAPI(
    api_key='YOUR_KEY',
    location='San Francisco',
)
V.add(
    weather,
    outputs=['weather/temp', 'weather/humidity'],
    threaded=True,
)

Advanced Patterns

State Machine Part

from enum import Enum

class State(Enum):
    """Phases of the obstacle-avoidance state machine."""
    IDLE = 0
    DRIVING = 1
    OBSTACLE_DETECTED = 2
    AVOIDING = 3


class ObstacleAvoidance:
    """State machine that stops and steers away when an obstacle is close.

    Each call to ``run`` produces the output of the state the machine was in
    when the call started; a transition takes effect on the following call.
    """

    def __init__(self, detection_distance=30.0):
        self.state = State.IDLE
        self.detection_distance = detection_distance
        self.avoid_angle = 0.5
        self.counter = 0

    def run(self, angle, throttle, distance):
        """Advance the state machine one step.

        Args:
            angle: Steering command to pass through while not avoiding.
            throttle: Throttle command to pass through while not avoiding.
            distance: Measured obstacle distance, compared against
                ``detection_distance``.

        Returns:
            An ``(angle, throttle)`` tuple for this iteration.
        """
        current = self.state
        passthrough = (angle, throttle)

        if current == State.IDLE:
            # Leave idle as soon as a forward throttle is requested.
            if throttle > 0:
                self.state = State.DRIVING
            return passthrough

        if current == State.DRIVING:
            if distance < self.detection_distance:
                self.state = State.OBSTACLE_DETECTED
                self.counter = 0
            return passthrough

        if current == State.OBSTACLE_DETECTED:
            # Stop for one iteration before starting the avoidance turn.
            self.state = State.AVOIDING
            return 0.0, 0.0

        if current == State.AVOIDING:
            self.counter += 1
            if self.counter < 10:
                # Creep forward while turning for the first nine iterations.
                return self.avoid_angle, 0.2
            if distance > self.detection_distance:
                self.state = State.DRIVING
                return passthrough
            # Still blocked: hold the turn but stop moving.
            return self.avoid_angle, 0.0

        return passthrough

    def shutdown(self):
        """Nothing to release."""

# Usage: the avoider rewrites the pilot's angle and throttle in place
avoider = ObstacleAvoidance(detection_distance=30.0)
V.add(
    avoider,
    inputs=['pilot/angle', 'pilot/throttle', 'obstacle/distance'],
    outputs=['pilot/angle', 'pilot/throttle'],
)

Filter Part

from collections import deque
import numpy as np

class MovingAverageFilter:
    """Smooths a stream of values by averaging the most recent samples."""

    def __init__(self, window_size=5):
        self.window_size = window_size
        # A bounded deque drops the oldest sample once the window is full.
        self.buffer = deque(maxlen=window_size)

    def run(self, value):
        """Add ``value`` to the window and return the mean of the window."""
        window = self.buffer
        window.append(value)
        return np.mean(window)

    def shutdown(self):
        """Nothing to release."""

# Usage (named accel_filter so the builtin filter() is not shadowed)
accel_filter = MovingAverageFilter(window_size=10)
V.add(accel_filter,
      inputs=['imu/accel_x'],
      outputs=['imu/accel_x_filtered'])

Conditional Execution

class ConditionalPilot:
    """Wraps a pilot and falls back to the user's commands near obstacles."""

    def __init__(self, pilot, min_distance=50.0):
        self.pilot = pilot
        self.min_distance = min_distance

    def run(self, img_arr, distance, user_angle, user_throttle):
        """Return the pilot's output only when ``distance`` exceeds the minimum.

        Otherwise the user's angle and throttle are returned unchanged.
        """
        path_is_clear = distance > self.min_distance
        if not path_is_clear:
            # Manual control
            return user_angle, user_throttle
        return self.pilot.run(img_arr)

    def shutdown(self):
        """Forward shutdown to the wrapped pilot."""
        self.pilot.shutdown()

# Usage: wrap a loaded Keras pilot so it only drives when the path is clear
from donkeycar.parts.keras import KerasLinear

pilot = KerasLinear()
pilot.load('./models/my_model.h5')

conditional = ConditionalPilot(pilot, min_distance=50.0)
V.add(
    conditional,
    inputs=['cam/image_array', 'obstacle/distance', 'user/angle', 'user/throttle'],
    outputs=['pilot/angle', 'pilot/throttle'],
)

Multi-Input/Output Parts

Example: Sensor Fusion

import numpy as np

class IMUCameraFusion:
    """Blends a camera steering prediction with a gyroscope correction."""

    def __init__(self, camera_weight=0.7, imu_weight=0.3):
        self.camera_weight = camera_weight
        self.imu_weight = imu_weight

    def run(self, cam_angle, cam_throttle, gyro_z):
        """Return the weighted steering angle and the unchanged throttle.

        The correction term is ``-gyro_z * 0.1``; it is combined with
        ``cam_angle`` using the two configured weights.
        """
        correction = -gyro_z * 0.1
        camera_term = self.camera_weight * cam_angle
        imu_term = self.imu_weight * correction
        return camera_term + imu_term, cam_throttle

    def shutdown(self):
        """Nothing to release."""

# Usage: the fused angle replaces 'pilot/angle'; the throttle passes through
fusion = IMUCameraFusion(
    camera_weight=0.8,
    imu_weight=0.2,
)
V.add(
    fusion,
    inputs=['pilot/angle', 'pilot/throttle', 'imu/gyro_z'],
    outputs=['pilot/angle', 'pilot/throttle'],
)

Testing Your Part

Unit Test Example

import unittest

class TestThrottleLimiter(unittest.TestCase):
    """Checks ThrottleLimiter with a 0.8 cap and the default threshold."""

    def setUp(self):
        self.limiter = ThrottleLimiter(max_throttle=0.8)

    def test_straight_no_limit(self):
        """Driving straight below the cap leaves the throttle untouched."""
        result = self.limiter.run(angle=0.0, throttle=0.7)
        self.assertEqual(result, 0.7)

    def test_turn_reduces_throttle(self):
        """Steering past the threshold yields a smaller throttle."""
        result = self.limiter.run(angle=0.8, throttle=0.7)
        self.assertLess(result, 0.7)

    def test_max_throttle_enforced(self):
        """A request above the cap is clamped to the cap."""
        result = self.limiter.run(angle=0.0, throttle=1.0)
        self.assertEqual(result, 0.8)


if __name__ == '__main__':
    unittest.main()

Standalone Testing

# test_sensor.py
from my_parts import UltrasonicSensor
import time

def test_sensor():
    """Print ten readings from a real sensor, pausing 0.5 s after each."""
    import threading

    sensor = UltrasonicSensor(trigger_pin=23, echo_pin=24)

    # Drive update() from a hand-made thread for this standalone run.
    worker = threading.Thread(target=sensor.update)
    worker.start()

    try:
        for _ in range(10):
            reading = sensor.run_threaded()
            print(f"Distance: {reading:.1f} cm")
            time.sleep(0.5)
    finally:
        # Always stop the loop and wait for the thread, even on Ctrl-C.
        sensor.shutdown()
        worker.join()


if __name__ == '__main__':
    test_sensor()

Packaging Your Part

Directory Structure

my_parts/
  __init__.py
  throttle_limiter.py
  obstacle_avoidance.py
  sensors/
    __init__.py
    ultrasonic.py
  README.md
  setup.py

setup.py

from setuptools import setup, find_packages

# Package metadata; replace the placeholder author, e-mail and URL values
# with your own before publishing.
setup(
    name='donkeycar-my-parts',
    version='0.1.0',
    packages=find_packages(),
    install_requires=[
        'donkeycar>=4.3.0',
        'RPi.GPIO>=0.7.0',
    ],
    author='Your Name',
    # The original value was e-mail-obfuscation residue, not a usable address.
    author_email='you@example.com',
    description='Custom Donkeycar parts',
    url='https://github.com/yourusername/donkeycar-my-parts',
)

Installation

# Development install (-e installs the local ./my_parts directory in editable mode)
pip install -e ./my_parts

# Or from GitHub (installs directly from the git repository URL)
pip install git+https://github.com/yourusername/donkeycar-my-parts.git

Best Practices

1. Error Handling

import logging

logger = logging.getLogger(__name__)


class RobustPart:
    """Part that never lets an exception escape into the vehicle loop.

    ``process`` and ``cleanup`` are not defined in this snippet; a real part
    supplies them.
    """

    def run(self, value):
        """Return ``process(value)``, or 0.0 if processing raises."""
        try:
            return self.process(value)
        except Exception as e:
            # logger.exception records the traceback along with the message,
            # so the failing line can be found later.
            logger.exception("Error in RobustPart: %s", e)
            return 0.0  # Safe default

    def shutdown(self):
        """Run ``cleanup`` and log, rather than raise, any failure."""
        try:
            self.cleanup()
        except Exception as e:
            logger.exception("Error during shutdown: %s", e)

2. Configuration

class ConfigurablePart:
    """Part whose behavior is driven by two settings read from ``cfg``.

    ``process`` is not defined in this snippet; a real part supplies it.
    """

    def __init__(self, cfg):
        self.threshold = cfg.THRESHOLD
        self.enabled = cfg.ENABLE_FEATURE

    def run(self, value):
        """Process values above the threshold; pass everything else through.

        When the feature is disabled the value is returned untouched.
        """
        if self.enabled and value > self.threshold:
            return self.process(value)
        return value

Then define the matching settings in myconfig.py:
THRESHOLD = 0.5  # read by ConfigurablePart as cfg.THRESHOLD
ENABLE_FEATURE = True  # read by ConfigurablePart as cfg.ENABLE_FEATURE

3. Documentation

class WellDocumentedPart:
    """
    A well-documented part that does X.

    This part takes inputs A and B, processes them according to algorithm Y,
    and outputs C. It is useful for Z use case.

    Example:
        >>> part = WellDocumentedPart(param=1.0)
        >>> output = part.run(input1=0.5, input2=0.3)

    Attributes:
        param (float): Description of parameter
        state (int): Current state of the part
    """

    def __init__(self, param=1.0):
        """
        Initialize the part.

        Args:
            param (float): Parameter description and valid range

        Raises:
            ValueError: If param is out of range
        """
        # Written as a positive range test so NaN, which fails every
        # comparison, is rejected as well.
        if not 0 <= param <= 1:
            raise ValueError("param must be between 0 and 1")
        self.param = param
        self.state = 0

    def run(self, input1, input2):
        """
        Process inputs and return output.

        Args:
            input1 (float): First input in range [-1, 1]
            input2 (float): Second input in range [0, 1]

        Returns:
            float: Processed output in range [0, 1]
        """
        output = (input1 + input2) * self.param
        # Float bounds keep the return type a float even when the value is
        # clamped, as the docstring promises.
        return max(0.0, min(1.0, output))

    def shutdown(self):
        """Release resources and cleanup."""
        pass

4. Type Hints

from typing import Tuple
import numpy as np

class TypedPart:
    """Example part whose signatures carry full type annotations."""

    def __init__(self, threshold: float = 0.5) -> None:
        self.threshold = threshold

    def run(self, img_arr: np.ndarray, angle: float) -> Tuple[float, bool]:
        """Return ``angle`` scaled by 0.9 and whether the image mean exceeds the threshold."""
        score = self.process_image(img_arr)
        return angle * 0.9, score > self.threshold

    def process_image(self, img: np.ndarray) -> float:
        """Return the mean of all elements of ``img`` as a Python float."""
        mean_value = np.mean(img)
        return float(mean_value)

Common Patterns

Lazy Initialization

class LazyPart:
    """Defers loading a heavy model until it is first accessed."""

    def __init__(self):
        self._model = None

    @property
    def model(self):
        """The model, loaded through ``load_model`` on first access."""
        if self._model is not None:
            return self._model
        self._model = self.load_model()
        return self._model

    def load_model(self):
        """Load and return the Keras model stored at ``./model.h5``."""
        # Imported here so TensorFlow is only pulled in when a model is needed.
        import tensorflow as tf
        keras_models = tf.keras.models
        return keras_models.load_model('./model.h5')

Caching Results

import functools
import time

class CachedPart:
    """Reuses the last computed result for ``cache_duration`` seconds.

    The cache holds a single entry that is not keyed by the input: while the
    entry is fresh, ``run`` returns it regardless of ``value``.
    ``expensive_computation`` is not defined in this snippet; a real part
    supplies it.
    """

    def __init__(self, cache_duration=1.0):
        self.cache_duration = cache_duration
        self.cache = {}

    def run(self, value):
        """Return the cached result if still fresh, otherwise recompute it."""
        # A monotonic clock is used for the age check so a wall-clock
        # adjustment (for example an NTP step) cannot freeze or prematurely
        # expire the cache.
        now = time.monotonic()

        # Check cache
        if 'result' in self.cache:
            cached_time, cached_result = self.cache['result']
            if now - cached_time < self.cache_duration:
                return cached_result

        # Compute
        result = self.expensive_computation(value)

        # Cache
        self.cache['result'] = (now, result)
        return result

Debugging

Add Logging

import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


class DebugPart:
    """Logs every input and output at DEBUG level.

    ``process`` is not defined in this snippet; a real part supplies it.
    """

    def run(self, value):
        """Return ``process(value)``, logging the value before and after."""
        # %-style arguments are only formatted when DEBUG is enabled, which
        # keeps this per-iteration call cheap once the level is raised.
        logger.debug("Input: %s", value)
        result = self.process(value)
        logger.debug("Output: %s", result)
        return result
class DebuggablePart:
    """Part with an opt-in debug flag that prints its input and state.

    ``process`` is not defined in this snippet; a real part supplies it.
    """

    def __init__(self, debug=False):
        self.debug = debug

    def run(self, value):
        """Return ``process(value)``, printing a debug line first if enabled."""
        if self.debug:
            snapshot = self.__dict__
            print(f"[DEBUG] Input: {value}, State: {snapshot}")
        return self.process(value)

Next Steps

  • Study existing parts in donkeycar/parts/
  • Share your parts with the community
  • Contribute to Donkeycar on GitHub
  • Join the Donkeycar Discord for help

Build docs developers (and LLMs) love