The Memory class is the communication backbone of Donkeycar. It provides a shared key-value store that enables parts to exchange data without directly coupling to each other.
What is Memory?
Memory is a simple dictionary-based storage system where:
- Parts write outputs to named channels (keys)
- Other parts read from those same channels (inputs)
- The Vehicle orchestrates reading and writing
- Data persists across loop iterations until a part overwrites it
The Memory Class
Defined in donkeycar/memory.py:9-61:
class Memory:
    """
    A convenience class to save key/value pairs.
    """
    def __init__(self, *args, **kw):
        self.d = {}

    def __setitem__(self, key, value):
        if type(key) is str:
            self.d[key] = value
        else:
            if type(key) is not tuple:
                key = tuple(key)
                value = tuple(key)
            for i, k in enumerate(key):
                self.d[k] = value[i]

    def __getitem__(self, key):
        if type(key) is tuple:
            return [self.d[k] for k in key]
        else:
            return self.d[key]

    def update(self, new_d):
        self.d.update(new_d)

    def put(self, keys, inputs):
        if len(keys) > 1:
            for i, key in enumerate(keys):
                try:
                    self.d[key] = inputs[i]
                except IndexError as e:
                    error = str(e) + ' issue with keys: ' + str(key)
                    raise IndexError(error)
        else:
            self.d[keys[0]] = inputs

    def get(self, keys):
        result = [self.d.get(k) for k in keys]
        return result

    def keys(self):
        return self.d.keys()

    def values(self):
        return self.d.values()

    def items(self):
        return self.d.items()
Memory is essentially a Python dictionary with helper methods for batch operations. The Vehicle class uses put() and get() to manage data flow.
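As a rough illustration of those batch helpers, the sketch below uses a stripped-down stand-in so that it runs without Donkeycar installed. `MiniMemory` is a hypothetical name, not part of the library; it copies only `put()` and `get()` from the listing above.

```python
class MiniMemory:
    """Stand-in for Memory: only put() and get(), as in the listing above."""

    def __init__(self):
        self.d = {}

    def put(self, keys, inputs):
        if len(keys) > 1:
            # Several keys: inputs is indexed, one value per key
            for i, key in enumerate(keys):
                self.d[key] = inputs[i]
        else:
            # One key: the value is stored as-is, even if it is a tuple
            self.d[keys[0]] = inputs

    def get(self, keys):
        # Missing keys yield None instead of raising KeyError
        return [self.d.get(k) for k in keys]


mem = MiniMemory()
mem.put(['pilot/angle', 'pilot/throttle'], (0.1, 0.4))
mem.put(['cam/image_array'], 'frame-0')

both = mem.get(['pilot/angle', 'pilot/throttle'])
missing = mem.get(['user/mode'])  # never written
print(both, missing)
```

`get()` always returns a list, even for a single key, which is what lets the Vehicle unpack the result into a part's `run()` arguments.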
How Data Flows
Writing to Memory
Parts return values that the Vehicle stores in Memory:
class Camera:
    def run(self):
        image = self.capture()
        return image  # Vehicle stores this

V.add(Camera(),
      inputs=[],
      outputs=['cam/image_array'])  # Stored as Memory['cam/image_array']
The Vehicle calls Memory.put() (memory.py:35-45):
def put(self, keys, inputs):
    if len(keys) > 1:
        # Multiple outputs
        for i, key in enumerate(keys):
            self.d[key] = inputs[i]
    else:
        # Single output
        self.d[keys[0]] = inputs
Reading from Memory
The Vehicle retrieves inputs before calling a part:
class Model:
    def run(self, image):  # Vehicle passes Memory['cam/image_array']
        angle, throttle = self.predict(image)
        return angle, throttle

V.add(Model(),
      inputs=['cam/image_array'],  # Vehicle reads this from Memory
      outputs=['pilot/angle', 'pilot/throttle'])
The Vehicle calls Memory.get() (memory.py:49-51):
def get(self, keys):
    result = [self.d.get(k) for k in keys]
    return result  # Returns list of values
Vehicle Orchestration
From vehicle.py:186-215, the Vehicle manages the flow:
def update_parts(self):
    for entry in self.parts:
        # 'run' reflects the part's run_condition, if it has one
        # (the check itself is omitted from this excerpt)
        if run:
            p = entry['part']
            # 1. Get inputs from Memory
            inputs = self.mem.get(entry['inputs'])
            # 2. Run the part
            if entry.get('thread'):
                outputs = p.run_threaded(*inputs)
            else:
                outputs = p.run(*inputs)
            # 3. Put outputs into Memory
            if outputs is not None:
                self.mem.put(entry['outputs'], outputs)
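The loop above can be approximated in a few lines. The following is a simplified sketch, not the actual `Vehicle` implementation: `MiniVehicle`, `Counter`, and `Doubler` are hypothetical names, Memory is replaced by a plain dict, threading is left out, and the way `run_condition` is looked up is an assumption.

```python
class MiniVehicle:
    """Toy get -> run -> put loop over a plain dict (assumed, simplified)."""

    def __init__(self):
        self.mem = {}
        self.parts = []

    def add(self, part, inputs=(), outputs=(), run_condition=None):
        self.parts.append({'part': part, 'inputs': list(inputs),
                           'outputs': list(outputs),
                           'run_condition': run_condition})

    def update_parts(self):
        for entry in self.parts:
            cond = entry['run_condition']
            if cond is not None and not self.mem.get(cond):
                continue  # condition channel is falsy or missing: skip the part
            # 1. Get inputs (None for channels not written yet)
            inputs = [self.mem.get(k) for k in entry['inputs']]
            # 2. Run the part
            outputs = entry['part'].run(*inputs)
            # 3. Store outputs, unless the part returned nothing
            if outputs is None:
                continue
            keys = entry['outputs']
            if len(keys) > 1:
                for key, value in zip(keys, outputs):
                    self.mem[key] = value
            elif len(keys) == 1:
                self.mem[keys[0]] = outputs


class Counter:
    def __init__(self):
        self.n = 0

    def run(self):
        self.n += 1
        return self.n


class Doubler:
    def run(self, n):
        return n * 2


v = MiniVehicle()
v.add(Counter(), outputs=['count'])
v.add(Doubler(), inputs=['count'], outputs=['count/doubled'])
v.update_parts()
v.update_parts()
print(v.mem)
```

Because parts run in the order they were added, `Doubler` sees the value `Counter` wrote earlier in the same iteration.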
Channel Naming
Channels use a hierarchical naming convention:
Sensor Channels
'cam/image_array' # Camera image
'cam/depth_array' # Depth camera
'imu/acl_x' # IMU accelerometer X
'imu/gyr_z' # IMU gyroscope Z
'lidar/dist_array' # LIDAR distances
'enc/speed' # Encoder speed
'gps/position' # GPS position
User Input Channels
'user/angle' # User steering input
'user/throttle' # User throttle input
'user/mode' # Driving mode ('user', 'local', 'local_angle')
'user/steering' # Alternative to user/angle
'web/buttons' # Web UI button states
Autopilot Channels
'pilot/angle' # Autopilot steering
'pilot/throttle' # Autopilot throttle
'pilot/steering' # Alternative to pilot/angle
'pilot/loc' # Localization output
Control Channels
'steering' # Final steering command
'throttle' # Final throttle command
'brake' # Brake command (simulator)
'angle' # Alternative to steering
State Channels
'run_pilot' # Boolean: autopilot active?
'run_user' # Boolean: manual mode active?
'recording' # Boolean: recording data?
'ai_running' # Boolean: AI model active?
Recording Channels
'tub/num_records' # Number of recorded frames
'records/alert' # Recording alert state
Use the pattern category/name for channel names. This makes it easy to understand where data comes from and prevents naming conflicts.
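One practical benefit of the category/name pattern is that channels can be grouped by prefix, for example when printing Memory for debugging. The helper below is a hypothetical sketch, not part of Donkeycar; it places names without a slash, such as 'throttle', under an empty category.

```python
from collections import defaultdict


def group_channels(channels):
    """Group channel names by the text before the first '/'."""
    groups = defaultdict(list)
    for name in channels:
        category, sep, _ = name.partition('/')
        # Names with no '/' (e.g. 'throttle') go under the '' category
        groups[category if sep else ''].append(name)
    return dict(groups)


channels = ['cam/image_array', 'user/angle', 'user/throttle',
            'pilot/angle', 'throttle']
groups = group_channels(channels)
print(groups)
```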
Data Flow Example
A complete data flow through Memory:
import donkeycar as dk

V = dk.vehicle.Vehicle()

# Part 1: Camera produces image
class Camera:
    def run(self):
        return self.capture()

V.add(Camera(),
      inputs=[],
      outputs=['cam/image_array'])
# Memory: {'cam/image_array': <image>}

# Part 2: Controller produces user inputs
class Controller:
    def run(self, image):  # Gets Memory['cam/image_array']
        return 0.5, 0.3, 'user'  # angle, throttle, mode

V.add(Controller(),
      inputs=['cam/image_array'],
      outputs=['user/angle', 'user/throttle', 'user/mode'])
# Memory: {'cam/image_array': <image>,
#          'user/angle': 0.5,
#          'user/throttle': 0.3,
#          'user/mode': 'user'}

# Part 3: Pilot condition checks mode
class PilotCondition:
    def run(self, mode):  # Gets Memory['user/mode']
        return mode != 'user'

V.add(PilotCondition(),
      inputs=['user/mode'],
      outputs=['run_pilot'])
# Memory: {..., 'run_pilot': False}

# Part 4: Model runs conditionally
class Model:
    def run(self, image):  # Only called if run_pilot==True
        return 0.3, 0.4

V.add(Model(),
      inputs=['cam/image_array'],
      outputs=['pilot/angle', 'pilot/throttle'],
      run_condition='run_pilot')  # Skipped because run_pilot==False

# Part 5: DriveMode selects inputs
class DriveMode:
    def run(self, mode, user_angle, user_throttle, pilot_angle, pilot_throttle):
        if mode == 'user':
            return user_angle, user_throttle
        else:
            return pilot_angle, pilot_throttle

V.add(DriveMode(),
      inputs=['user/mode', 'user/angle', 'user/throttle',
              'pilot/angle', 'pilot/throttle'],
      outputs=['angle', 'throttle'])
# Memory: {..., 'angle': 0.5, 'throttle': 0.3}

# Part 6: Actuators read final commands
class Steering:
    def run(self, angle):  # Gets Memory['angle']
        self.set_pwm(angle)

V.add(Steering(), inputs=['angle'], outputs=[])

class Throttle:
    def run(self, throttle):  # Gets Memory['throttle']
        self.set_pwm(throttle)

V.add(Throttle(), inputs=['throttle'], outputs=[])
Memory State After One Iteration
{
    'cam/image_array': <numpy array>,
    'user/angle': 0.5,
    'user/throttle': 0.3,
    'user/mode': 'user',
    'run_pilot': False,
    # 'pilot/angle' and 'pilot/throttle' are absent: Model never ran, so the
    # keys were never written. Memory.get() returns None for both.
    'angle': 0.5,
    'throttle': 0.3
}
Memory values persist across iterations. If a part doesn’t run (e.g., due to run_condition), its outputs from previous iterations remain in Memory.
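A consequence of this persistence is that a reader can pick up a stale value. The sketch below illustrates the effect with a plain dict standing in for Memory; `step`, `model_run`, and `fresh_pilot_angle` are hypothetical helpers, and gating on `run_pilot` is just one possible way for a consumer to guard against stale data.

```python
mem = {}


def model_run():
    # Stand-in for an autopilot: returns (angle, throttle)
    return 0.5, 0.2


def step(run_pilot):
    """One loop iteration: the model only writes when run_pilot is true."""
    mem['run_pilot'] = run_pilot
    if mem['run_pilot']:
        angle, throttle = model_run()
        mem['pilot/angle'] = angle
        mem['pilot/throttle'] = throttle


def fresh_pilot_angle(memory):
    """Trust pilot/angle only while the pilot is actually running."""
    return memory.get('pilot/angle') if memory.get('run_pilot') else None


step(run_pilot=True)
first = dict(mem)
step(run_pilot=False)   # model skipped, old outputs stay in place
second = dict(mem)

print(second['pilot/angle'], fresh_pilot_angle(second))
```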
Multiple Outputs
Parts can return multiple values:
class MultiOutput:
    def run(self, x):
        a = x * 2
        b = x + 1
        c = x ** 2
        return a, b, c  # Return tuple

V.add(MultiOutput(),
      inputs=['x'],
      outputs=['y', 'z', 'w'])  # Unpacks tuple
# Memory: {'y': a, 'z': b, 'w': c}
From memory.py:35-42:
def put(self, keys, inputs):
    if len(keys) > 1:
        for i, key in enumerate(keys):
            try:
                self.d[key] = inputs[i]  # Unpack tuple
            except IndexError as e:
                error = str(e) + ' issue with keys: ' + str(key)
                raise IndexError(error)
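The `IndexError` branch is hit when a part returns fewer values than it declares outputs. The sketch below reproduces that case with a standalone `put` function that applies the same logic to a plain dict; the function is a stand-in written for this example, not the library method itself.

```python
def put(d, keys, inputs):
    """Same logic as Memory.put(), applied to a plain dict."""
    if len(keys) > 1:
        for i, key in enumerate(keys):
            try:
                d[key] = inputs[i]
            except IndexError as e:
                raise IndexError(str(e) + ' issue with keys: ' + str(key))
    else:
        d[keys[0]] = inputs


d = {}
try:
    put(d, ['y', 'z', 'w'], (2, 3))  # three keys, only two values
    message = None
except IndexError as e:
    message = str(e)

print(message)
print(d)  # the first two keys were written before the error
```

The error message names the first key that had no matching value. The write is not atomic: keys processed before the failure keep their new values. A part that returns a single non-indexable value (such as a float) for several outputs fails with a `TypeError` instead, which this branch does not catch.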
No Outputs
Parts that perform actions but don’t return data:
class MotorControl:
    def run(self, steering, throttle):
        self.steering_servo.set(steering)
        self.throttle_esc.set(throttle)
        # No return value

V.add(MotorControl(),
      inputs=['steering', 'throttle'],
      outputs=[])  # Empty list
From vehicle.py:212-213:
if outputs is not None:
    self.mem.put(entry['outputs'], outputs)
If outputs is None, nothing is stored.
Accessing Memory Directly
While parts normally don’t access Memory directly, sometimes it’s useful:
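import donkeycar as dk

V = dk.vehicle.Vehicle()
# Direct access to the Memory object
V.mem['custom/value'] = 42             # Set value
print(V.mem.keys())                    # All channel names written so far
print(V.mem['custom/value'])           # Get value; raises KeyError if the key was never written
print(V.mem.get(['cam/image_array']))  # get() returns [None] for a missing key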
Use Cases
- Debugging: Inspect Memory state
- Initialization: Set initial values
- Special parts: Parts that modify Memory directly
Example from complete.py:143:
V.add(ExplodeDict(V.mem, "web/"), inputs=['web/buttons'])
This part takes web/buttons (a dict) and expands it into separate Memory keys.
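To make the idea concrete, here is a minimal sketch of a part that behaves as described. `ExplodeDictSketch` is a hypothetical stand-in written for this example and should not be read as the library's actual `ExplodeDict` code; a plain dict plays the role of Memory, and the button names are invented.

```python
class ExplodeDictSketch:
    """Writes each entry of an input dict to memory under a key prefix."""

    def __init__(self, memory, output_prefix=""):
        self.memory = memory          # anything supporting memory[key] = value
        self.prefix = output_prefix

    def run(self, key_values):
        # Ignore None (nothing written yet) and any non-dict input
        if isinstance(key_values, dict):
            for key, value in key_values.items():
                if isinstance(key, str):
                    self.memory[self.prefix + key] = value
        return None  # nothing goes through the normal outputs path


mem = {}
part = ExplodeDictSketch(mem, "web/")
part.run({'w1': True, 'w2': False})  # invented button names
part.run(None)                       # safe no-op
print(mem)
```

The part needs a reference to Memory because the set of keys it writes is only known at run time, so it cannot be declared in `outputs`.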
Memory Lifecycle
1. Initialization
V = dk.vehicle.Vehicle() # Creates empty Memory
# V.mem.d = {}
2. First Iteration
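On the first iteration, any input whose channel has not been written yet is None, because Memory.get() returns None for missing keys. Parts run in the order they were added, so this affects inputs produced by a later part or by a part that was skipped: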
class Part:
    def run(self, input_value):
        if input_value is None:
            return default_value  # Handle None gracefully (default_value is a placeholder)
        return process(input_value)  # process() is a placeholder for the real work
3. Subsequent Iterations
Memory contains values from previous parts:
# Iteration 1:
Camera outputs → Memory['cam/image_array'] = <image1>
# Iteration 2:
Camera outputs → Memory['cam/image_array'] = <image2> # Overwrites
Model reads Memory['cam/image_array'] # Gets <image2>
4. Run Conditions
If a part doesn’t run, its outputs aren’t updated:
# Iteration 1: run_pilot = True
Model runs → Memory['pilot/angle'] = 0.5
# Iteration 2: run_pilot = False
Model skipped → Memory['pilot/angle'] = 0.5 # Still 0.5 from iteration 1
Always check for None values in your parts, especially on the first loop iteration or when using run conditions.
Common Patterns
Pattern 1: Pipeline
Data flows sequentially through parts:
V.add(Part1(), outputs=['a'])
V.add(Part2(), inputs=['a'], outputs=['b'])
V.add(Part3(), inputs=['b'], outputs=['c'])
V.add(Part4(), inputs=['c'])
# a → b → c
Pattern 2: Fan-Out
One part’s output feeds multiple parts:
V.add(Camera(), outputs=['cam/image_array'])
V.add(Model(), inputs=['cam/image_array'], outputs=['pilot/angle'])
V.add(ObjectDetector(), inputs=['cam/image_array'], outputs=['detection'])
V.add(Recorder(), inputs=['cam/image_array'])
# cam/image_array → {Model, ObjectDetector, Recorder}
Pattern 3: Merge
Multiple inputs combined:
V.add(Camera(), outputs=['cam/image_array'])
V.add(IMU(), outputs=['imu/acl_x', 'imu/acl_y'])
V.add(Model(),
      inputs=['cam/image_array', 'imu/acl_x', 'imu/acl_y'],
      outputs=['pilot/angle'])
# {cam/image_array, imu/acl_x, imu/acl_y} → Model
Pattern 4: Conditional Branch
V.add(Condition(), inputs=['mode'], outputs=['run_a', 'run_b'])
V.add(PartA(), outputs=['result'], run_condition='run_a')
V.add(PartB(), outputs=['result'], run_condition='run_b')
# Both write to 'result', but only one runs
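`Condition`, `PartA`, and `PartB` are not defined in this section. One way the condition part might look is sketched below, assuming `mode` is a string and that the two flags should be mutually exclusive.

```python
class Condition:
    """Hypothetical condition part: maps a mode to two exclusive run flags."""

    def run(self, mode):
        run_a = (mode == 'a')
        run_b = not run_a  # exactly one branch is enabled per iteration
        return run_a, run_b


flags_a = Condition().run('a')
flags_other = Condition().run(None)  # e.g. 'mode' not written yet
print(flags_a, flags_other)
```

Because exactly one flag is true, only one of the two parts overwrites `result` in a given iteration. When `mode` is missing, this sketch falls back to the second branch.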
Memory and Threading
Threaded parts access Memory through the main loop:
class ThreadedCamera:
    def __init__(self):
        self.frame = None
        self.on = True  # Loop flag checked by update()

    def update(self):
        """Background thread"""
        while self.on:
            self.frame = self.capture()  # Updates self.frame

    def run_threaded(self):
        """Main loop calls this"""
        return self.frame  # Vehicle puts this in Memory

V.add(ThreadedCamera(),
      outputs=['cam/image_array'],
      threaded=True)
Flow:
- Background thread updates self.frame continuously
- Main loop calls run_threaded() to get latest frame
- Vehicle stores return value in Memory
- Other parts read from Memory
Threaded parts should store data in instance variables, not in Memory directly. The Vehicle handles transferring data to Memory.
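The same flow can be run without Donkeycar or a camera. In the sketch below, `ThreadedCounter` is a hypothetical part, the thread is started by hand (in Donkeycar the Vehicle is expected to do this for parts added with `threaded=True`), and a plain dict stands in for Memory. The lock is an extra precaution added here, not something the text above requires.

```python
import threading
import time


class ThreadedCounter:
    """Background thread produces values; run_threaded() returns the latest."""

    def __init__(self):
        self.on = True
        self.value = None
        self._lock = threading.Lock()

    def update(self):
        # Runs in the background thread until shutdown() clears self.on
        n = 0
        while self.on:
            n += 1
            with self._lock:
                self.value = n
            time.sleep(0.001)

    def run_threaded(self):
        # Called from the main loop: never blocks on new data
        with self._lock:
            return self.value

    def shutdown(self):
        self.on = False


part = ThreadedCounter()
worker = threading.Thread(target=part.update, daemon=True)
worker.start()

mem = {}
deadline = time.time() + 2.0
while mem.get('counter/value') is None and time.time() < deadline:
    latest = part.run_threaded()
    if latest is not None:           # mirrors "if outputs is not None"
        mem['counter/value'] = latest
    time.sleep(0.001)

part.shutdown()
worker.join(timeout=1.0)
print(mem)
```

Only the main loop touches `mem`; the background thread writes to the instance attribute and nothing else.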
Debugging Memory
To see what’s in Memory:
class MemoryDebugger:
    def run(self):
        # Placeholder part: it declares no inputs, so it reads nothing from Memory
        return None

V.add(MemoryDebugger(), outputs=[])

# Inspect Memory directly once the drive loop has run:
print("Memory contents:")
for key, value in V.mem.items():
    print(f" {key}: {value}")
Or create a debug part:
class PrintMemory:
    def __init__(self, channels):
        self.channels = channels

    def run(self, *values):
        for channel, value in zip(self.channels, values):
            print(f"{channel}: {value}")

V.add(PrintMemory(['cam/image_array', 'user/angle', 'pilot/angle']),
      inputs=['cam/image_array', 'user/angle', 'pilot/angle'])
Best Practices
- Use descriptive channel names: cam/image_array not img
- Follow naming conventions: category/name pattern
- Handle None values: Check inputs on first iteration
- Don’t access Memory directly in parts: Use inputs/outputs
- Keep Memory clean: Don’t store unnecessary data
- Document channels: Comment what each channel contains
- Avoid name collisions: Use unique, specific names