Parts are the fundamental building blocks of Donkeycar. A part is any Python class that implements a run() or run_threaded() method and can be added to the vehicle loop.

What is a Part?

A part is a modular component that:
  • Takes inputs from Memory
  • Performs some processing or I/O
  • Returns outputs to Memory
  • Runs on each iteration of the vehicle loop
Parts can be anything: sensors, actuators, controllers, processors, filters, or business logic.

Part Interface

The minimal part interface is simply a run() method:
class SimplePart:
    def run(self, input1, input2):
        # Process inputs
        output = input1 + input2
        return output

Full Interface

A complete part may implement:
class CompletePart:
    def __init__(self, config):
        """Initialize the part with configuration"""
        self.config = config
        self.state = None
        self.cached_output = None  # Latest result, read by run_threaded()
        self.on = True             # Cleared by shutdown() to stop update()
    
    def run(self, *inputs):
        """Called each loop iteration (synchronous parts)"""
        # Process inputs and return outputs
        output = None  # Placeholder: compute your result from inputs here
        return output
    
    def run_threaded(self, *inputs):
        """Called each loop iteration (threaded parts)"""
        # Return cached result from background thread
        return self.cached_output
    
    def update(self):
        """Runs in background thread (threaded parts only)"""
        while self.on:
            # Continuously update self.cached_output
            self.cached_output = do_work()  # do_work() is a placeholder
    
    def shutdown(self):
        """Called when vehicle stops (optional)"""
        # Clean up resources and stop the update() loop
        self.on = False

Adding Parts to Vehicle

Parts are registered with the Vehicle using add():
V.add(part, 
      inputs=['input1', 'input2'],  # List of Memory keys to read
      outputs=['output1'],           # List of Memory keys to write
      threaded=False,                # Run in background thread?
      run_condition=None)            # Optional conditional execution

Example

import donkeycar as dk

V = dk.vehicle.Vehicle()

# Add a camera
cam = PiCamera()
V.add(cam, 
      inputs=[],                    # Camera needs no inputs
      outputs=['cam/image_array'],  # Produces images
      threaded=True)                # Runs in background

# Add a model
model = KerasPilot()
V.add(model,
      inputs=['cam/image_array'],         # Consumes camera images
      outputs=['pilot/angle', 'pilot/throttle'],  # Produces steering/throttle
      run_condition='run_pilot')          # Only when in autopilot mode

Input/Output Flow

The Vehicle automatically manages data flow:
  1. Get inputs: Vehicle reads input values from Memory
  2. Call part: Passes inputs as arguments to run() or run_threaded()
  3. Store outputs: Vehicle writes return values to Memory
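
The three steps above can be sketched with a plain dictionary standing in for Memory. This is an illustrative sketch only, not Donkeycar's actual implementation: run_part_once and Doubler are hypothetical names, and the real Vehicle and Memory classes may handle edge cases differently.

```python
class Doubler:
    """Toy part: doubles its single input."""
    def run(self, x):
        return x * 2


def run_part_once(part, memory, inputs, outputs):
    """Hypothetical helper mimicking one loop step for a synchronous part."""
    # 1. Get inputs: look up each input key (missing keys become None)
    args = [memory.get(key) for key in inputs]
    # 2. Call part: pass the values as positional arguments
    result = part.run(*args)
    # 3. Store outputs: a single output is stored as-is,
    #    multiple outputs are unpacked from the returned tuple
    if len(outputs) == 1:
        memory[outputs[0]] = result
    elif len(outputs) > 1:
        for key, value in zip(outputs, result):
            memory[key] = value


memory = {"x": 21}
run_part_once(Doubler(), memory, inputs=["x"], outputs=["y"])
print(memory["y"])  # 42
```

The part itself never touches the dictionary; it only sees arguments and return values, which is what keeps parts testable in isolation.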

Single Output

class SingleOutput:
    def run(self, x):
        return x * 2

V.add(SingleOutput(), 
      inputs=['x'], 
      outputs=['y'])  # Memory['y'] = return value

Multiple Outputs

class MultipleOutputs:
    def run(self, image):
        angle = compute_angle(image)
        throttle = compute_throttle(image)
        return angle, throttle  # Return tuple

V.add(MultipleOutputs(),
      inputs=['cam/image_array'],
      outputs=['pilot/angle', 'pilot/throttle'])  # Unpack tuple

No Output

class NoOutput:
    def run(self, steering, throttle):
        # Just actuate motors, no return value
        self.motor.set_steering(steering)
        self.motor.set_throttle(throttle)
        return None  # Or just don't return

V.add(NoOutput(),
      inputs=['steering', 'throttle'],
      outputs=[])  # Empty list
The number of values a part returns must match the number of keys listed in outputs. For multiple outputs, return a tuple, for example: return output1, output2

Synchronous vs Threaded Parts

Synchronous Parts (Default)

When to use: Fast operations (< 10ms)
class ThrottleFilter:
    def __init__(self):
        self.reverse_triggered = False
        self.last_throttle = 0.0
    
    def run(self, throttle_in):
        if throttle_in is None:
            return throttle_in
        
        throttle_out = throttle_in
        
        if throttle_out < 0.0:
            if not self.reverse_triggered and self.last_throttle < 0.0:
                throttle_out = 0.0
                self.reverse_triggered = True
        else:
            self.reverse_triggered = False
        
        self.last_throttle = throttle_out
        return throttle_out

V.add(ThrottleFilter(), 
      inputs=['user/throttle'], 
      outputs=['user/throttle'],
      threaded=False)  # Runs in main loop
Example from throttle_filter.py:2-25.

Threaded Parts

When to use: Slow I/O operations (cameras, network, sensors)
class PiCamera:
    def __init__(self, image_w=160, image_h=120):
        # Initialize camera
        self.camera = Picamera2()
        self.camera.start()
        self.frame = None
        self.on = True
    
    def run(self):
        """Capture a frame (called by update thread)"""
        self.frame = self.camera.capture_array("main")
        return self.frame
    
    def update(self):
        """Background thread continuously captures frames"""
        while self.on:
            self.run()
    
    def run_threaded(self):
        """Main loop just returns latest frame"""
        return self.frame
    
    def shutdown(self):
        self.on = False
        self.camera.close()

V.add(PiCamera(), 
      inputs=[],
      outputs=['cam/image_array'],
      threaded=True)  # Runs in background thread
Adapted from camera.py:23-82.

How it works:
  • update() runs in a background thread, continuously calling run()
  • Main vehicle loop calls run_threaded() to get latest cached result
  • No blocking of the main drive loop
Threaded parts should store results in instance variables (like self.frame). The main loop just reads the cached value, preventing slow I/O from blocking the drive loop.
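
The update() / run_threaded() split can be tried without any hardware. The sketch below is a stand-in for what happens when a part is added with threaded=True: here the thread is started by hand with Python's threading module, and Counter is a hypothetical toy part. How Donkeycar itself creates and manages the thread is not shown.

```python
import threading
import time


class Counter:
    """Toy threaded part: a background loop increments a counter."""
    def __init__(self):
        self.count = 0
        self.on = True

    def update(self):
        # Runs in the background thread until shutdown() clears self.on
        while self.on:
            self.count += 1
            time.sleep(0.001)

    def run_threaded(self):
        # Called from the main loop: return the cached value immediately
        return self.count

    def shutdown(self):
        self.on = False


part = Counter()
worker = threading.Thread(target=part.update, daemon=True)
worker.start()

time.sleep(0.05)              # let the background loop run for a moment
latest = part.run_threaded()  # non-blocking read of the cached value
part.shutdown()
worker.join(timeout=1.0)      # the loop exits once self.on is False
print(latest)
```

The call to run_threaded() returns at once regardless of how long one pass of update() takes, which is the property the drive loop relies on.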

Real-World Part Examples

Example 1: Transform Part

Simple functional wrapper from transform.py:5-19:
class Lambda:
    """
    Wraps a function into a donkey part.
    """
    def __init__(self, f):
        """Accepts the function to use."""
        self.f = f
    
    def run(self, *args, **kwargs):
        return self.f(*args, **kwargs)
    
    def shutdown(self):
        return

# Usage
V.add(Lambda(lambda x, y: x + y),
      inputs=['value1', 'value2'],
      outputs=['sum'])

Example 2: DriveMode Part

Logic part from basic.py:28-42:
class DriveMode:
    """Helper class to dispatch between ai and user driving"""
    
    def __init__(self, cfg):
        self.cfg = cfg
    
    def run(self, mode, user_angle, user_throttle, pilot_angle, pilot_throttle):
        if mode == 'user':
            return user_angle, user_throttle
        elif mode == 'local_angle':
            return pilot_angle if pilot_angle else 0.0, user_throttle
        else:
            return pilot_angle if pilot_angle else 0.0, \
                   pilot_throttle * self.cfg.AI_THROTTLE_MULT if \
                       pilot_throttle else 0.0

V.add(DriveMode(cfg=cfg),
      inputs=['user/mode', 'user/angle', 'user/throttle',
              'pilot/angle', 'pilot/throttle'],
      outputs=['angle', 'throttle'])

Example 3: PID Controller

Stateful processing part from transform.py:53-115:
import time

class PIDController:
    """Performs a PID computation and returns a control value."""
    
    def __init__(self, p=0, i=0, d=0, debug=False):
        # initialize gains
        self.Kp = p
        self.Ki = i
        self.Kd = d
        
        # The value the controller is trying to achieve
        self.target = 0
        
        # initialize delta t variables
        self.prev_tm = time.time()
        self.prev_err = 0
        self.totalError = 0
        
        # initialize the output
        self.alpha = 0
        self.debug = debug
    
    def run(self, err):
        curr_tm = time.time()
        
        self.difError = err - self.prev_err
        
        # Calculate time differential
        dt = curr_tm - self.prev_tm
        
        # Initialize output variable
        curr_alpha = 0
        
        # Add proportional component
        curr_alpha += -self.Kp * err
        
        # Add integral component
        curr_alpha += -self.Ki * (self.totalError * dt)
        
        # Add differential component (avoiding divide-by-zero)
        if dt > 0:
            curr_alpha += -self.Kd * (self.difError / dt)
        
        # Update error accumulator
        self.totalError += err
        
        # Save current time for next iteration
        self.prev_tm = curr_tm
        self.prev_err = err
        
        if self.debug:
            print(f"PID: err={err:.3f} alpha={curr_alpha:.3f}")
        
        return curr_alpha

V.add(PIDController(p=0.1, i=0.01, d=0.05),
      inputs=['cte/error'],
      outputs=['pilot/steering'])

Example 4: TriggeredCallback

Event-driven part from transform.py:21-31:
class TriggeredCallback:
    def __init__(self, args, func_cb):
        self.args = args
        self.func_cb = func_cb
    
    def run(self, trigger):
        if trigger:
            self.func_cb(self.args)
    
    def shutdown(self):
        return

# Usage: Reload model when file changes
V.add(TriggeredCallback(model_path, reload_model),
      inputs=["modelfile/reload"],
      run_condition="run_pilot")

Example 5: Conditional Logic Part

From simulator.py:122-128:
class PilotCondition:
    def run(self, mode):
        if mode == 'user':
            return False
        else:
            return True

V.add(PilotCondition(), 
      inputs=['user/mode'], 
      outputs=['run_pilot'])
Parts can be simple! Many parts are just a few lines of logic. Don’t over-engineer: if it has inputs, outputs, and a run() method, it’s a valid part.

Run Conditions

Parts can execute conditionally:
V.add(autopilot,
      inputs=['cam/image_array'],
      outputs=['pilot/angle', 'pilot/throttle'],
      run_condition='run_pilot')  # Only runs when Memory['run_pilot'] is truthy
From the Vehicle implementation (vehicle.py:194-196):
run = True
if entry.get('run_condition'):
    run_condition = entry.get('run_condition')
    run = self.mem.get([run_condition])[0]
Example usage from complete.py:415:
V.add(kl, inputs=inputs, outputs=outputs, run_condition='run_pilot')
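
To see the effect of a run condition in isolation, the check quoted above can be imitated with a dictionary in place of Memory. This is a sketch under that assumption; should_run and Recorder are hypothetical names, not Donkeycar APIs.

```python
class Recorder:
    """Toy part that counts how many times it ran."""
    def __init__(self):
        self.calls = 0

    def run(self):
        self.calls += 1


def should_run(memory, run_condition):
    """Hypothetical helper mirroring the run_condition check."""
    if not run_condition:
        return True  # no condition configured: always run
    return bool(memory.get(run_condition))


memory = {"recording": False}
recorder = Recorder()

for step in range(5):
    memory["recording"] = step >= 3  # condition turns on at step 3
    if should_run(memory, "recording"):
        recorder.run()

print(recorder.calls)  # 2
```

The part is skipped entirely while the condition is falsy, so any outputs it wrote earlier keep their last value in Memory.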

Use Cases

  1. Mode switching: User vs autopilot
  2. Conditional recording: Only record when button pressed
  3. Button handlers: Execute when button pressed
  4. Feature toggles: Enable/disable features

Memory Channel Naming Conventions

Follow these patterns for clarity:
  • Sensors: cam/image_array, imu/acl_x, lidar/dist_array
  • User input: user/angle, user/throttle, user/mode
  • Autopilot: pilot/angle, pilot/throttle, pilot/loc
  • Actuators: steering, throttle, brake
  • Recording: recording, tub/num_records
  • Conditions: run_pilot, run_user
  • Web UI: web/buttons, web/w1, web/w2

Creating Custom Parts

Simple Processing Part

import cv2

class ImagePreprocessor:
    def __init__(self, target_size=(160, 120)):
        self.target_size = target_size
    
    def run(self, image):
        # Resize and normalize
        resized = cv2.resize(image, self.target_size)
        normalized = resized / 255.0
        return normalized

V.add(ImagePreprocessor(),
      inputs=['cam/image_array'],
      outputs=['cam/image_normalized'])

Sensor Part with Threading

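from serial import Serial  # Provided by the pyserial package

class GPSSensor:
    def __init__(self, port='/dev/ttyUSB0'):
        self.serial = Serial(port, 9600)
        self.position = None
        self.on = True
    
    def parse_nmea(self, line):
        """Placeholder parser: replace with real NMEA decoding"""
        return line.decode('ascii', errors='ignore').strip()
    
    def run(self):
        """Read GPS data (called by update thread)"""
        line = self.serial.readline()
        self.position = self.parse_nmea(line)
        return self.position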
    
    def update(self):
        """Background thread"""
        while self.on:
            self.run()
    
    def run_threaded(self):
        """Main loop reads cached position"""
        return self.position
    
    def shutdown(self):
        self.on = False
        self.serial.close()

V.add(GPSSensor(), 
      outputs=['gps/position'],
      threaded=True)

Actuator Part

class ServoSteering:
    def __init__(self, channel=0):
        self.pwm = PCA9685(channel)
        self.left_pulse = 290
        self.right_pulse = 490
    
    def run(self, angle):
        """Convert angle [-1, 1] to PWM pulse"""
        # Map angle to pulse width
        pulse = self.left_pulse + (angle + 1) * \
                (self.right_pulse - self.left_pulse) / 2
        self.pwm.set_pulse(int(pulse))  # PWM drivers expect an integer pulse
        # No output needed
    
    def shutdown(self):
        # Center steering
        self.run(0)

V.add(ServoSteering(),
      inputs=['steering'],
      outputs=[])
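
The angle-to-pulse mapping in run() is a linear interpolation and can be checked on its own. A minimal sketch, with the formula pulled out into a hypothetical standalone function angle_to_pulse and the same example pulse limits as above (real limits come from calibrating your servo):

```python
def angle_to_pulse(angle, left_pulse=290, right_pulse=490):
    """Linearly map an angle in [-1, 1] onto [left_pulse, right_pulse]."""
    return left_pulse + (angle + 1) * (right_pulse - left_pulse) / 2


# Full left, center, and full right
print(angle_to_pulse(-1), angle_to_pulse(0), angle_to_pulse(1))
# 290.0 390.0 490.0
```

The result is a float, so it is converted to an integer before being sent to the PWM driver.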

Filter Part

class LowPassFilter:
    def __init__(self, alpha=0.5):
        self.alpha = alpha
        self.prev_value = None
    
    def run(self, value):
        if self.prev_value is None:
            self.prev_value = value
            return value
        
        # Exponential moving average
        filtered = self.alpha * value + (1 - self.alpha) * self.prev_value
        self.prev_value = filtered
        return filtered

V.add(LowPassFilter(alpha=0.3),
      inputs=['sensor/raw'],
      outputs=['sensor/filtered'])

Best Practices

  1. Keep parts simple: Each part should do one thing well
  2. Make I/O threaded: Cameras, network, sensors should run in background
  3. Store state in instance variables: For stateful parts like filters or accumulators
  4. Implement shutdown: Clean up resources (close files, stop motors, etc.)
  5. Handle None gracefully: Inputs may be None on first iteration
  6. Use meaningful names: Clear input/output channel names
  7. Document your parts: Add docstrings explaining purpose and usage
  8. Test independently: Parts should be testable without the vehicle loop
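
Practices 5 and 8 fit together naturally: a part that guards against None can be exercised directly, with no vehicle involved. The sketch below uses a hypothetical SafeScaler part; returning 0.0 for a missing throttle is one possible policy, chosen here as an assumption.

```python
class SafeScaler:
    """Hypothetical part that scales a throttle value and tolerates None."""
    def __init__(self, scale=0.5):
        self.scale = scale

    def run(self, throttle):
        # On the first iterations the upstream part may not have
        # written a value yet, so the input can be None
        if throttle is None:
            return 0.0
        return throttle * self.scale


part = SafeScaler(scale=0.5)
print(part.run(None))  # 0.0
print(part.run(0.8))   # 0.4
```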

Part Testing

Test parts independently before adding to vehicle:
# Test a part
part = MyCustomPart()

# Simulate inputs
input1 = "test_data"
input2 = 42

# Call run
output = part.run(input1, input2)

# Verify output
assert output == expected_value
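
A concrete version of the template above, using the LowPassFilter from the Filter Part section (repeated here so the snippet is self-contained). The input sequence and alpha=0.5 are arbitrary choices that keep the expected values easy to work out by hand.

```python
class LowPassFilter:
    def __init__(self, alpha=0.5):
        self.alpha = alpha
        self.prev_value = None

    def run(self, value):
        if self.prev_value is None:
            self.prev_value = value
            return value
        # Exponential moving average
        filtered = self.alpha * value + (1 - self.alpha) * self.prev_value
        self.prev_value = filtered
        return filtered


part = LowPassFilter(alpha=0.5)

# Feed a step from 0.0 to 1.0, one call per simulated loop iteration
outputs = [part.run(value) for value in [0.0, 1.0, 1.0]]

# First call passes the value through, then each call
# moves halfway toward the new input: 0.0, 0.5, 0.75
assert outputs == [0.0, 0.5, 0.75]
```

Because the filter is stateful, create a fresh instance for each test case so that earlier inputs do not leak into later assertions.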

Common Patterns

Button Handler

V.add(Lambda(lambda: print("Button clicked!")),
      run_condition="web/w1")  # Runs when button pressed

Data Pipeline

# Part 1: Capture
V.add(camera, outputs=['cam/raw'])

# Part 2: Preprocess
V.add(preprocessor, inputs=['cam/raw'], outputs=['cam/processed'])

# Part 3: Inference
V.add(model, inputs=['cam/processed'], outputs=['pilot/angle'])

# Part 4: Filter
V.add(angle_filter, inputs=['pilot/angle'], outputs=['pilot/angle_filtered'])  # named angle_filter to avoid shadowing Python's built-in filter

# Part 5: Actuate
V.add(steering, inputs=['pilot/angle_filtered'])

Conditional Branching

# User path
V.add(user_controller, outputs=['user/angle'], run_condition='run_user')

# Autopilot path
V.add(autopilot, outputs=['pilot/angle'], run_condition='run_pilot')

# Merge paths
class Selector:
    def run(self, user_angle, pilot_angle, mode):
        return user_angle if mode == 'user' else pilot_angle

V.add(Selector(), 
      inputs=['user/angle', 'pilot/angle', 'user/mode'],
      outputs=['angle'])
