The Memory class is the communication backbone of Donkeycar. It provides a shared key-value store that enables parts to exchange data without directly coupling to each other.

What is Memory?

Memory is a simple dictionary-based storage system where:
  • Parts write outputs to named channels (keys)
  • Other parts read from those same channels (inputs)
  • The Vehicle orchestrates reading and writing
  • Values stay in Memory until a later write to the same channel overwrites them

The Memory Class

Defined in donkeycar/memory.py:9-61:
class Memory:
    """
    A convenience class to save key/value pairs.
    """
    def __init__(self, *args, **kw):
        self.d = {}
    
    def __setitem__(self, key, value):
        if type(key) is str:
            self.d[key] = value
        else:
            if type(key) is not tuple:
                key = tuple(key)
                value = tuple(key)  # note: copies the keys, not the values, so a list key maps each name to itself
            for i, k in enumerate(key):
                self.d[k] = value[i]
    
    def __getitem__(self, key):
        if type(key) is tuple:
            return [self.d[k] for k in key]
        else:
            return self.d[key]
    
    def update(self, new_d):
        self.d.update(new_d)
    
    def put(self, keys, inputs):
        if len(keys) > 1:
            for i, key in enumerate(keys):
                try:
                    self.d[key] = inputs[i]
                except IndexError as e:
                    error = str(e) + ' issue with keys: ' + str(key)
                    raise IndexError(error)
        else:
            self.d[keys[0]] = inputs
    
    def get(self, keys):
        result = [self.d.get(k) for k in keys]
        return result
    
    def keys(self):
        return self.d.keys()
    
    def values(self):
        return self.d.values()
    
    def items(self):
        return self.d.items()
Memory is essentially a Python dictionary with helper methods for batch operations. The Vehicle class uses put() and get() to manage data flow.
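The batch semantics of put() and get() can be checked in isolation. The sketch below uses MiniMemory, a hypothetical stand-in that copies only those two methods from the listing above so that it runs without Donkeycar installed. It is an illustration, not the library class.

```python
class MiniMemory:
    """Hypothetical stand-in with the same put()/get() logic as the listing above."""

    def __init__(self):
        self.d = {}

    def put(self, keys, inputs):
        if len(keys) > 1:
            for i, key in enumerate(keys):
                self.d[key] = inputs[i]
        else:
            self.d[keys[0]] = inputs

    def get(self, keys):
        return [self.d.get(k) for k in keys]


mem = MiniMemory()
mem.put(['cam/image_array'], 'frame-1')                  # one key: value stored as-is
mem.put(['pilot/angle', 'pilot/throttle'], (0.1, 0.4))   # several keys: value is indexed

known = mem.get(['pilot/angle', 'pilot/throttle'])       # values in key order
missing = mem.get(['user/mode'])                         # unset channel yields None
print(known, missing)
```

Because get() always returns a list, the Vehicle can unpack it straight into a part's run() arguments.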

How Data Flows

Writing to Memory

Parts return values that the Vehicle stores in Memory:
class Camera:
    def run(self):
        image = self.capture()
        return image  # Vehicle stores this

V.add(Camera(), 
      inputs=[],
      outputs=['cam/image_array'])  # Stored as Memory['cam/image_array']
The Vehicle calls Memory.put() (memory.py:35-45):
def put(self, keys, inputs):
    if len(keys) > 1:
        # Multiple outputs
        for i, key in enumerate(keys):
            self.d[key] = inputs[i]
    else:
        # Single output
        self.d[keys[0]] = inputs

Reading from Memory

The Vehicle retrieves inputs before calling a part:
class Model:
    def run(self, image):  # Vehicle passes Memory['cam/image_array']
        angle, throttle = self.predict(image)
        return angle, throttle

V.add(Model(),
      inputs=['cam/image_array'],  # Vehicle reads this from Memory
      outputs=['pilot/angle', 'pilot/throttle'])
The Vehicle calls Memory.get() (memory.py:49-51):
def get(self, keys):
    result = [self.d.get(k) for k in keys]
    return result  # Returns list of values

Vehicle Orchestration

From vehicle.py:186-215, the Vehicle manages the flow:
def update_parts(self):
    for entry in self.parts:
        # 0. Check the run condition, if the part has one
        run = True
        if entry.get('run_condition'):
            run = self.mem.get([entry['run_condition']])[0]

        if run:
            p = entry['part']
            
            # 1. Get inputs from Memory
            inputs = self.mem.get(entry['inputs'])
            
            # 2. Run the part
            if entry.get('thread'):
                outputs = p.run_threaded(*inputs)
            else:
                outputs = p.run(*inputs)
            
            # 3. Put outputs into Memory
            if outputs is not None:
                self.mem.put(entry['outputs'], outputs)
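The same steps can be traced without hardware. This is a minimal sketch, assuming a plain dict in place of Memory and a hypothetical run_once helper that mirrors the loop body above for non-threaded parts only. The real Vehicle also manages threads and loop timing.

```python
def run_once(parts, mem):
    """Run every registered part once against a plain dict (stand-in for Memory)."""
    for entry in parts:
        condition = entry.get('run_condition')
        if condition and not mem.get(condition):
            continue  # skipped parts leave their outputs untouched
        inputs = [mem.get(key) for key in entry['inputs']]
        outputs = entry['part'].run(*inputs)
        if outputs is None:
            continue
        keys = entry['outputs']
        if len(keys) > 1:
            for i, key in enumerate(keys):
                mem[key] = outputs[i]
        else:
            mem[keys[0]] = outputs


class FakeCamera:
    def run(self):
        return 'frame'


class FakePilot:
    def run(self, image):
        return 0.3, 0.4  # angle, throttle


parts = [
    {'part': FakeCamera(), 'inputs': [], 'outputs': ['cam/image_array']},
    {'part': FakePilot(), 'inputs': ['cam/image_array'],
     'outputs': ['pilot/angle', 'pilot/throttle']},
]
mem = {}
run_once(parts, mem)
print(mem)
```

Parts run in registration order, so FakePilot already sees the frame written earlier in the same iteration.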

Channel Naming

Channels use a hierarchical naming convention:

Sensor Channels

'cam/image_array'       # Camera image
'cam/depth_array'       # Depth camera
'imu/acl_x'            # IMU accelerometer X
'imu/gyr_z'            # IMU gyroscope Z
'lidar/dist_array'     # LIDAR distances
'enc/speed'            # Encoder speed
'gps/position'         # GPS position

User Input Channels

'user/angle'           # User steering input
'user/throttle'        # User throttle input
'user/mode'            # Driving mode ('user', 'local', 'local_angle')
'user/steering'        # Alternative to user/angle
'web/buttons'          # Web UI button states

Autopilot Channels

'pilot/angle'          # Autopilot steering
'pilot/throttle'       # Autopilot throttle
'pilot/steering'       # Alternative to pilot/angle
'pilot/loc'            # Localization output

Control Channels

'steering'             # Final steering command
'throttle'             # Final throttle command
'brake'                # Brake command (simulator)
'angle'                # Alternative to steering

State Channels

'run_pilot'            # Boolean: autopilot active?
'run_user'             # Boolean: manual mode active?
'recording'            # Boolean: recording data?
'ai_running'           # Boolean: AI model active?

Recording Channels

'tub/num_records'      # Number of recorded frames
'records/alert'        # Recording alert state
Use the pattern category/name for channel names. This makes it easy to understand where data comes from and prevents naming conflicts.
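One way to make the convention checkable is a small helper that splits a channel into its category and name. split_channel is a hypothetical utility, not part of Donkeycar. It treats un-prefixed channels such as 'throttle' as having no category.

```python
def split_channel(channel):
    """Return (category, name); category is None for un-prefixed channels."""
    category, sep, name = channel.partition('/')
    if not sep:
        return None, channel
    return category, name


channels = ['cam/image_array', 'user/mode', 'pilot/angle', 'throttle']
by_category = {}
for ch in channels:
    category, name = split_channel(ch)
    by_category.setdefault(category, []).append(name)
print(by_category)
```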

Data Flow Example

A complete data flow through Memory:
import donkeycar as dk

V = dk.vehicle.Vehicle()

# Part 1: Camera produces image
class Camera:
    def run(self):
        return self.capture()

V.add(Camera(), 
      inputs=[], 
      outputs=['cam/image_array'])
# Memory: {'cam/image_array': <image>}

# Part 2: Controller produces user inputs
class Controller:
    def run(self, image):  # Gets Memory['cam/image_array']
        return 0.5, 0.3, 'user'  # angle, throttle, mode

V.add(Controller(),
      inputs=['cam/image_array'],
      outputs=['user/angle', 'user/throttle', 'user/mode'])
# Memory: {'cam/image_array': <image>,
#          'user/angle': 0.5,
#          'user/throttle': 0.3,
#          'user/mode': 'user'}

# Part 3: Pilot condition checks mode
class PilotCondition:
    def run(self, mode):  # Gets Memory['user/mode']
        return mode != 'user'

V.add(PilotCondition(),
      inputs=['user/mode'],
      outputs=['run_pilot'])
# Memory: {..., 'run_pilot': False}

# Part 4: Model runs conditionally
class Model:
    def run(self, image):  # Only called if run_pilot==True
        return 0.3, 0.4

V.add(Model(),
      inputs=['cam/image_array'],
      outputs=['pilot/angle', 'pilot/throttle'],
      run_condition='run_pilot')  # Skipped because run_pilot==False

# Part 5: DriveMode selects inputs
class DriveMode:
    def run(self, mode, user_angle, user_throttle, pilot_angle, pilot_throttle):
        if mode == 'user':
            return user_angle, user_throttle
        else:
            return pilot_angle, pilot_throttle

V.add(DriveMode(),
      inputs=['user/mode', 'user/angle', 'user/throttle',
              'pilot/angle', 'pilot/throttle'],
      outputs=['angle', 'throttle'])
# Memory: {..., 'angle': 0.5, 'throttle': 0.3}

# Part 6: Actuators read final commands
class Steering:
    def run(self, angle):  # Gets Memory['angle']
        self.set_pwm(angle)

V.add(Steering(), inputs=['angle'], outputs=[])

class Throttle:
    def run(self, throttle):  # Gets Memory['throttle']
        self.set_pwm(throttle)

V.add(Throttle(), inputs=['throttle'], outputs=[])

Memory State After One Iteration

{
    'cam/image_array': <numpy array>,
    'user/angle': 0.5,
    'user/throttle': 0.3,
    'user/mode': 'user',
    'run_pilot': False,
    # 'pilot/angle' and 'pilot/throttle' are absent: Model was skipped
    # (run_pilot==False), so Memory.get() returned None for both in DriveMode
    'angle': 0.5,
    'throttle': 0.3
}
Memory values persist across iterations. If a part doesn’t run (e.g., due to run_condition), its outputs from previous iterations remain in Memory.

Multiple Outputs

Parts can return multiple values:
class MultiOutput:
    def run(self, x):
        a = x * 2
        b = x + 1
        c = x ** 2
        return a, b, c  # Return tuple

V.add(MultiOutput(),
      inputs=['x'],
      outputs=['y', 'z', 'w'])  # Unpacks tuple

# Memory: {'y': a, 'z': b, 'w': c}
From memory.py:35-42:
def put(self, keys, inputs):
    if len(keys) > 1:
        for i, key in enumerate(keys):
            try:
                self.d[key] = inputs[i]  # Unpack tuple
            except IndexError as e:
                error = str(e) + ' issue with keys: ' + str(key)
                raise IndexError(error)

No Outputs

Parts that perform actions but don’t return data:
class MotorControl:
    def run(self, steering, throttle):
        self.steering_servo.set(steering)
        self.throttle_esc.set(throttle)
        # No return value

V.add(MotorControl(),
      inputs=['steering', 'throttle'],
      outputs=[])  # Empty list
From vehicle.py:212-213:
if outputs is not None:
    self.mem.put(entry['outputs'], outputs)
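If outputs is None, nothing is stored. A part registered with outputs=[] should therefore return None: put() reads keys[0] whenever it is given fewer than two keys, which raises IndexError on an empty list.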

Accessing Memory Directly

While parts normally don’t access Memory directly, sometimes it’s useful:
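V = dk.vehicle.Vehicle()

# Direct access to the Memory object
V.mem['custom/value'] = 42             # Set value
print(V.mem['custom/value'])           # Get value; raises KeyError if the key is unset
print(V.mem.get(['cam/image_array']))  # [None] until a part writes the channel
print(V.mem.keys())                    # All channel names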

Use Cases

  1. Debugging: Inspect Memory state
  2. Initialization: Set initial values
  3. Special parts: Parts that modify Memory directly
Example from complete.py:143:
V.add(ExplodeDict(V.mem, "web/"), inputs=['web/buttons'])
This part takes web/buttons (a dict) and expands it into separate Memory keys.
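A part of this kind can be sketched as follows. ExpandDict is a hypothetical stand-in written for illustration, not the library's ExplodeDict, and a plain dict stands in for V.mem.

```python
class ExpandDict:
    """Hypothetical part: writes each item of a dict input to its own channel."""

    def __init__(self, memory, prefix=''):
        self.memory = memory   # anything that supports memory[key] = value
        self.prefix = prefix

    def run(self, key_values):
        if isinstance(key_values, dict):
            for key, value in key_values.items():
                self.memory[self.prefix + key] = value
        return None  # nothing for the Vehicle to store


mem = {}
part = ExpandDict(mem, 'web/')
part.run({'w1': True, 'w2': False})
part.run(None)  # a None input (e.g. on the first iteration) is ignored
print(mem)
```

Because the channel names depend on the dict's contents at run time, they cannot be listed in outputs, which is why such a part writes to Memory itself.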

Memory Lifecycle

1. Initialization

V = dk.vehicle.Vehicle()  # Creates empty Memory
# V.mem.d = {}

2. First Iteration

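Memory starts empty, so any input whose channel has not been written yet arrives as None (Memory.get() falls back to dict.get()):
class Part:
    def __init__(self, default_value=0.0):
        self.default_value = default_value

    def run(self, input_value):
        if input_value is None:
            return self.default_value  # Handle None gracefully
        return self.process(input_value)  # process() is the part's own logic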

3. Subsequent Iterations

Memory contains values from previous parts:
# Iteration 1:
Camera outputs → Memory['cam/image_array'] = <image1>

# Iteration 2:
Camera outputs → Memory['cam/image_array'] = <image2>  # Overwrites
Model reads Memory['cam/image_array']  # Gets <image2>

4. Run Conditions

If a part doesn’t run, its outputs aren’t updated:
# Iteration 1: run_pilot = True
Model runs → Memory['pilot/angle'] = 0.5

# Iteration 2: run_pilot = False
Model skipped → Memory['pilot/angle'] = 0.5  # Still 0.5 from iteration 1
Always check for None values in your parts, especially on the first loop iteration or when using run conditions.
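Both situations, a channel that was never written and a channel holding a stale value, show up in a three-iteration trace. This is a minimal sketch, assuming a plain dict in place of Memory and a hypothetical maybe_run_model function that plays the role of a part guarded by run_condition='run_pilot'.

```python
mem = {}


def maybe_run_model(mem, angle):
    """Write pilot/angle only when run_pilot is truthy, as a run condition would."""
    if mem.get('run_pilot'):
        mem['pilot/angle'] = angle


history = []
for run_pilot, angle in [(False, 0.1), (True, 0.5), (False, 0.9)]:
    mem['run_pilot'] = run_pilot
    maybe_run_model(mem, angle)
    history.append(mem.get('pilot/angle'))  # what a downstream part would receive

print(history)  # [None, 0.5, 0.5]
```

The third entry is the stale case: a downstream part cannot tell from the value alone that the model did not run in that iteration.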

Common Patterns

Pattern 1: Pipeline

Data flows sequentially through parts:
V.add(Part1(), outputs=['a'])
V.add(Part2(), inputs=['a'], outputs=['b'])
V.add(Part3(), inputs=['b'], outputs=['c'])
V.add(Part4(), inputs=['c'])
# a → b → c

Pattern 2: Fan-Out

One part’s output feeds multiple parts:
V.add(Camera(), outputs=['cam/image_array'])
V.add(Model(), inputs=['cam/image_array'], outputs=['pilot/angle'])
V.add(ObjectDetector(), inputs=['cam/image_array'], outputs=['detection'])
V.add(Recorder(), inputs=['cam/image_array'])
# cam/image_array → {Model, ObjectDetector, Recorder}

Pattern 3: Merge

Multiple inputs combined:
V.add(Camera(), outputs=['cam/image_array'])
V.add(IMU(), outputs=['imu/acl_x', 'imu/acl_y'])
V.add(Model(), 
      inputs=['cam/image_array', 'imu/acl_x', 'imu/acl_y'],
      outputs=['pilot/angle'])
# {cam/image_array, imu/acl_x, imu/acl_y} → Model

Pattern 4: Conditional Branch

V.add(Condition(), inputs=['mode'], outputs=['run_a', 'run_b'])
V.add(PartA(), outputs=['result'], run_condition='run_a')
V.add(PartB(), outputs=['result'], run_condition='run_b')
# Both write to 'result', but only one runs
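The branch can be traced with stand-in parts. In this sketch Condition, PartA, PartB, and step are all hypothetical, and a plain dict replaces Memory. The flag check inside step plays the role that run_condition plays in the Vehicle.

```python
class Condition:
    """Hypothetical part: maps a mode string to two mutually exclusive flags."""

    def run(self, mode):
        run_a = (mode == 'a')
        return run_a, not run_a


class PartA:
    def run(self):
        return 'from A'


class PartB:
    def run(self):
        return 'from B'


def step(mem, mode):
    """One loop iteration over a plain dict standing in for Memory."""
    mem['run_a'], mem['run_b'] = Condition().run(mode)
    for flag, part in (('run_a', PartA()), ('run_b', PartB())):
        if mem[flag]:                  # same role as run_condition=flag
            mem['result'] = part.run()
    return mem['result']


mem = {}
results = [step(mem, 'a'), step(mem, 'b'), step(mem, 'a')]
print(results)
```

Sharing one output channel is safe here only because the two flags are never true at the same time.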

Memory and Threading

Threaded parts access Memory through the main loop:
class ThreadedCamera:
    def __init__(self):
        self.on = True     # update() loops while this flag is set
        self.frame = None
    
    def update(self):
        """Background thread"""
        while self.on:
            self.frame = self.capture()  # Updates self.frame
    
    def run_threaded(self):
        """Main loop calls this"""
        return self.frame  # Vehicle puts this in Memory

V.add(ThreadedCamera(), 
      outputs=['cam/image_array'],
      threaded=True)
Flow:
  1. Background thread updates self.frame continuously
  2. Main loop calls run_threaded() to get latest frame
  3. Vehicle stores return value in Memory
  4. Other parts read from Memory
Threaded parts should store data in instance variables, not in Memory directly. The Vehicle handles transferring data to Memory.
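The flow above can be exercised with the standard threading module. In this sketch ThreadedCounter is a hypothetical part in which a counter stands in for a camera frame. The thread is started by hand and a plain dict replaces Memory, whereas in Donkeycar the Vehicle takes care of both.

```python
import threading
import time


class ThreadedCounter:
    """Hypothetical threaded part: a counter stands in for a camera frame."""

    def __init__(self):
        self.on = True
        self.frame = None

    def update(self):
        # Background thread: keep refreshing the latest value.
        count = 0
        while self.on:
            count += 1
            self.frame = count
            time.sleep(0.001)

    def run_threaded(self):
        # Main loop: return the latest value without blocking.
        return self.frame

    def shutdown(self):
        self.on = False


part = ThreadedCounter()
worker = threading.Thread(target=part.update, daemon=True)
worker.start()

# Wait (with a timeout) until the background thread has produced a value.
deadline = time.time() + 2.0
while part.run_threaded() is None and time.time() < deadline:
    time.sleep(0.001)

mem = {'counter/value': part.run_threaded()}  # what the Vehicle would store
part.shutdown()
worker.join(timeout=2.0)
print(mem)
```

Only the main thread writes to the dict, so the shared state between threads is limited to the part's own instance variables.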

Debugging Memory

To see what’s in Memory:
class MemoryDebugger:
    def __init__(self, memory):
        self.memory = memory  # Reference to V.mem

    def run(self):
        # Print every channel once per loop iteration
        print("Memory contents:")
        for key, value in self.memory.items():
            print(f"  {key}: {value}")

V.add(MemoryDebugger(V.mem), outputs=[])
Or print only selected channels with a debug part that receives them as inputs:
class PrintMemory:
    def __init__(self, channels):
        self.channels = channels
    
    def run(self, *values):
        for channel, value in zip(self.channels, values):
            print(f"{channel}: {value}")

V.add(PrintMemory(['cam/image_array', 'user/angle', 'pilot/angle']),
      inputs=['cam/image_array', 'user/angle', 'pilot/angle'])

Best Practices

  1. Use descriptive channel names: cam/image_array not img
  2. Follow naming conventions: category/name pattern
  3. Handle None values: Check inputs on first iteration
  4. Don’t access Memory directly in parts: Use inputs/outputs
  5. Keep Memory clean: Don’t store unnecessary data
  6. Document channels: Comment what each channel contains
  7. Avoid name collisions: Use unique, specific names
