The Vehicle class is the execution engine of Donkeycar. It runs a continuous loop that orchestrates all parts, manages data flow through Memory, and maintains consistent timing.
The Vehicle Class
Defined in donkeycar/vehicle.py:61, the Vehicle class is simple but powerful:
class Vehicle:
    def __init__(self, mem=None):
        if not mem:
            mem = Memory()
        self.mem = mem  # Shared memory for data
        self.parts = []  # List of parts to execute
        self.on = True  # Loop control flag
        self.threads = []  # Background thread handles
        self.profiler = PartProfiler()  # Performance tracking
Adding Parts
Parts are registered with the add() method:
def add(self, part, inputs=[], outputs=[], threaded=False, run_condition=None):
    """
    Method to add a part to the vehicle drive loop.

    Parameters
    ----------
    part : class
        Donkey vehicle part has run() attribute
    inputs : list
        Channel names to get from memory.
    outputs : list
        Channel names to save to memory.
    threaded : boolean
        If a part should be run in a separate thread.
    run_condition : str
        If a part should be run or not
    """
Example from the source code (complete.py:105):
V.add(cam, inputs=inputs, outputs=['cam/image_array'], threaded=True)
Parts are executed in the order they are added. This execution order is important because later parts can use outputs from earlier parts in the same iteration.
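The effect of ordering can be seen in a minimal sketch that does not use Donkeycar itself. The `run_once` helper and the plain dict are hypothetical stand-ins for one pass of the drive loop and for Memory, and the `Source` and `Doubler` parts are invented for illustration.

```python
# Hypothetical stand-in for one pass of the drive loop over a plain dict.
def run_once(parts, memory):
    for part, inputs, outputs in parts:
        args = [memory.get(name) for name in inputs]
        result = part.run(*args)
        if result is not None:
            # Single-output parts only, to keep the sketch short.
            memory[outputs[0]] = result


class Source:
    def run(self):
        return 21


class Doubler:
    def run(self, value):
        # Nothing to double until an earlier part has written the input.
        return None if value is None else value * 2


source_first = [(Source(), [], ['raw']), (Doubler(), ['raw'], ['doubled'])]
doubler_first = list(reversed(source_first))

mem_a = {}
run_once(source_first, mem_a)   # Doubler sees 'raw' in the same pass

mem_b = {}
run_once(doubler_first, mem_b)  # Doubler runs before 'raw' exists

print(mem_a.get('doubled'), mem_b.get('doubled'))  # prints: 42 None
```

With the parts reversed, the doubled value only appears one iteration later, which at 20 Hz means a 50ms delay.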
The Drive Loop
The main loop is started with the start() method:
def start(self, rate_hz=10, max_loop_count=None, verbose=False):
    """
    Start vehicle's main drive loop.

    This is the main thread of the vehicle. It starts all the new
    threads for the threaded parts then starts an infinite loop
    that runs each part and updates the memory.

    Parameters
    ----------
    rate_hz : int
        The max frequency that the drive loop should run. The actual
        frequency may be less than this if there are many blocking parts.
    max_loop_count : int
        Maximum number of loops the drive loop should execute. This is
        used for testing that all the parts of the vehicle work.
    verbose: bool
        If debug output should be printed into shell
    """
Loop Initialization
From vehicle.py:137-147:
self.on = True
for entry in self.parts:
    if entry.get('thread'):
        # start the update thread
        entry.get('thread').start()
# wait until the parts warm up.
logger.info('Starting vehicle at {} Hz'.format(rate_hz))
loop_start_time = time.time()
loop_count = 0
Main Loop Execution
The core loop structure (vehicle.py:151-172):
while self.on:
    start_time = time.time()
    loop_count += 1
    self.update_parts()  # Execute all parts
    # stop drive loop if loop_count exceeds max_loopcount
    if max_loop_count and loop_count >= max_loop_count:
        self.on = False
    else:
        # Calculate sleep time to maintain rate_hz
        sleep_time = 1.0 / rate_hz - (time.time() - start_time)
        if sleep_time > 0.0:
            time.sleep(sleep_time)
        else:
            # print a message when could not maintain loop rate.
            if verbose:
                logger.info('WARN::Vehicle: jitter violation in vehicle loop '
                            'with {0:4.0f}ms'.format(abs(1000 * sleep_time)))
        if verbose and loop_count % 200 == 0:
            self.profiler.report()
The loop automatically sleeps between iterations to maintain the target rate_hz. If processing takes too long, you’ll see “jitter violation” warnings when running in verbose mode.
Update Parts Cycle
The update_parts() method executes all registered parts (vehicle.py:186-215):
def update_parts(self):
    '''
    loop over all parts
    '''
    for entry in self.parts:
        run = True
        # check run condition, if it exists
        if entry.get('run_condition'):
            run_condition = entry.get('run_condition')
            run = self.mem.get([run_condition])[0]
        if run:
            # get part
            p = entry['part']
            # start timing part run
            self.profiler.on_part_start(p)
            # get inputs from memory
            inputs = self.mem.get(entry['inputs'])
            # run the part
            if entry.get('thread'):
                outputs = p.run_threaded(*inputs)
            else:
                outputs = p.run(*inputs)
            # save the output to memory
            if outputs is not None:
                self.mem.put(entry['outputs'], outputs)
            # finish timing part run
            self.profiler.on_part_finished(p)
Execution Steps
For each part, the vehicle performs these steps:
- Check run condition: if a run_condition is specified, look it up in Memory and skip the part unless the value is true
- Get inputs: retrieve all input values from Memory using mem.get()
- Execute part: call either run() or run_threaded() with the inputs
- Store outputs: save return values to Memory using mem.put(), unless the part returned None
- Profile: track execution time for performance analysis
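The steps above can be sketched for a single part as follows. `TinyMemory` is a hypothetical dict-backed stand-in for Donkeycar's Memory, written on the assumption that one output key stores the return value as-is and several keys unpack a tuple. `run_entry` and `Splitter` are likewise invented for illustration and are not part of the library.

```python
import time


class TinyMemory:
    """Hypothetical dict-backed stand-in for Donkeycar's Memory."""

    def __init__(self):
        self.d = {}

    def get(self, keys):
        return [self.d.get(k) for k in keys]

    def put(self, keys, values):
        # One key stores the value as-is; several keys unpack a tuple.
        if len(keys) == 1:
            self.d[keys[0]] = values
        else:
            for k, v in zip(keys, values):
                self.d[k] = v


def run_entry(entry, mem, timings):
    """Apply the five steps to one registered part."""
    condition = entry.get('run_condition')
    if condition and not mem.get([condition])[0]:   # 1. check run condition
        return False
    started = time.time()
    inputs = mem.get(entry['inputs'])                # 2. get inputs
    outputs = entry['part'].run(*inputs)             # 3. execute part
    if outputs is not None:
        mem.put(entry['outputs'], outputs)           # 4. store outputs
    timings.append(time.time() - started)            # 5. profile
    return True


class Splitter:
    def run(self, pair):
        return pair[0], pair[1]


mem = TinyMemory()
mem.put(['pair'], (0.25, 0.5))
timings = []
entry = {'part': Splitter(), 'inputs': ['pair'], 'outputs': ['angle', 'throttle']}
ran = run_entry(entry, mem, timings)
print(ran, mem.get(['angle', 'throttle']))  # prints: True [0.25, 0.5]
```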
Rate Control
The rate_hz parameter controls loop frequency:
V.start(rate_hz=20) # Run at 20 Hz (50ms per iteration)
How It Works
sleep_time = 1.0 / rate_hz - (time.time() - start_time)
if sleep_time > 0.0:
    time.sleep(sleep_time)
- Calculates how long the current iteration took
- Sleeps for the remaining time to hit target rate
- If processing takes longer than 1/rate_hz, no sleep (jitter violation)
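As a worked example of this arithmetic, the sketch below isolates the sleep calculation in a small function. The name `sleep_time_for` is hypothetical and the elapsed times are made-up inputs; only the formula comes from the loop shown earlier.

```python
def sleep_time_for(rate_hz, elapsed_s):
    """Seconds left in the iteration budget; negative means an overrun."""
    return 1.0 / rate_hz - elapsed_s


on_time = sleep_time_for(20, 0.030)   # 30 ms of work in a 50 ms budget
overrun = sleep_time_for(20, 0.062)   # 62 ms of work in a 50 ms budget

# The loop sleeps for the first value and reports abs() of the second.
print(round(on_time * 1000), round(abs(overrun) * 1000))  # prints: 20 12
```

Because the loop never sleeps a negative amount, an overrun is not made up later: the next iteration simply starts late.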
A typical Donkeycar runs at 20 Hz, giving each iteration 50ms. If your parts take longer than this, consider:
- Making slow parts threaded
- Reducing camera resolution
- Optimizing model inference
- Lowering the rate_hz
Threading Model
Synchronous Parts
Default behavior - part runs in main loop:
V.add(part, inputs=['input'], outputs=['output'], threaded=False)
The Vehicle calls:
outputs = part.run(*inputs)
Threaded Parts
For I/O-bound operations:
V.add(camera, inputs=[], outputs=['cam/image_array'], threaded=True)
Thread Creation
From vehicle.py:102-105:
if threaded:
    t = Thread(target=part.update, args=())
    t.daemon = True
    entry['thread'] = t
Thread Execution
The part must implement update() which runs continuously:
class Camera:
    def update(self):
        # keep looping infinitely until the thread is stopped
        while self.on:
            self.run()  # Capture new frame

    def run_threaded(self):
        return self.frame  # Return latest frame
update() runs in a background thread, continuously capturing frames.
run_threaded() is called by the main loop to get the latest result.
Example from camera.py:71-74:
def update(self):
    # keep looping infinitely until the thread is stopped
    while self.on:
        self.run()
Threaded parts should store their latest result in an instance variable (like self.frame). The main loop just reads this cached value, so slow I/O doesn’t block the drive loop.
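The caching pattern can be tried without any hardware. In the sketch below, `CounterPart` is a hypothetical part in which an incrementing counter stands in for a slow sensor, and the thread is started by hand rather than by Vehicle.

```python
import threading
import time


class CounterPart:
    """Hypothetical threaded part: a counter stands in for a slow sensor."""

    def __init__(self):
        self.on = True
        self.latest = 0

    def update(self):
        # Runs in the background thread until shutdown() clears the flag.
        while self.on:
            self.latest += 1
            time.sleep(0.001)

    def run_threaded(self):
        # Called from the main loop; returns the cached value immediately.
        return self.latest

    def shutdown(self):
        self.on = False


part = CounterPart()
thread = threading.Thread(target=part.update, daemon=True)
thread.start()

time.sleep(0.05)                 # main loop is busy elsewhere
first = part.run_threaded()
time.sleep(0.05)
second = part.run_threaded()

part.shutdown()
thread.join(timeout=1.0)
print(first > 0, second >= first)
```

A single attribute assignment is enough here because the main loop only ever reads the latest value. A part that publishes several related values at once may need a lock to keep them consistent.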
Run Conditions
Parts can execute conditionally based on Memory values:
V.add(autopilot_model,
      inputs=['cam/image_array'],
      outputs=['pilot/angle', 'pilot/throttle'],
      run_condition='run_pilot')
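The part executes only when the value stored under 'run_pilot' in Memory is truthy. update_parts() tests it with a plain `if run:`, so a missing key (None) also skips the part.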
Example from complete.py
# This part sets run_pilot based on user mode
class PilotCondition:
    def run(self, mode):
        return mode != 'user'

V.add(PilotCondition(),
      inputs=['user/mode'],
      outputs=['run_pilot'])

# Model only runs when run_pilot is True
V.add(kl,
      inputs=['cam/image_array'],
      outputs=['pilot/angle', 'pilot/throttle'],
      run_condition='run_pilot')
This pattern enables:
- User vs autopilot mode switching
- Conditional recording
- Button-triggered actions
- Feature toggles
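One consequence of update_parts() is that a skipped part writes nothing, so its output channels keep whatever value was stored last. The sketch below illustrates this with a plain dict in place of Memory. The `update` helper and `FakePilot` are hypothetical, and 'local' is used only as an example of a mode other than 'user'.

```python
def update(parts, memory):
    """Hypothetical single pass that honours an optional run_condition."""
    for entry in parts:
        condition = entry.get('run_condition')
        if condition and not memory.get(condition):
            continue  # skipped: nothing is written for this part
        args = [memory.get(name) for name in entry['inputs']]
        memory[entry['outputs'][0]] = entry['part'].run(*args)


class PilotCondition:
    def run(self, mode):
        return mode != 'user'


class FakePilot:
    """Stand-in for a model; returns a fixed steering angle."""

    def run(self, image):
        return 0.3


parts = [
    {'part': PilotCondition(), 'inputs': ['user/mode'], 'outputs': ['run_pilot']},
    {'part': FakePilot(), 'inputs': ['cam/image_array'],
     'outputs': ['pilot/angle'], 'run_condition': 'run_pilot'},
]

memory = {'user/mode': 'user', 'cam/image_array': None}
update(parts, memory)
skipped = 'pilot/angle' not in memory    # pilot never ran in user mode

memory['user/mode'] = 'local'            # any mode other than 'user'
update(parts, memory)
fresh = memory['pilot/angle']

memory['user/mode'] = 'user'
update(parts, memory)
stale = memory['pilot/angle']            # old value is still in memory

print(skipped, fresh, stale)  # prints: True 0.3 0.3
```

This is why a downstream part that consumes conditional outputs should also look at the mode, rather than assume the values are current.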
The PartProfiler tracks execution time for each part:
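class PartProfiler:
    def __init__(self):
        # maps each part to its list of recorded run times
        self.records = {}

    def profile_part(self, p):
        self.records[p] = { "times" : [] }

    def on_part_start(self, p):
        # store the start timestamp as a placeholder
        self.records[p]['times'].append(time.time())

    def on_part_finished(self, p):
        # replace the start timestamp with the elapsed time in seconds
        now = time.time()
        prev = self.records[p]['times'][-1]
        delta = now - prev
        self.records[p]['times'][-1] = delta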
Profiler Report
When the vehicle stops, it prints a summary:
Part Profile Summary: (times in ms)
+-----------------+------+------+------+-------+-------+-------+--------+
| part            | max  | min  | avg  | 50%   | 90%   | 99%   | 99.9%  |
+-----------------+------+------+------+-------+-------+-------+--------+
| PiCamera        | 45.2 | 32.1 | 35.4 | 34.5  | 38.2  | 42.1  | 44.8   |
| KerasPilot      | 68.3 | 45.6 | 52.1 | 51.2  | 58.9  | 65.4  | 67.9   |
| PWMSteering     | 2.3  | 0.8  | 1.2  | 1.1   | 1.5   | 2.0   | 2.2    |
+-----------------+------+------+------+-------+-------+-------+--------+
From vehicle.py:39-58:
def report(self):
    logger.info("Part Profile Summary: (times in ms)")
    pt = PrettyTable()
    field_names = ["part", "max", "min", "avg"]
    pctile = [50, 90, 99, 99.9]
    pt.field_names = field_names + [str(p) + '%' for p in pctile]
    for p, val in self.records.items():
        arr = val['times'][1:-1]
        if len(arr) == 0:
            continue
        row = [p.__class__.__name__,
               "%.2f" % (max(arr) * 1000),
               "%.2f" % (min(arr) * 1000),
               "%.2f" % (sum(arr) / len(arr) * 1000)]
        row += ["%.2f" % (np.percentile(arr, p) * 1000) for p in pctile]
        pt.add_row(row)
    logger.info('\n' + str(pt))
Run with verbose=True to see profiling data every 200 iterations during execution, helping you identify bottlenecks in real-time.
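The bookkeeping behind these numbers can be reproduced in a few lines. `MiniProfiler` below is a hypothetical reduction of the profiler: it keeps the same start/finish overwrite and the same `[1:-1]` slice as report(), but returns a dict instead of a table and leaves out the percentiles.

```python
import time


class MiniProfiler:
    """Hypothetical reduced profiler: same bookkeeping, no table."""

    def __init__(self):
        self.records = {}

    def on_part_start(self, part):
        self.records.setdefault(part, []).append(time.time())

    def on_part_finished(self, part):
        # Replace the stored start timestamp with the elapsed duration.
        self.records[part][-1] = time.time() - self.records[part][-1]

    def summary_ms(self, part):
        times = self.records[part][1:-1]   # drop first and last samples
        if not times:
            return None
        return {
            'max': max(times) * 1000,
            'min': min(times) * 1000,
            'avg': sum(times) / len(times) * 1000,
        }


class SlowPart:
    def run(self):
        time.sleep(0.002)


profiler = MiniProfiler()
part = SlowPart()
for _ in range(5):
    profiler.on_part_start(part)
    part.run()
    profiler.on_part_finished(part)

stats = profiler.summary_ms(part)
print(stats['min'] <= stats['avg'] <= stats['max'])  # prints: True
```

Because of the slice, a part needs at least three recorded runs before it shows up in a summary.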
Shutdown Sequence
When the vehicle stops (vehicle.py:217-228):
def stop(self):
    logger.info('Shutting down vehicle and its parts...')
    for entry in self.parts:
        try:
            entry['part'].shutdown()
        except AttributeError:
            # usually from missing shutdown method, which should be optional
            pass
        except Exception as e:
            logger.error(e)
    self.profiler.report()
- Calls shutdown() on all parts (if they have it)
- Prints final profiling report
- Gracefully handles missing shutdown methods
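The optional-shutdown pattern can be exercised in isolation. In this sketch `shutdown_all` is a hypothetical helper that mirrors the try/except structure of stop(), and the three part classes are invented to cover the three outcomes.

```python
import logging

logger = logging.getLogger(__name__)


def shutdown_all(parts):
    """Hypothetical helper mirroring the try/except pattern in stop()."""
    stopped = []
    for part in parts:
        try:
            part.shutdown()
            stopped.append(type(part).__name__)
        except AttributeError:
            pass  # part has no shutdown(); treated as optional
        except Exception as exc:
            logger.error(exc)  # a failing part must not block the others
    return stopped


class Motor:
    def shutdown(self):
        self.stopped = True


class Stateless:
    def run(self):
        return None


class Faulty:
    def shutdown(self):
        raise RuntimeError('device busy')


stopped = shutdown_all([Motor(), Stateless(), Faulty()])
print(stopped)  # prints: ['Motor']
```

One side effect of catching AttributeError this broadly is that an AttributeError raised inside a part's own shutdown() body is swallowed as well, so cleanup code is worth testing separately.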
Example: Complete Drive Loop
Here’s a simplified example showing the entire lifecycle:
import donkeycar as dk
from donkeycar.parts.camera import PiCamera
from donkeycar.parts.actuator import PCA9685, PWMSteering, PWMThrottle

# Create vehicle
V = dk.vehicle.Vehicle()

# Add camera (threaded)
cam = PiCamera(image_w=160, image_h=120)
V.add(cam, outputs=['cam/image_array'], threaded=True)

# Add controller
from donkeycar.parts.controller import LocalWebController
ctr = LocalWebController()
V.add(ctr,
      inputs=['cam/image_array'],
      outputs=['user/angle', 'user/throttle', 'user/mode'],
      threaded=True)

# Add pilot condition
class PilotCondition:
    def run(self, mode):
        return mode != 'user'

V.add(PilotCondition(),
      inputs=['user/mode'],
      outputs=['run_pilot'])

# Add model (conditional)
model = load_model('my_model.h5')
V.add(model,
      inputs=['cam/image_array'],
      outputs=['pilot/angle', 'pilot/throttle'],
      run_condition='run_pilot')

# Choose mode
class DriveMode:
    def run(self, mode, user_angle, user_throttle, pilot_angle, pilot_throttle):
        if mode == 'user':
            return user_angle, user_throttle
        else:
            return pilot_angle, pilot_throttle

V.add(DriveMode(),
      inputs=['user/mode', 'user/angle', 'user/throttle',
              'pilot/angle', 'pilot/throttle'],
      outputs=['angle', 'throttle'])

# Add actuators
steering = PWMSteering(...)
throttle = PWMThrottle(...)
V.add(steering, inputs=['angle'])
V.add(throttle, inputs=['throttle'])

# Start the loop at 20 Hz
V.start(rate_hz=20, verbose=True)
What Happens:
- Initialization: Vehicle creates Memory, registers 7 parts
- Startup: Starts camera and controller threads
- Loop (every 50ms at 20Hz):
  - Camera updates in background, run_threaded() returns latest frame → cam/image_array
  - Controller polls for input → user/angle, user/throttle, user/mode
  - PilotCondition checks mode → run_pilot
  - Model runs (if run_pilot=True) → pilot/angle, pilot/throttle
  - DriveMode selects user or pilot outputs → angle, throttle
  - Steering and throttle actuators execute
- Sleep: Calculate remaining time to hit 50ms, sleep
- Repeat until interrupted
- Shutdown: Call shutdown on all parts, print profiling report
Timing Considerations
Target Rate
Common rate_hz values:
- 20 Hz (50ms): Standard for most cars
- 10 Hz (100ms): For slower hardware or heavy processing
- 30 Hz (33ms): For faster response times
Jitter Violations
If processing takes longer than 1/rate_hz:
WARN::Vehicle: jitter violation in vehicle loop with 12ms
This means the loop overran by 12ms. Causes:
- Blocking parts taking too long
- Camera capture delays
- Model inference too slow
- Too many synchronous parts
Solutions
- Make parts threaded: Move slow I/O to background
- Optimize processing: Reduce image resolution, use faster models
- Lower rate_hz: If 20Hz is too fast, try 15Hz or 10Hz
- Profile: Use verbose mode to identify bottlenecks
Best Practices
- Thread I/O operations: Cameras, network, sensors should be threaded
- Keep processing light: Main loop should be fast (< 50ms total)
- Order parts correctly: Later parts can use earlier outputs in same iteration
- Use run conditions: Avoid unnecessary work with conditional execution
- Profile regularly: Check execution times to catch performance regressions
- Handle shutdown: Implement cleanup in the shutdown() method
Next Steps