Skip to main content
The Vehicle class is the heart of Donkeycar. It manages the main drive loop that runs all parts, handles data flow between parts through a shared memory system, and orchestrates the execution of your robotic vehicle.

Overview

The Vehicle creates a processing pipeline where each part runs sequentially in the order added. Parts communicate through a shared memory system, where outputs from one part can be inputs to another.
import donkeycar as dk

# Create a vehicle
V = dk.vehicle.Vehicle()

# Add parts
V.add(camera, outputs=['cam/image_array'], threaded=True)
V.add(controller, 
      inputs=['cam/image_array'],
      outputs=['user/angle', 'user/throttle'],
      threaded=True)

# Start the drive loop at 20Hz
V.start(rate_hz=20)

Constructor

__init__(mem=None)

Creates a new Vehicle instance.
mem
Memory
default:"None"
Optional Memory instance for storing part data. If not provided, a new Memory instance is created automatically.
Attributes:
  • mem - The Memory instance used for data storage
  • parts - List of part entries in the vehicle pipeline
  • on - Boolean flag controlling the drive loop
  • threads - List of active threads for threaded parts
  • profiler - PartProfiler instance for performance monitoring
import donkeycar as dk
from donkeycar.memory import Memory

# Create with default memory
V = dk.vehicle.Vehicle()

# Or provide custom memory instance
mem = Memory()
V = dk.vehicle.Vehicle(mem=mem)

Methods

add(part, inputs=[], outputs=[], threaded=False, run_condition=None)

Adds a part to the vehicle drive loop. Parts are executed in the order they are added.
part
object
required
A part instance with either a run() or run_threaded() method. The part will be called on every loop iteration.
inputs
list
default:"[]"
List of memory keys to retrieve and pass as arguments to the part’s run method. Values are retrieved from memory in the order specified.
outputs
list
default:"[]"
List of memory keys where the part’s return values will be stored. If the part returns a tuple, values are stored in order.
threaded
bool
default:"False"
If True, the part’s update() method runs in a separate thread. The main loop calls run_threaded() to get the latest output. Use for I/O-bound operations like camera capture.
run_condition
str
default:"None"
Memory key that determines if this part should run. The part only executes when the value at this key is truthy.
Example: Basic Part
import donkeycar as dk
from donkeycar.parts.camera import PiCamera

V = dk.vehicle.Vehicle()

# Add camera that outputs images
cam = PiCamera(image_w=160, image_h=120, image_d=3)
V.add(cam, outputs=['cam/image_array'], threaded=True)
Example: Part with Inputs and Outputs
# Add controller that reads image and outputs controls
from donkeycar.parts.controller import LocalWebController

ctr = LocalWebController()
V.add(ctr,
      inputs=['cam/image_array'],
      outputs=['user/angle', 'user/throttle', 'user/mode'],
      threaded=True)
Example: Conditional Execution
# Add autopilot that only runs when 'run_pilot' is True
kl = dk.utils.get_model_by_type('linear', cfg)
kl.load('models/pilot.h5')

V.add(kl, 
      inputs=['cam/image_array'],
      outputs=['pilot/angle', 'pilot/throttle'],
      run_condition='run_pilot')
Example: Complete Pipeline from basic.py
import donkeycar as dk
from donkeycar.parts.camera import PiCamera
from donkeycar.parts.controller import LocalWebController
from donkeycar.parts.actuator import PCA9685, PWMSteering, PWMThrottle

car = dk.vehicle.Vehicle()

# Camera
cam = PiCamera(image_w=160, image_h=120, image_d=3)
car.add(cam, outputs=['cam/image_array'], threaded=True)

# Controller
ctr = LocalWebController()
car.add(ctr,
        inputs=['cam/image_array'],
        outputs=['user/angle', 'user/throttle', 'user/mode'],
        threaded=True)

# Steering
steering_controller = PCA9685(0, 0x40)
steering = PWMSteering(controller=steering_controller,
                      left_pulse=460, right_pulse=290)
car.add(steering, inputs=['user/angle'])

# Throttle
throttle_controller = PCA9685(1, 0x40)
throttle = PWMThrottle(controller=throttle_controller,
                       max_pulse=500, zero_pulse=370, min_pulse=220)
car.add(throttle, inputs=['user/throttle'])

car.start(rate_hz=20)

start(rate_hz=10, max_loop_count=None, verbose=False)

Starts the vehicle’s main drive loop. This is a blocking call that runs until interrupted or max_loop_count is reached.
rate_hz
int
default:"10"
Target frequency for the drive loop in Hz. The actual frequency may be lower if parts take too long to process.
max_loop_count
int
default:"None"
Maximum number of loop iterations. Used for testing. None means run indefinitely until interrupted.
verbose
bool
default:"False"
If True, prints debug information including timing violations and periodic performance reports every 200 loops.
Returns:
loop_count
int
Total number of loops executed
loop_total_time
float
Total time elapsed in seconds
Example: Basic Usage
import donkeycar as dk

