Parts are the fundamental building blocks of Donkeycar. A part is any Python class that implements a run() or run_threaded() method and can be added to the vehicle loop.
What is a Part?
A part is a modular component that:
- Takes inputs from Memory
- Performs some processing or I/O
- Returns outputs to Memory
- Runs on each iteration of the vehicle loop
Parts can be anything: sensors, actuators, controllers, processors, filters, or business logic.
Part Interface
The minimal part interface is simply a run() method:
class SimplePart:
    def run(self, input1, input2):
        # Process inputs
        output = input1 + input2
        return output
Full Interface
A complete part may implement:
class CompletePart:
    def __init__(self, config):
        """Initialize the part with configuration"""
        self.config = config
        self.state = None
        self.on = True             # Cleared by shutdown() to stop update()
        self.cached_output = None  # Latest result from the background thread

    def run(self, *inputs):
        """Called each loop iteration (synchronous parts)"""
        # Process inputs and return outputs
        output = inputs  # Placeholder: replace with real processing
        return output

    def run_threaded(self, *inputs):
        """Called each loop iteration (threaded parts)"""
        # Return cached result from background thread
        return self.cached_output

    def update(self):
        """Runs in background thread (threaded parts only)"""
        while self.on:
            # Continuously update self.cached_output
            self.cached_output = do_work()  # do_work() is a placeholder

    def shutdown(self):
        """Called when vehicle stops (optional)"""
        # Clean up resources
        self.on = False
Adding Parts to Vehicle
Parts are registered with the Vehicle using add():
V.add(part,
      inputs=['input1', 'input2'],  # List of Memory keys to read
      outputs=['output1'],          # List of Memory keys to write
      threaded=False,               # Run in background thread?
      run_condition=None)           # Optional conditional execution
Example
import donkeycar as dk

V = dk.vehicle.Vehicle()

# Add a camera
cam = PiCamera()
V.add(cam,
      inputs=[],                    # Camera needs no inputs
      outputs=['cam/image_array'],  # Produces images
      threaded=True)                # Runs in background

# Add a model
model = KerasPilot()
V.add(model,
      inputs=['cam/image_array'],                 # Consumes camera images
      outputs=['pilot/angle', 'pilot/throttle'],  # Produces steering/throttle
      run_condition='run_pilot')                  # Only when in autopilot mode
The Vehicle automatically manages data flow:
- Get inputs: Vehicle reads input values from Memory
- Call part: Passes inputs as arguments to run() or run_threaded()
- Store outputs: Vehicle writes return values to Memory
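The three steps above can be sketched with a plain dictionary standing in for Memory. This is a minimal illustration, not Donkeycar's actual loop: `run_once`, `Doubler`, and the entry dictionaries are hypothetical names introduced here.

```python
class Doubler:
    """Hypothetical part used only for this sketch."""

    def run(self, x):
        return x * 2


def run_once(memory, entries):
    """Run each registered part once against a dict standing in for Memory."""
    for entry in entries:
        # 1. Get inputs: look up each input key (missing keys become None)
        args = [memory.get(key) for key in entry["inputs"]]
        # 2. Call part: pass the values as positional arguments
        result = entry["part"].run(*args)
        # 3. Store outputs: write the return value under the output key(s)
        outputs = entry["outputs"]
        if not outputs:
            continue
        if len(outputs) == 1:
            result = (result,)
        for key, value in zip(outputs, result):
            memory[key] = value


memory = {"x": 21}
entries = [{"part": Doubler(), "inputs": ["x"], "outputs": ["y"]}]
run_once(memory, entries)
print(memory["y"])  # 42
```

Because parts only see positional arguments and return values, they stay unaware of the channel names they are wired to.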
Single Output
class SingleOutput:
    def run(self, x):
        return x * 2

V.add(SingleOutput(),
      inputs=['x'],
      outputs=['y'])  # Memory['y'] = return value
Multiple Outputs
class MultipleOutputs:
    def run(self, image):
        angle = compute_angle(image)
        throttle = compute_throttle(image)
        return angle, throttle  # Return tuple

V.add(MultipleOutputs(),
      inputs=['cam/image_array'],
      outputs=['pilot/angle', 'pilot/throttle'])  # Unpack tuple
No Output
class NoOutput:
    def run(self, steering, throttle):
        # Just actuate motors, no return value
        self.motor.set_steering(steering)
        self.motor.set_throttle(throttle)
        return None  # Or just don't return

V.add(NoOutput(),
      inputs=['steering', 'throttle'],
      outputs=[])  # Empty list
The number of return values must match the number of outputs specified. For multiple outputs, return a tuple: return output1, output2
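A mismatch between return values and outputs is easy to introduce when a part evolves. The helper below is a hypothetical sketch (it is not part of Donkeycar) of an explicit arity check that could be used in a test harness; `store_outputs` and the dict-based memory are assumptions made for illustration.

```python
def store_outputs(memory, outputs, result):
    """Write a part's return value to memory, checking the arity first."""
    if not outputs:
        return  # Nothing to store; the return value is ignored
    if len(outputs) == 1:
        memory[outputs[0]] = result  # Stored as-is
        return
    if not isinstance(result, (tuple, list)) or len(result) != len(outputs):
        raise ValueError(
            f"expected {len(outputs)} values for {outputs}, got {result!r}"
        )
    for key, value in zip(outputs, result):
        memory[key] = value


memory = {}
store_outputs(memory, ["pilot/angle", "pilot/throttle"], (0.1, 0.5))
store_outputs(memory, ["y"], 4)

# Two outputs declared, but only one value returned
try:
    store_outputs(memory, ["a", "b"], 1.0)
    mismatch_detected = False
except ValueError:
    mismatch_detected = True

print(mismatch_detected)  # True
```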
Synchronous vs Threaded Parts
Synchronous Parts (Default)
When to use: Fast operations (< 10ms)
class ThrottleFilter:
    def __init__(self):
        self.reverse_triggered = False
        self.last_throttle = 0.0

    def run(self, throttle_in):
        if throttle_in is None:
            return throttle_in

        throttle_out = throttle_in

        if throttle_out < 0.0:
            if not self.reverse_triggered and self.last_throttle < 0.0:
                throttle_out = 0.0
                self.reverse_triggered = True
        else:
            self.reverse_triggered = False

        self.last_throttle = throttle_out
        return throttle_out

V.add(ThrottleFilter(),
      inputs=['user/throttle'],
      outputs=['user/throttle'],
      threaded=False)  # Runs in main loop
Example from throttle_filter.py:2-25.
Threaded Parts
When to use: Slow I/O operations (cameras, network, sensors)
class PiCamera:
    def __init__(self, image_w=160, image_h=120):
        # Initialize camera
        self.camera = Picamera2()
        self.camera.start()
        self.frame = None
        self.on = True

    def run(self):
        """Capture a frame (called by update thread)"""
        self.frame = self.camera.capture_array("main")
        return self.frame

    def update(self):
        """Background thread continuously captures frames"""
        while self.on:
            self.run()

    def run_threaded(self):
        """Main loop just returns latest frame"""
        return self.frame

    def shutdown(self):
        self.on = False
        self.camera.close()

V.add(PiCamera(),
      inputs=[],
      outputs=['cam/image_array'],
      threaded=True)  # Runs in background thread
Adapted from camera.py:23-82.
How it works:
- update() runs in a background thread, continuously calling run()
- Main vehicle loop calls run_threaded() to get latest cached result
- No blocking of the main drive loop
Threaded parts should store results in instance variables (like self.frame). The main loop just reads the cached value, preventing slow I/O from blocking the drive loop.
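The interplay between update() and run_threaded() can be tried without a vehicle. In this sketch the thread is started by hand, which is an assumption about what happens when a part is added with threaded=True; `CounterSensor` is a hypothetical part whose `time.sleep` call stands in for slow I/O.

```python
import threading
import time


class CounterSensor:
    """Hypothetical slow sensor: produces a new reading roughly every 10 ms."""

    def __init__(self):
        self.value = None  # Cached result read by the main loop
        self.on = True

    def update(self):
        """Background thread: keeps refreshing the cached value."""
        count = 0
        while self.on:
            time.sleep(0.01)  # Stands in for slow I/O
            count += 1
            self.value = count

    def run_threaded(self):
        """Main loop: returns immediately with the latest cached value."""
        return self.value

    def shutdown(self):
        self.on = False


sensor = CounterSensor()
worker = threading.Thread(target=sensor.update, daemon=True)
worker.start()

readings = []
for _ in range(5):
    readings.append(sensor.run_threaded())  # Never waits on the sensor
    time.sleep(0.05)                        # Simulated loop period

sensor.shutdown()
worker.join(timeout=1.0)
print(readings)
```

The first reading may be None because the background thread has not produced a value yet, which is why downstream parts should tolerate None inputs.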
Real-World Part Examples
Example 1: Lambda Part
Simple functional wrapper from transform.py:5-19:
class Lambda:
    """
    Wraps a function into a donkey part.
    """
    def __init__(self, f):
        """Accepts the function to use."""
        self.f = f

    def run(self, *args, **kwargs):
        return self.f(*args, **kwargs)

    def shutdown(self):
        return

# Usage
V.add(Lambda(lambda x, y: x + y),
      inputs=['value1', 'value2'],
      outputs=['sum'])
Example 2: DriveMode Part
Logic part from basic.py:28-42:
class DriveMode:
    """Helper class to dispatch between ai and user driving"""
    def __init__(self, cfg):
        self.cfg = cfg

    def run(self, mode, user_angle, user_throttle, pilot_angle, pilot_throttle):
        if mode == 'user':
            return user_angle, user_throttle
        elif mode == 'local_angle':
            return pilot_angle if pilot_angle else 0.0, user_throttle
        else:
            return pilot_angle if pilot_angle else 0.0, \
                   pilot_throttle * self.cfg.AI_THROTTLE_MULT if \
                   pilot_throttle else 0.0

V.add(DriveMode(cfg=cfg),
      inputs=['user/mode', 'user/angle', 'user/throttle',
              'pilot/angle', 'pilot/throttle'],
      outputs=['angle', 'throttle'])
Example 3: PID Controller
Stateful processing part from transform.py:53-115:
import time

class PIDController:
    """Performs a PID computation and returns a control value."""
    def __init__(self, p=0, i=0, d=0, debug=False):
        # initialize gains
        self.Kp = p
        self.Ki = i
        self.Kd = d
        # The value the controller is trying to achieve
        self.target = 0
        # initialize delta t variables
        self.prev_tm = time.time()
        self.prev_err = 0
        self.totalError = 0
        # initialize the output
        self.alpha = 0
        self.debug = debug

    def run(self, err):
        curr_tm = time.time()
        self.difError = err - self.prev_err
        # Calculate time differential
        dt = curr_tm - self.prev_tm
        # Initialize output variable
        curr_alpha = 0
        # Add proportional component
        curr_alpha += -self.Kp * err
        # Add integral component
        curr_alpha += -self.Ki * (self.totalError * dt)
        # Add differential component (avoiding divide-by-zero)
        if dt > 0:
            curr_alpha += -self.Kd * (self.difError / dt)
        # Update error accumulator
        self.totalError += err
        # Save current time and error for next iteration
        self.prev_tm = curr_tm
        self.prev_err = err
        if self.debug:
            print(f"PID: err={err:.3f} alpha={curr_alpha:.3f}")
        return curr_alpha

V.add(PIDController(p=0.1, i=0.01, d=0.05),
      inputs=['cte/error'],
      outputs=['pilot/steering'])
Example 4: TriggeredCallback
Event-driven part from transform.py:21-31:
class TriggeredCallback:
    def __init__(self, args, func_cb):
        self.args = args
        self.func_cb = func_cb

    def run(self, trigger):
        if trigger:
            self.func_cb(self.args)

    def shutdown(self):
        return

# Usage: Reload model when file changes
V.add(TriggeredCallback(model_path, reload_model),
      inputs=["modelfile/reload"],
      run_condition="run_pilot")
Example 5: Conditional Logic Part
From simulator.py:122-128:
class PilotCondition:
    def run(self, mode):
        if mode == 'user':
            return False
        else:
            return True

V.add(PilotCondition(),
      inputs=['user/mode'],
      outputs=['run_pilot'])
Parts can be simple! Many parts are just a few lines of logic. Don’t over-engineer - if it has inputs, outputs, and a run() method, it’s a valid part.
Run Conditions
Parts can execute conditionally:
V.add(autopilot,
      inputs=['cam/image_array'],
      outputs=['pilot/angle', 'pilot/throttle'],
      run_condition='run_pilot')  # Only runs when Memory['run_pilot'] is truthy
From the Vehicle implementation (vehicle.py:194-196):
run = True
if entry.get('run_condition'):
    run_condition = entry.get('run_condition')
    run = self.mem.get([run_condition])[0]
Example usage from complete.py:415:
V.add(kl, inputs=inputs, outputs=outputs, run_condition='run_pilot')
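The effect of that check can be reproduced in isolation. The sketch below is a simplified stand-in, not the Vehicle code itself: `should_run`, `CountingPilot`, and the dict-based memory are hypothetical names used to show that a gated part is skipped while its condition key is falsy.

```python
class CountingPilot:
    """Hypothetical part that records how many times it was called."""

    def __init__(self):
        self.calls = 0

    def run(self, image):
        self.calls += 1
        return 0.0, 0.5  # Fixed angle and throttle, for illustration only


def should_run(memory, run_condition):
    """Return True when no condition is set or the condition key is truthy."""
    if not run_condition:
        return True
    return bool(memory.get(run_condition))


pilot = CountingPilot()
memory = {"cam/image_array": "frame", "run_pilot": False}

# Three loop iterations: the pilot is skipped on the first one
for flag in (False, True, True):
    memory["run_pilot"] = flag
    if should_run(memory, "run_pilot"):
        angle, throttle = pilot.run(memory["cam/image_array"])
        memory["pilot/angle"] = angle
        memory["pilot/throttle"] = throttle

print(pilot.calls)  # 2
```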
Use Cases
- Mode switching: User vs autopilot
- Conditional recording: Only record when button pressed
- Button handlers: Execute when button pressed
- Feature toggles: Enable/disable features
Memory Channel Naming Conventions
Follow these patterns for clarity:
- Sensors: cam/image_array, imu/acl_x, lidar/dist_array
- User input: user/angle, user/throttle, user/mode
- Autopilot: pilot/angle, pilot/throttle, pilot/loc
- Actuators: steering, throttle, brake
- Recording: recording, tub/num_records
- Conditions: run_pilot, run_user
- Web UI: web/buttons, web/w1, web/w2
Creating Custom Parts
Simple Processing Part
import cv2

class ImagePreprocessor:
    def __init__(self, target_size=(160, 120)):
        self.target_size = target_size  # (width, height), as cv2.resize expects

    def run(self, image):
        # The camera may not have produced a frame yet
        if image is None:
            return None
        # Resize and normalize to [0, 1]
        resized = cv2.resize(image, self.target_size)
        normalized = resized / 255.0
        return normalized

V.add(ImagePreprocessor(),
      inputs=['cam/image_array'],
      outputs=['cam/image_normalized'])
Sensor Part with Threading
from serial import Serial  # pyserial

class GPSSensor:
    def __init__(self, port='/dev/ttyUSB0'):
        # A read timeout lets update() notice shutdown instead of blocking forever
        self.serial = Serial(port, 9600, timeout=1)
        self.position = None
        self.on = True

    def run(self):
        """Read GPS data (called by update thread)"""
        line = self.serial.readline()
        # parse_nmea() is a placeholder for your own NMEA parser (not shown)
        self.position = self.parse_nmea(line)
        return self.position

    def update(self):
        """Background thread"""
        while self.on:
            self.run()

    def run_threaded(self):
        """Main loop reads cached position"""
        return self.position

    def shutdown(self):
        self.on = False
        self.serial.close()

V.add(GPSSensor(),
      outputs=['gps/position'],
      threaded=True)
Actuator Part
class ServoSteering:
    def __init__(self, channel=0):
        self.pwm = PCA9685(channel)
        self.left_pulse = 290
        self.right_pulse = 490

    def run(self, angle):
        """Convert angle [-1, 1] to PWM pulse"""
        # Map angle to pulse width
        pulse = self.left_pulse + (angle + 1) * \
                (self.right_pulse - self.left_pulse) / 2
        self.pwm.set_pulse(pulse)
        # No output needed

    def shutdown(self):
        # Center steering
        self.run(0)

V.add(ServoSteering(),
      inputs=['steering'],
      outputs=[])
Filter Part
class LowPassFilter:
    def __init__(self, alpha=0.5):
        self.alpha = alpha
        self.prev_value = None

    def run(self, value):
        if self.prev_value is None:
            self.prev_value = value
            return value
        # Exponential moving average
        filtered = self.alpha * value + (1 - self.alpha) * self.prev_value
        self.prev_value = filtered
        return filtered

V.add(LowPassFilter(alpha=0.3),
      inputs=['sensor/raw'],
      outputs=['sensor/filtered'])
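To see what the exponential moving average does, the filter can be fed a step input outside the vehicle loop. The class is repeated so the snippet runs on its own; alpha=0.5 and the input sequence are arbitrary values chosen for illustration.

```python
class LowPassFilter:
    def __init__(self, alpha=0.5):
        self.alpha = alpha
        self.prev_value = None

    def run(self, value):
        if self.prev_value is None:
            # First sample: nothing to average against yet
            self.prev_value = value
            return value
        # Exponential moving average
        filtered = self.alpha * value + (1 - self.alpha) * self.prev_value
        self.prev_value = filtered
        return filtered


lpf = LowPassFilter(alpha=0.5)
step_input = [0.0, 1.0, 1.0, 1.0]
smoothed = [lpf.run(v) for v in step_input]
print(smoothed)  # [0.0, 0.5, 0.75, 0.875]
```

With alpha=0.5 the output closes half of the remaining gap to the input on every iteration; a smaller alpha smooths more but reacts more slowly.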
Best Practices
- Keep parts simple: Each part should do one thing well
- Make I/O threaded: Cameras, network, sensors should run in background
- Store state in instance variables: For stateful parts like filters or accumulators
- Implement shutdown: Clean up resources (close files, stop motors, etc.)
- Handle None gracefully: Inputs may be None on first iteration
- Use meaningful names: Clear input/output channel names
- Document your parts: Add docstrings explaining purpose and usage
- Test independently: Parts should be testable without the vehicle loop
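Two of these practices, handling None and implementing shutdown(), fit in a few lines. The part below is a hypothetical sketch; treating a missing throttle as 0.0 is an assumption that suits a throttle channel and may not suit other signals.

```python
class ThrottleScaler:
    """Hypothetical part: scales throttle and tolerates missing input."""

    def __init__(self, gain=0.5):
        self.gain = gain
        self.running = True

    def run(self, throttle):
        # Upstream parts may not have written a value yet
        if throttle is None:
            return 0.0
        return throttle * self.gain

    def shutdown(self):
        # Release resources here; this sketch only records the state change
        self.running = False


scaler = ThrottleScaler(gain=0.5)
first = scaler.run(None)   # No command yet
second = scaler.run(1.0)   # Normal operation
scaler.shutdown()
print(first, second)  # 0.0 0.5
```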
Part Testing
Test parts independently before adding to vehicle:
# Test a part
part = MyCustomPart()
# Simulate inputs
input1 = "test_data"
input2 = 42
# Call run
output = part.run(input1, input2)
# Verify output
assert output == expected_value
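The same idea can be written as a small unittest case. `Adder` is a hypothetical part defined here so the example is self-contained; in practice you would import your own part instead.

```python
import unittest


class Adder:
    """Hypothetical part under test."""

    def run(self, a, b):
        if a is None or b is None:
            return None
        return a + b


class AdderTest(unittest.TestCase):
    def test_adds_inputs(self):
        self.assertEqual(Adder().run(2, 40), 42)

    def test_passes_none_through(self):
        self.assertIsNone(Adder().run(None, 1))


# Run the tests programmatically so the snippet works in any script
suite = unittest.defaultTestLoader.loadTestsFromTestCase(AdderTest)
result = unittest.TextTestRunner(verbosity=0).run(suite)
print(result.wasSuccessful())  # True
```

No Vehicle or Memory object is needed: the test calls run() directly with the values the loop would otherwise supply.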
Common Patterns
V.add(Lambda(lambda: print("Button clicked!")),
      run_condition="web/w1")  # Runs when button pressed
Data Pipeline
# Part 1: Capture
V.add(camera, outputs=['cam/raw'])
# Part 2: Preprocess
V.add(preprocessor, inputs=['cam/raw'], outputs=['cam/processed'])
# Part 3: Inference
V.add(model, inputs=['cam/processed'], outputs=['pilot/angle'])
# Part 4: Filter (named angle_filter to avoid shadowing the built-in filter())
V.add(angle_filter, inputs=['pilot/angle'], outputs=['pilot/angle_filtered'])
# Part 5: Actuate
V.add(steering, inputs=['pilot/angle_filtered'])
Conditional Branching
# User path
V.add(user_controller, outputs=['user/angle'], run_condition='run_user')
# Autopilot path
V.add(autopilot, outputs=['pilot/angle'], run_condition='run_pilot')
# Merge paths
class Selector:
    def run(self, user_angle, pilot_angle, mode):
        return user_angle if mode == 'user' else pilot_angle

V.add(Selector(),
      inputs=['user/angle', 'pilot/angle', 'user/mode'],
      outputs=['angle'])
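The merge step can be checked by calling the selector directly with values taken from a dictionary that stands in for Memory. The angle values are arbitrary; the modes 'user' and 'local_angle' are the ones used elsewhere on this page.

```python
class Selector:
    def run(self, user_angle, pilot_angle, mode):
        return user_angle if mode == 'user' else pilot_angle


selector = Selector()
memory = {"user/angle": -0.3, "pilot/angle": 0.8, "user/mode": "user"}
keys = ["user/angle", "pilot/angle", "user/mode"]

# User mode: the user's angle wins
memory["angle"] = selector.run(*(memory[k] for k in keys))
user_choice = memory["angle"]

# Any other mode: the pilot's angle wins
memory["user/mode"] = "local_angle"
memory["angle"] = selector.run(*(memory[k] for k in keys))
pilot_choice = memory["angle"]

print(user_choice, pilot_choice)  # -0.3 0.8
```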
Next Steps