A Unit is a C++ class that encapsulates a slice of robot behavior. You declare what topics it subscribes to, what conditions cause a handler to fire, and what topics it publishes to. Basis generates all the pub-sub wiring and calls your handler functions automatically.

The Unit class hierarchy

Basis provides two base classes. You can derive from them directly when writing a Unit by hand, or let the code generator derive from them on your behalf.
basis::Unit
└── basis::SingleThreadedUnit

basis::Unit

Unit is the base class for all Basis units. It holds the transport manager, coordinator connector, and the registered handler map. It provides the core lifecycle methods:
class Unit {
public:
  // Called once at startup — subscribe, advertise, set up rate subscribers
  virtual void Initialize(const UnitInitializeOptions& options = {}) = 0;

  // Called in a loop — drains the transport layer and dispatches callbacks
  virtual void Update(std::atomic<bool>* stop_token = nullptr,
                      const basis::core::Duration& max_execution_duration = {});

  // Blocks until a connection to the coordinator is established
  basis::core::transport::CoordinatorConnector* WaitForCoordinatorConnection();

  // Creates and stores the standard transport manager (inproc + TCP by default)
  basis::core::transport::TransportManager* CreateTransportManager(
      basis::RecorderInterface* recorder = nullptr);

  // Typed helpers forwarded to the transport manager
  template <typename T_MSG, typename T_Serializer = ...>
  std::shared_ptr<core::transport::Publisher<T_MSG>>
  Advertise(std::string_view topic, ...);

  template <typename T_MSG, typename T_Serializer = ...>
  std::shared_ptr<core::transport::Subscriber<T_MSG>>
  Subscribe(std::string_view topic, SubscriberCallback<T_MSG> callback, ...);
};

basis::SingleThreadedUnit

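SingleThreadedUnit extends Unit with a shared callback queue. All handler callbacks are serialized through a single SubscriberOverallQueue, so only one handler runs at a time and handlers do not need locks to protect the state they share with each other. Inbound messages are deserialized on a separate thread pool (thread_pool in the declaration below).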
class SingleThreadedUnit : public Unit {
public:
  // Override of Update() that drains the overall queue
  virtual void Update(std::atomic<bool>* stop_token,
                      const basis::core::Duration& max_execution_duration) override;

  // Subscribe overload that automatically queues callbacks through overall_queue
  template <typename T_MSG, typename T_Serializer = ...>
  std::shared_ptr<core::transport::Subscriber<T_MSG>>
  Subscribe(std::string_view topic, SubscriberCallback<T_MSG> callback,
            size_t queue_depth = 0, ...);

protected:
  std::shared_ptr<basis::core::containers::SubscriberOverallQueue> overall_queue;
  basis::core::threading::ThreadPool thread_pool{4};  // deserializes inbound messages
};
SingleThreadedUnit is the only threading model currently available. A multi-threaded variant where handlers run in parallel is planned but not yet implemented.
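
To illustrate why funnelling every callback through one queue removes the need for locks in handler code, here is a minimal standalone sketch. It is not the Basis implementation: CallbackQueue, Push, and DrainFor are hypothetical names, and the real SubscriberOverallQueue may behave differently.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical stand-in for a shared callback queue; not the Basis class.
class CallbackQueue {
public:
  // Producers (for example transport threads) may enqueue from any thread.
  void Push(std::function<void()> callback) {
    assert(callback && "callback must be callable");
    {
      std::lock_guard<std::mutex> lock(mutex_);
      callbacks_.push_back(std::move(callback));
    }
    ready_.notify_one();
  }

  // The single consumer waits up to max_wait for work, then runs everything
  // that is pending. Returns the number of callbacks that ran.
  std::size_t DrainFor(std::chrono::milliseconds max_wait) {
    std::deque<std::function<void()>> pending;
    {
      std::unique_lock<std::mutex> lock(mutex_);
      ready_.wait_for(lock, max_wait, [this] { return !callbacks_.empty(); });
      pending.swap(callbacks_);
    }
    // Callbacks run outside the lock, one after another, on the caller's
    // thread, so they never overlap with each other.
    for (auto& callback : pending) {
      callback();
    }
    return pending.size();
  }

private:
  std::mutex mutex_;
  std::condition_variable ready_;
  std::deque<std::function<void()>> callbacks_;
};
```

In this sketch only the queue itself is locked. State touched exclusively by the callbacks needs no synchronization, because the draining thread is the only one that ever runs them.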

Unit lifecycle

Every Unit executable follows the same four steps:
1. Connect to the coordinator

The coordinator is a separate process that tracks which topics are published and by whom. WaitForCoordinatorConnection() blocks and retries every second until the coordinator is reachable.
example_unit.WaitForCoordinatorConnection();
If the coordinator is not running you will see:
[warn] No connection to the coordinator, waiting 1 second and trying again
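
The blocking behavior described above amounts to a poll-and-sleep loop. The sketch below shows that pattern in isolation; WaitForConnection and try_connect are hypothetical names for illustration, not Basis API, and the real method may differ in detail.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper: calls try_connect until it succeeds, sleeping between
// attempts. Returns how many attempts were needed.
inline int WaitForConnection(const std::function<bool()>& try_connect,
                             std::chrono::milliseconds retry_interval) {
  assert(try_connect && "try_connect must be callable");
  int attempts = 1;
  while (!try_connect()) {
    // A real unit would log a warning here before sleeping.
    std::this_thread::sleep_for(retry_interval);
    ++attempts;
  }
  return attempts;
}
```

Because the loop never gives up, a unit started before the coordinator simply waits until the coordinator appears.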
2. Create the transport manager

CreateTransportManager() instantiates the inproc transport and any registered network transports (TCP by default when using CreateStandardTransportManager). Each Unit gets its own transport manager and therefore its own publisher ID space.
example_unit.CreateTransportManager();
Pass a RecorderInterface* to record all messages published by this unit to an MCAP file.
3. Initialize

Initialize() is where you create publishers, subscribers, and rate subscribers. It is called exactly once, after the transport manager exists.
example_unit.Initialize();
Inside Initialize, call Advertise<T>() and Subscribe<T>() to set up your topics:
void ExampleUnit::Initialize(const basis::UnitInitializeOptions& options) {
  time_test_sub = Subscribe<TimeTest>(
      "/time_test",
      std::bind(&ExampleUnit::OnTimeTest, this, std::placeholders::_1));

  time_test_pub = Advertise<TimeTest>("/time_test");
}
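
The std::bind call above adapts a member function into a callback. A lambda that captures this does the same job, and some readers find it easier to follow. The sketch below compares the two forms using hypothetical stand-in types (Message, MessageCallback, Listener) rather than the Basis ones.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>

// Hypothetical message type and callback alias, standing in for the real ones.
struct Message {
  std::string text;
};
using MessageCallback = std::function<void(std::shared_ptr<const Message>)>;

class Listener {
public:
  // Same shape as the std::bind call in the example above.
  MessageCallback MakeBoundCallback() {
    return std::bind(&Listener::OnMessage, this, std::placeholders::_1);
  }

  // Equivalent lambda; the capture of this is explicit.
  MessageCallback MakeLambdaCallback() {
    return [this](std::shared_ptr<const Message> msg) {
      OnMessage(std::move(msg));
    };
  }

  int received = 0;
  std::string last_text;

private:
  void OnMessage(std::shared_ptr<const Message> msg) {
    assert(msg && "message must not be null");
    ++received;
    last_text = msg->text;
  }
};
```

Both callbacks hold a raw this pointer, so the object must outlive every subscriber that stores them.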
4. Run the update loop

Call Update() repeatedly. Each call drives the transport layer (connecting to new publishers as the coordinator reports them) and, for SingleThreadedUnit, drains the callback queue.
while (true) {
  example_unit.Update(nullptr,
      basis::core::Duration::FromSecondsNanoseconds(1, 0));
}
The max_execution_duration argument caps how long Update() blocks waiting for an event. If you pass a zero duration, it returns immediately after processing any pending events.
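
The while (true) loop above never exits. One way to make it stoppable is to share a std::atomic<bool> flag between the loop and the code that requests shutdown. The sketch below shows that pattern with a hypothetical FakeUnit and std::chrono durations; a real unit takes a basis::core::Duration, and how the real Update() reacts to its stop_token is not covered here, so treat this as an assumption-laden illustration.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>

// Runs unit.Update(...) until stop becomes true and returns the number of
// iterations. UnitLike is any type with a compatible Update member.
template <typename UnitLike>
int RunUntilStopped(UnitLike& unit, std::atomic<bool>& stop,
                    std::chrono::milliseconds max_wait) {
  assert(max_wait.count() >= 0);
  int iterations = 0;
  while (!stop.load()) {
    unit.Update(&stop, max_wait);
    ++iterations;
  }
  return iterations;
}

// Hypothetical stand-in used only to exercise the loop: it requests a stop
// after a fixed number of updates.
struct FakeUnit {
  int updates = 0;
  int stop_after = 3;

  void Update(std::atomic<bool>* stop_token, std::chrono::milliseconds) {
    ++updates;
    if (updates >= stop_after && stop_token != nullptr) {
      stop_token->store(true);
    }
  }
};
```

In a real executable the flag would more likely be set from another thread or a shutdown hook than from inside Update().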

HandlerPubSub — the generated handler structure

When you use code generation (via .unit.yaml), Basis generates a HandlerPubSub subclass for each declared handler. You rarely interact with this class directly, but understanding it helps when reading generated code or debugging. HandlerPubSub is a base struct that wires together a synchronizer, a set of typed subscriber callbacks, and a publish step:
struct HandlerPubSub {
  // Map from topic name to the type-erased inbound callback
  std::map<std::string, TypeErasedCallback> type_erased_callbacks;

  // Output topic names declared in the unit YAML
  std::vector<std::string> outputs;

  // If the handler has a rate sync, this holds the period
  std::optional<basis::core::Duration> rate_duration;

  // Called by the RateSubscriber thread when the rate period elapses
  virtual void OnRateSubscriberTypeErased(
      basis::core::MonotonicTime now,
      HandlerExecutingCallback* callback) = 0;
};
The templated HandlerPubSubWithOptions<T_DERIVED, HAS_RATE, INPUT_COUNT> adds:
  • SetupInput<INDEX>() — creates a Subscriber<T_MSG> via the transport manager and registers the inbound callback.
  • SetupInputs() — iterates over all inputs using index sequences and calls SetupInput for each.
  • CreateOnMessageCallback<INDEX>() — builds the typed lambda that calls synchronizer->OnMessage<INDEX>() and, if consumed, calls RunHandlerAndPublish().
  • OnRateSubscriber() — called by the rate subscriber thread; calls synchronizer->ConsumeIfReady() and publishes if ready.
The generated Unit class registers each handler’s HandlerPubSub* in the handlers map, which is used by the DeterministicReplayer and UnitManager.
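
The index-sequence iteration mentioned for SetupInputs() is a standard C++ technique for calling a function template once per compile-time index. The sketch below shows the idea on a hypothetical DemoHandler; the method names mirror the list above, but the bodies are illustrative and are not the generated Basis code.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical handler with INPUT_COUNT inputs.
template <std::size_t INPUT_COUNT>
struct DemoHandler {
  std::vector<std::string> subscribed;  // records the order of setup calls

  // Stands in for creating the subscriber for input number INDEX.
  template <std::size_t INDEX>
  void SetupInput() {
    static_assert(INDEX < INPUT_COUNT, "input index out of range");
    subscribed.push_back("input_" + std::to_string(INDEX));
  }

  // Expands to SetupInput<0>(), SetupInput<1>(), ... at compile time.
  void SetupInputs() {
    assert(subscribed.empty() && "SetupInputs is expected to run once");
    SetupInputsImpl(std::make_index_sequence<INPUT_COUNT>{});
  }

private:
  template <std::size_t... INDICES>
  void SetupInputsImpl(std::index_sequence<INDICES...>) {
    (SetupInput<INDICES>(), ...);  // C++17 fold over the comma operator
  }
};
```

Because each index is a template argument, every input can have its own message type resolved at compile time, which a runtime for loop could not express.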

Writing a Unit by hand

You can subclass SingleThreadedUnit directly without code generation. The basis_example.cpp example demonstrates this pattern:
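#include <functional>  // std::bind, std::placeholders
#include <memory>      // std::shared_ptr, std::unique_ptr

#include <basis/unit.h>
#include <basis_example.pb.h>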

class ExampleUnit : public basis::SingleThreadedUnit {
public:
  ExampleUnit() : basis::SingleThreadedUnit("ExampleUnit") {}

  virtual void Initialize(const basis::UnitInitializeOptions& options = {}) override {
    using namespace std::placeholders;

    // Subscribe to incoming messages
    time_test_sub = Subscribe<TimeTest>(
        "/time_test",
        std::bind(&ExampleUnit::OnTimeTest, this, _1));

    // Advertise outgoing topics
    time_test_pub = Advertise<TimeTest>("/time_test");
    time_test_pub_forwarded = Advertise<TimeTest>("/time_test_forwarded");

    // Fire a callback at a fixed 1 Hz rate
    one_hz_rate_subscriber = std::make_unique<basis::core::transport::RateSubscriber>(
        basis::core::Duration::FromSecondsNanoseconds(1, 0),
        std::bind(&ExampleUnit::EveryOneSecond, this, _1));
  }

protected:
  void EveryOneSecond(const basis::core::MonotonicTime time) {
    auto msg = std::make_shared<TimeTest>();
    msg->set_time(time.ToSeconds());
    time_test_pub->Publish(msg);
  }

  void OnTimeTest(std::shared_ptr<const TimeTest> msg) {
    time_test_pub_forwarded->Publish(msg);
  }

  std::shared_ptr<basis::core::transport::Subscriber<TimeTest>> time_test_sub;
  std::unique_ptr<basis::core::transport::RateSubscriber> one_hz_rate_subscriber;
  std::shared_ptr<basis::core::transport::Publisher<TimeTest>> time_test_pub;
  std::shared_ptr<basis::core::transport::Publisher<TimeTest>> time_test_pub_forwarded;
};

int main(int argc, char* argv[]) {
  basis::core::logging::InitializeLoggingSystem();

  ExampleUnit example_unit;
  example_unit.WaitForCoordinatorConnection();
  example_unit.CreateTransportManager();
  example_unit.Initialize();

  while (true) {
    example_unit.Update(nullptr,
        basis::core::Duration::FromSecondsNanoseconds(1, 0));
  }
}

Units with code generation

For production use, declare your Unit in a .unit.yaml file and run the Basis code generator. The generator produces a typed Base class with all the HandlerPubSub subclasses wired up automatically. You implement only the handler functions.
example.unit.yaml
threading_model: single

handlers:
  OnPing:
    sync:
      type: all
    inputs:
      /ping:
        type: "protobuf: ::TimeTest"
    outputs:
      /pong:
        type: "protobuf: ::TimeTest"
The generated Base class handles Initialize() entirely. Your derived class overrides only the handler methods:
OnPing::Output MyUnit::OnPing(const OnPing::Input& input) {
  return OnPing::Output{ .pong = input.ping };
}
See Unit YAML schema and Code generation for the full reference.
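
Because a generated handler maps an input struct to an output struct, its logic can be exercised without any transport. The sketch below assumes a possible shape for those structs: TimeTest here is a plain stand-in for the protobuf message, the Input and Output definitions are guesses at what the generator emits, and HandlePing is a hypothetical free function mirroring the handler above.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for the protobuf TimeTest message.
struct TimeTest {
  double time = 0.0;
};

// Assumed shape of the generated structs; the real ones come from the
// code generator and may differ.
namespace OnPing {
struct Input {
  std::shared_ptr<const TimeTest> ping;  // from the /ping input
};
struct Output {
  std::shared_ptr<const TimeTest> pong;  // published on /pong
};
}  // namespace OnPing

// Same logic as the handler above, written without designated initializers
// so that it also compiles before C++20.
inline OnPing::Output HandlePing(const OnPing::Input& input) {
  assert(input.ping && "ping is assumed to be set when the handler runs");
  OnPing::Output output;
  output.pong = input.ping;  // forward the same message, no copy
  return output;
}
```

A unit test can then build an Input by hand, call the handler, and inspect the Output directly.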

Key properties

Each Unit is constructed with a string name passed to the Unit base constructor. The name is used as the logger name (via spdlog) and is surfaced in coordinator telemetry. Choose a name that is unique within a process.
ExampleUnit() : basis::SingleThreadedUnit("ExampleUnit") {}
Units declared in YAML support topic name templates such as {{args.camera_name}}/rgb. The templated_topic_to_runtime_topic map in Unit resolves templates to their runtime values before subscribers are created. This lets you parameterize topic names without recompiling.
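
As a rough illustration of what resolving a templated topic involves, the sketch below substitutes {{args.NAME}} placeholders from a map of argument values. ResolveTopicTemplate is a hypothetical helper that handles only the placeholder form shown above; the real template support in Basis may be richer.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Hypothetical resolver: replaces each {{args.NAME}} with the value stored
// under NAME. Unknown or unterminated placeholders are left unchanged.
inline std::string ResolveTopicTemplate(
    const std::string& templated_topic,
    const std::map<std::string, std::string>& args) {
  const std::string open = "{{args.";
  const std::string close = "}}";
  std::string result;
  std::size_t pos = 0;
  while (pos < templated_topic.size()) {
    const std::size_t start = templated_topic.find(open, pos);
    if (start == std::string::npos) break;
    const std::size_t end = templated_topic.find(close, start + open.size());
    if (end == std::string::npos) break;
    const std::string name =
        templated_topic.substr(start + open.size(), end - start - open.size());
    result.append(templated_topic, pos, start - pos);  // text before placeholder
    const auto it = args.find(name);
    if (it != args.end()) {
      result += it->second;
    } else {
      // Keep the placeholder text as-is when no value is known.
      result.append(templated_topic, start, end + close.size() - start);
    }
    pos = end + close.size();
  }
  assert(pos <= templated_topic.size());
  result.append(templated_topic, pos, std::string::npos);  // trailing text
  return result;
}
```

With camera_name set to /front, the template {{args.camera_name}}/rgb resolves to /front/rgb, so two instances of one unit can listen to different cameras.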
Initialize() accepts a UnitInitializeOptions struct. Currently it has a single field:
struct UnitInitializeOptions {
  bool create_subscribers = true;
};
Set create_subscribers = false during deterministic replay. This suppresses subscriber creation, and recorded messages are instead delivered directly through the type-erased callback map.
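
The replay path can be pictured as a lookup from topic name to a callback that accepts serialized bytes. The sketch below models that idea with hypothetical types (RecordedMessage, ErasedCallback, Replay); the signature of the real type-erased callback and the replayer's actual logic are assumptions here.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <vector>

// Hypothetical recorded message: a topic plus its serialized payload.
struct RecordedMessage {
  std::string topic;
  std::string payload;
};

// Hypothetical type-erased callback: deserializes the payload and invokes
// the typed handler behind it.
using ErasedCallback = std::function<void(const std::string& payload)>;

// Feeds recorded messages to the matching callbacks in recording order.
// Returns how many messages were delivered; unknown topics are skipped.
inline std::size_t Replay(
    const std::vector<RecordedMessage>& recording,
    const std::map<std::string, ErasedCallback>& callbacks) {
  std::size_t delivered = 0;
  for (const RecordedMessage& message : recording) {
    const auto it = callbacks.find(message.topic);
    if (it == callbacks.end()) continue;
    assert(it->second && "registered callbacks must be callable");
    it->second(message.payload);
    ++delivered;
  }
  return delivered;
}
```

Delivering messages in a fixed order on one thread, with no live subscribers involved, is what makes a replay of this kind repeatable.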

Next steps

  • Pub-sub messaging: how publishers and subscribers work, including inproc zero-copy and network transport.
  • Synchronizers: control when handlers fire (all, equal, approximate, rate, or external).
  • Unit YAML schema: full reference for the .unit.yaml declaration format.
  • Code generation: what generate_unit.py produces and how to use it in CMake.
