Skip to main content
The Vehicle class is the execution engine of Donkeycar. It runs a continuous loop that orchestrates all parts, manages data flow through Memory, and maintains consistent timing.

The Vehicle Class

Defined in donkeycar/vehicle.py:61, the Vehicle class is simple but powerful:
class Vehicle:
    def __init__(self, mem=None):
        if not mem:
            mem = Memory()
        self.mem = mem          # Shared memory for data
        self.parts = []         # List of parts to execute
        self.on = True          # Loop control flag
        self.threads = []       # Background thread handles
        self.profiler = PartProfiler()  # Performance tracking

Adding Parts

Parts are registered with the add() method:
def add(self, part, inputs=[], outputs=[], threaded=False, run_condition=None):
    """
    Method to add a part to the vehicle drive loop.
    
    Parameters
    ----------
    part : class
        Donkey vehicle part has run() attribute
    inputs : list
        Channel names to get from memory.
    outputs : list
        Channel names to save to memory.
    threaded : boolean
        If a part should be run in a separate thread.
    run_condition : str
        If a part should be run or not
    """
Example from the source code (complete.py:105):
V.add(cam, inputs=inputs, outputs=['cam/image_array'], threaded=True)
Parts are executed in the order they are added. This execution order is important because later parts can use outputs from earlier parts in the same iteration.

The Drive Loop

The main loop is started with the start() method:
def start(self, rate_hz=10, max_loop_count=None, verbose=False):
    """
    Start vehicle's main drive loop.
    
    This is the main thread of the vehicle. It starts all the new
    threads for the threaded parts then starts an infinite loop
    that runs each part and updates the memory.
    
    Parameters
    ----------
    rate_hz : int
        The max frequency that the drive loop should run. The actual
        frequency may be less than this if there are many blocking parts.
    max_loop_count : int
        Maximum number of loops the drive loop should execute. This is
        used for testing that all the parts of the vehicle work.
    verbose: bool
        If debug output should be printed into shell
    """

Loop Initialization

From vehicle.py:137-147:
self.on = True

for entry in self.parts:
    if entry.get('thread'):
        # start the update thread
        entry.get('thread').start()

# wait until the parts warm up.
logger.info('Starting vehicle at {} Hz'.format(rate_hz))

loop_start_time = time.time()
loop_count = 0

Main Loop Execution

The core loop structure (vehicle.py:151-172):
while self.on:
    start_time = time.time()
    loop_count += 1
    
    self.update_parts()  # Execute all parts
    
    # stop drive loop if loop_count exceeds max_loopcount
    if max_loop_count and loop_count >= max_loop_count:
        self.on = False
    else:
        # Calculate sleep time to maintain rate_hz
        sleep_time = 1.0 / rate_hz - (time.time() - start_time)
        if sleep_time > 0.0:
            time.sleep(sleep_time)
        else:
            # print a message when could not maintain loop rate.
            if verbose:
                logger.info('WARN::Vehicle: jitter violation in vehicle loop '
                      'with {0:4.0f}ms'.format(abs(1000 * sleep_time)))
        
        if verbose and loop_count % 200 == 0:
            self.profiler.report()
The loop automatically sleeps between iterations to maintain the target rate_hz. If processing takes too long, you’ll see “jitter violation” warnings when running in verbose mode.

Update Parts Cycle

The update_parts() method executes all registered parts (vehicle.py:186-215):
def update_parts(self):
    '''
    loop over all parts
    '''
    for entry in self.parts:
        
        run = True
        # check run condition, if it exists
        if entry.get('run_condition'):
            run_condition = entry.get('run_condition')
            run = self.mem.get([run_condition])[0]
        
        if run:
            # get part
            p = entry['part']
            # start timing part run
            self.profiler.on_part_start(p)
            # get inputs from memory
            inputs = self.mem.get(entry['inputs'])
            # run the part
            if entry.get('thread'):
                outputs = p.run_threaded(*inputs)
            else:
                outputs = p.run(*inputs)
            
            # save the output to memory
            if outputs is not None:
                self.mem.put(entry['outputs'], outputs)
            # finish timing part run
            self.profiler.on_part_finished(p)

Execution Steps

For each part, the vehicle:
  1. Check run condition - If a run_condition is specified, check if it’s True in Memory
  2. Get inputs - Retrieve all input values from Memory using mem.get()
  3. Execute part - Call either run() or run_threaded() with the inputs
  4. Store outputs - Save return values to Memory using mem.put()
  5. Profile - Track execution time for performance analysis

Rate Control