V = dk.vehicle.Vehicle()
# ... add parts ...

# Run at 20 Hz indefinitely
V.start(rate_hz=20)
Example: Testing with Loop Count
# Run exactly 100 loops for testing
loop_count, total_time = V.start(rate_hz=20, max_loop_count=100)
print(f"Ran {loop_count} loops in {total_time:.2f} seconds")
print(f"Average frequency: {loop_count/total_time:.1f} Hz")
Example: Verbose Mode for Debugging
# Enable verbose output to see performance issues
V.start(rate_hz=20, verbose=True)
# Output:
# WARN::Vehicle: jitter violation in vehicle loop with 15ms
# Part Profile Summary: (times in ms)
# +--------------+-------+------+------+-------+-------+--------+--------+
# | part         | max   | min  | avg  | 50%   | 90%   | 99%    | 99.9%  |
# +--------------+-------+------+------+-------+-------+--------+--------+
# | PiCamera     | 45.20 | 0.10 | 33.5 | 33.2  | 34.8  | 45.1   | 45.2   |
# +--------------+-------+------+------+-------+-------+--------+--------+
Example: From complete.py Template
# Typical usage in manage.py drive command
def drive(cfg):
    V = dk.vehicle.Vehicle()
    
    # Add all parts
    add_camera(V, cfg)
    add_user_controller(V, cfg)
    add_drivetrain(V, cfg)
    # ... more parts ...
    
    # Start the vehicle
    V.start(rate_hz=cfg.DRIVE_LOOP_HZ, max_loop_count=cfg.MAX_LOOPS)

update_parts()

Executes one iteration of the drive loop. Called automatically by start(). This method:
  1. Loops through all parts in order
  2. Checks each part’s run condition
  3. Retrieves inputs from memory
  4. Calls the part’s run() or run_threaded() method
  5. Stores outputs back to memory
  6. Profiles execution time
# Called internally by start(), but can be used manually
V = dk.vehicle.Vehicle()
# ... add parts ...

V.on = True
while V.on:
    V.update_parts()
    # custom logic here
Part Execution Flow:
# Example of what happens internally in update_parts()
for entry in self.parts:
    # Check run condition
    if entry.get('run_condition'):
        run_condition = entry.get('run_condition')
        run = self.mem.get([run_condition])[0]
        if not run:
            continue
    
    # Get the part
    p = entry['part']
    
    # Get inputs from memory
    inputs = self.mem.get(entry['inputs'])
    
    # Run the part
    if entry.get('thread'):
        outputs = p.run_threaded(*inputs)
    else:
        outputs = p.run(*inputs)
    
    # Save outputs to memory
    if outputs is not None:
        self.mem.put(entry['outputs'], outputs)

stop()

Stops the vehicle and shuts down all parts. Automatically called when the drive loop exits. This method:
  • Calls shutdown() on each part (if the method exists)
  • Prints a performance profiler report
  • Handles exceptions gracefully for parts without shutdown methods
V = dk.vehicle.Vehicle()
# ... add parts ...

try:
    V.start(rate_hz=20)
except KeyboardInterrupt:
    pass
finally:
    V.stop()  # Ensure clean shutdown
Example: Custom Part with Shutdown
class Camera:
    def __init__(self):
        self.camera = initialize_camera()
    
    def run(self):
        return self.camera.capture()
    
    def shutdown(self):
        """Called by Vehicle.stop()"""
        self.camera.close()
        print("Camera closed")

V = dk.vehicle.Vehicle()
V.add(Camera(), outputs=['image'])
V.start(rate_hz=20)
# On exit, Camera.shutdown() is automatically called

remove(part)

Removes a part from the vehicle pipeline.
part
object
required
The part instance to remove from the pipeline
V = dk.vehicle.Vehicle()
cam = Camera()
V.add(cam, outputs=['image'])

# Later, remove the part
V.remove(cam)
Removing parts while the vehicle is running may cause unexpected behavior. It’s recommended to stop the vehicle before modifying the pipeline.

Performance Profiling

The Vehicle automatically tracks execution time for each part using a PartProfiler. Enable verbose mode to see performance statistics:
V.start(rate_hz=20, verbose=True)
Profile reports show:
  • max - Longest execution time for the part
  • min - Shortest execution time
  • avg - Average execution time
  • 50%/90%/99%/99.9% - Percentile execution times
Use this to identify slow parts that may be causing the vehicle to miss its target loop rate.

Memory and Data Flow

The Vehicle uses a shared Memory instance for data flow:
# Data flows through named channels
V.add(camera, outputs=['cam/image_array'])
V.add(model, inputs=['cam/image_array'], outputs=['pilot/angle'])
V.add(motor, inputs=['pilot/angle'])

# Memory keys can be any string
# Common convention: component/property
# Examples: 'cam/image_array', 'user/throttle', 'pilot/angle'

See Also

Build docs developers (and LLMs) love