The rate_hz parameter controls loop frequency:
V.start(rate_hz=20)  # Run at 20 Hz (50ms per iteration)

How It Works

sleep_time = 1.0 / rate_hz - (time.time() - start_time)
if sleep_time > 0.0:
    time.sleep(sleep_time)
  • Calculates how long the current iteration took
  • Sleeps for the remaining time to hit target rate
  • If processing takes longer than 1/rate_hz, no sleep (jitter violation)
A typical Donkeycar runs at 20 Hz, giving each iteration 50ms. If your parts take longer than this, consider:
  • Making slow parts threaded
  • Reducing camera resolution
  • Optimizing model inference
  • Lowering the rate_hz

Threading Model

Synchronous Parts

Default behavior - part runs in main loop:
V.add(part, inputs=['input'], outputs=['output'], threaded=False)
The Vehicle calls:
outputs = part.run(*inputs)

Threaded Parts

For I/O-bound operations:
V.add(camera, inputs=[], outputs=['cam/image_array'], threaded=True)

Thread Creation

From vehicle.py:102-105:
if threaded:
    t = Thread(target=part.update, args=())
    t.daemon = True
    entry['thread'] = t

Thread Execution

The part must implement update() which runs continuously:
class Camera:
    def update(self):
        # keep looping infinitely until the thread is stopped
        while self.on:
            self.run()  # Capture new frame
            
    def run_threaded(self):
        return self.frame  # Return latest frame
  • update() runs in background thread, continuously capturing frames
  • run_threaded() is called by main loop to get latest result
Example from camera.py:71-74:
def update(self):
    # keep looping infinitely until the thread is stopped
    while self.on:
        self.run()
Threaded parts should store their latest result in an instance variable (like self.frame). The main loop just reads this cached value, so slow I/O doesn’t block the drive loop.

Run Conditions

Parts can execute conditionally based on Memory values:
V.add(autopilot_model,
      inputs=['cam/image_array'],
      outputs=['pilot/angle', 'pilot/throttle'],
      run_condition='run_pilot')
The part only executes when Memory['run_pilot'] == True.

Example from complete.py

# This part sets run_pilot based on user mode
class PilotCondition:
    def run(self, mode):
        return mode != 'user'

V.add(PilotCondition(), 
      inputs=['user/mode'], 
      outputs=['run_pilot'])

# Model only runs when run_pilot is True
V.add(kl, 
      inputs=['cam/image_array'],
      outputs=['pilot/angle', 'pilot/throttle'],
      run_condition='run_pilot')
This pattern enables:
  • User vs autopilot mode switching
  • Conditional recording
  • Button-triggered actions
  • Feature toggles

Performance Profiling

The PartProfiler tracks execution time for each part:
class PartProfiler:
    def profile_part(self, p):
        self.records[p] = { "times" : [] }
    
    def on_part_start(self, p):
        self.records[p]['times'].append(time.time())
    
    def on_part_finished(self, p):
        now = time.time()
        prev = self.records[p]['times'][-1]
        delta = now - prev
        self.records[p]['times'][-1] = delta

Profiler Report

When the vehicle stops, it prints a summary:
Part Profile Summary: (times in ms)
+-----------------+------+------+------+-------+-------+-------+--------+
|      part       | max  | min  | avg  |  50%  |  90%  |  99%  | 99.9%  |
+-----------------+------+------+------+-------+-------+-------+--------+
| PiCamera        | 45.2 | 32.1 | 35.4 | 34.5  | 38.2  | 42.1  | 44.8   |
| KerasPilot      | 68.3 | 45.6 | 52.1 | 51.2  | 58.9  | 65.4  | 67.9   |
| PWMSteering     | 2.3  | 0.8  | 1.2  | 1.1   | 1.5   | 2.0   | 2.2    |
+-----------------+------+------+------+-------+-------+-------+--------+
From vehicle.py:39-58:
def report(self):
    logger.info("Part Profile Summary: (times in ms)")
    pt = PrettyTable()
    field_names = ["part", "max", "min", "avg"]
    pctile = [50, 90, 99, 99.9]
    pt.field_names = field_names + [str(p) + '%' for p in pctile]
    for p, val in self.records.items():
        arr = val['times'][1:-1]
        if len(arr) == 0:
            continue
        row = [p.__class__.__name__,
               "%.2f" % (max(arr) * 1000),
               "%.2f" % (min(arr) * 1000),
               "%.2f" % (sum(arr) / len(arr) * 1000)]
        row += ["%.2f" % (np.percentile(arr, p) * 1000) for p in pctile]
        pt.add_row(row)
    logger.info('\n' + str(pt))
Run with verbose=True to see profiling data every 200 iterations during execution, helping you identify bottlenecks in real-time.

Shutdown Sequence

When the vehicle stops (vehicle.py:217-228):
def stop(self):        
    logger.info('Shutting down vehicle and its parts...')
    for entry in self.parts:
        try:
            entry['part'].shutdown()
        except AttributeError:
            # usually from missing shutdown method, which should be optional
            pass
        except Exception as e:
            logger.error(e)
    
    self.profiler.report()
  • Calls shutdown() on all parts (if they have it)
  • Prints final profiling report
  • Gracefully handles missing shutdown methods

Example: Complete Drive Loop

Here’s a simplified example showing the entire lifecycle:
import donkeycar as dk
from donkeycar.parts.camera import PiCamera
from donkeycar.parts.actuator import PCA9685, PWMSteering, PWMThrottle

# Create vehicle
V = dk.vehicle.Vehicle()

# Add camera (threaded)
cam = PiCamera(image_w=160, image_h=120)
V.add(cam, outputs=['cam/image_array'], threaded=True)

# Add controller
from donkeycar.parts.controller import LocalWebController
ctr = LocalWebController()
V.add(ctr,
      inputs=['cam/image_array'],
      outputs=['user/angle', 'user/throttle', 'user/mode'],
      threaded=True)

# Add pilot condition
class PilotCondition:
    def run(self, mode):
        return mode != 'user'
        
V.add(PilotCondition(), 
      inputs=['user/mode'], 
      outputs=['run_pilot'])

# Add model (conditional)
model = load_model('my_model.h5')
V.add(model,
      inputs=['cam/image_array'],
      outputs=['pilot/angle', 'pilot/throttle'],
      run_condition='run_pilot')

# Choose mode
class DriveMode:
    def run(self, mode, user_angle, user_throttle, pilot_angle, pilot_throttle):
        if mode == 'user':
            return user_angle, user_throttle
        else:
            return pilot_angle, pilot_throttle

V.add(DriveMode(),
      inputs=['user/mode', 'user/angle', 'user/throttle', 
              'pilot/angle', 'pilot/throttle'],
      outputs=['angle', 'throttle'])

# Add actuators
steering = PWMSteering(...)
throttle = PWMThrottle(...)
V.add(steering, inputs=['angle'])
V.add(throttle, inputs=['throttle'])

# Start the loop at 20 Hz
V.start(rate_hz=20, verbose=True)

What Happens:

  1. Initialization: Vehicle creates Memory, registers 8 parts
  2. Startup: Starts camera and controller threads
  3. Loop (every 50ms at 20Hz):
    • Camera updates in background, run_threaded() returns latest frame → cam/image_array
    • Controller polls for input → user/angle, user/throttle, user/mode
    • PilotCondition checks mode → run_pilot
    • Model runs (if run_pilot=True) → pilot/angle, pilot/throttle
    • DriveMode selects user or pilot outputs → angle, throttle
    • Steering and throttle actuators execute
  4. Sleep: Calculate remaining time to hit 50ms, sleep
  5. Repeat until interrupted
  6. Shutdown: Call shutdown on all parts, print profiling report

Timing Considerations

Target Rate

Common rate_hz values:
  • 20 Hz (50ms): Standard for most cars
  • 10 Hz (100ms): For slower hardware or heavy processing
  • 30 Hz (33ms): For faster response times

Jitter Violations

If processing takes longer than 1/rate_hz:
WARN::Vehicle: jitter violation in vehicle loop with 12ms
This means the loop overran by 12ms. Causes:
  • Blocking parts taking too long
  • Camera capture delays
  • Model inference too slow
  • Too many synchronous parts

Solutions

  1. Make parts threaded: Move slow I/O to background
  2. Optimize processing: Reduce image resolution, use faster models
  3. Lower rate_hz: If 20Hz is too fast, try 15Hz or 10Hz
  4. Profile: Use verbose mode to identify bottlenecks

Best Practices

  1. Thread I/O operations: Cameras, network, sensors should be threaded
  2. Keep processing light: Main loop should be fast (< 50ms total)
  3. Order parts correctly: Later parts can use earlier outputs in same iteration
  4. Use run conditions: Avoid unnecessary work with conditional execution
  5. Profile regularly: Check execution times to catch performance regressions
  6. Handle shutdown: Implement cleanup in shutdown() method

Next Steps

Build docs developers (and LLMs) love