Skip to main content
Basis uses a publish-subscribe (pub-sub) model for all inter-component communication. Publishers write messages to named topics; subscribers receive them. The framework automatically connects publishers and subscribers — within a process via shared memory and across processes via TCP — with no manual wiring required.

Topics and type safety

Topics are strings that identify a data channel, for example /time_test or /camera/rgb. Both publishers and subscribers declare the C++ type of the message they expect. Basis enforces type compatibility at subscription time using serialization metadata registered with the coordinator. Message types are not limited to a particular serialization format. Basis supports protobuf, ROS1 messages, and raw byte buffers through its serialization plugin system, all on the same topic namespace.

Publishers

A publisher is created by calling Advertise<T>() on your Unit (or directly on the TransportManager). The return value is a std::shared_ptr<Publisher<T_MSG>> that you keep alive for the duration of the publisher’s lifetime.
// From basis/unit.h — the Unit helper
template <typename T_MSG, typename T_Serializer = SerializationHandler<T_MSG>::type>
[[nodiscard]] std::shared_ptr<core::transport::Publisher<T_MSG>>
Advertise(std::string_view topic, core::serialization::MessageTypeInfo message_type = ...);
Calling Publish() on the returned publisher sends the message to all current subscribers:
// Advertise once in Initialize()
time_test_pub = Advertise<TimeTest>("/time_test");

// Publish whenever you have data
auto msg = std::make_shared<TimeTest>();
msg->set_time(time.ToSeconds());
time_test_pub->Publish(msg);

How Publish() works

Publisher<T_MSG>::Publish() performs two separate operations:
  1. Inproc delivery — if there are any inproc subscribers, the std::shared_ptr<const T_MSG> is handed directly to each subscriber’s callback. No copy, no serialization.
  2. Network delivery — if there are any TCP (or other transport) subscribers, the message is serialized into a MessagePacket using the configured serializer, then sent to each transport publisher.
Serialization only happens when there is at least one network subscriber. If all subscribers are within the same process, the message is never serialized.
// Simplified from publisher.h
void Publish(std::shared_ptr<const T_MSG> msg) {
  // Zero-copy inproc delivery
  if (inproc) {
    inproc->Publish(msg);
  }

  // Only serialize if network subscribers exist
  if (!GetTransportSubscriberCount()) {
    return;
  }

  const size_t payload_size = get_message_size_cb(*msg);
  auto packet = std::make_shared<MessagePacket>(
      MessageHeader::DataType::MESSAGE, payload_size);
  write_message_to_span_cb(*msg, packet->GetMutablePayload());
  PublishRaw(std::move(packet), now);
}

Queue depth

Each TransportPublisher has a configurable maximum queue size. When the queue is full, older messages are dropped. Set it via Publisher::SetMaxQueueSize():
time_test_pub->SetMaxQueueSize(10);

Subscribers

A subscriber is created by calling Subscribe<T>(). The return value is a std::shared_ptr<Subscriber<T_MSG>>. You must keep this shared pointer alive — dropping it unsubscribes.
// From basis/unit.h — the SingleThreadedUnit helper
template <typename T_MSG, typename T_Serializer = SerializationHandler<T_MSG>::type>
[[nodiscard]] std::shared_ptr<core::transport::Subscriber<T_MSG>>
Subscribe(std::string_view topic,
          core::transport::SubscriberCallback<T_MSG> callback,
          size_t queue_depth = 0,
          core::serialization::MessageTypeInfo message_type = ...);
The callback signature is:
// SubscriberCallback<T_MSG> is an alias for:
std::function<void(std::shared_ptr<const T_MSG>)>

Example: subscribing with a member function

void ExampleUnit::Initialize(const basis::UnitInitializeOptions& options) {
  using namespace std::placeholders;

  time_test_sub = Subscribe<TimeTest>(
      "/time_test",
      std::bind(&ExampleUnit::OnTimeTest, this, _1));
}

void ExampleUnit::OnTimeTest(std::shared_ptr<const TimeTest> msg) {
  BASIS_LOG_INFO("Got message with time: {:.6f}s", msg->time());
  time_test_pub_forwarded->Publish(msg);
}

Example: subscribing with a lambda

vector_sub = Subscribe<ExampleStampedVector>(
    "/stamped_vector",
    [this](std::shared_ptr<const ExampleStampedVector> m) {
      // Process message inline
      vector_lidar_sync->OnMessage<0>(m);
    });

Inproc vs network transport

Basis maintains two separate delivery paths for each topic.
The inproc transport (InprocTransport) passes std::shared_ptr<const T_MSG> directly between publishers and subscribers in the same process. There is no serialization, no copy, and no memory allocation beyond incrementing a reference count.This is the default path for any publisher-subscriber pair that lives in the same process. It is always enabled when you call CreateTransportManager().
// InprocPublisher delivers directly to all registered InprocSubscribers
void InprocPublisher<T_MSG>::Publish(std::shared_ptr<const T_MSG> msg) {
  connector->Publish(this->topic, std::move(msg), ignore_if_primary_connector);
}
The InprocConnector<T_MSG> is keyed by topic name and message type. Each connector holds a list of weak pointers to its subscribers; expired pointers are pruned on the next publish.
The transport selection is automatic. If a subscriber is in the same process as the publisher, it receives messages via inproc. If it is in another process, it receives via TCP. You do not have to choose explicitly in most cases.

Controlling transports per topic

In the unit YAML, you can restrict which transports a given input may use with allow_transports and deny_transports:
inputs:
  /topic_a:
    type: protobuf:A
    # Only receive this topic via inproc (same-process only)
    allow_transports:
      - inproc

  /topic_b:
    type: protobuf:B
    # Never receive this topic via inproc
    deny_transports:
      - inproc
See Transport layers for a full explanation of the transport architecture.

RateSubscriber — time-based callbacks

RateSubscriber is not a topic subscriber. It is a timer that fires a callback at a fixed rate on a background thread. Use it to drive periodic behavior such as publishing at a regular frequency.
// From subscriber.h
class RateSubscriber {
public:
  RateSubscriber(const Duration& tick_length,
                 std::function<void(MonotonicTime)> callback);
  ~RateSubscriber();  // stops the thread

  void Start();
  void Stop();
};
The callback receives the current MonotonicTime at the moment it fires. The thread sleeps using MonotonicTime::SleepUntil() with a run token to detect simulated time jumps (used in deterministic replay).
// Fires at 1 Hz
one_hz_rate_subscriber = std::make_unique<basis::core::transport::RateSubscriber>(
    basis::core::Duration::FromSecondsNanoseconds(1, 0),
    std::bind(&ExampleUnit::EveryOneSecond, this, std::placeholders::_1));

// Fires at 10 Hz (100ms period)
ten_hz_rate_subscriber = std::make_unique<basis::core::transport::RateSubscriber>(
    basis::core::Duration::FromSecondsNanoseconds(0, std::nano::den / 10),
    std::bind(&ExampleUnit::EveryTenHertz, this, std::placeholders::_1));
RateSubscriber runs on its own thread. When using SingleThreadedUnit, the callback fires on that background thread and is not serialized through the main callback queue. If you need serialized execution, push work onto the unit’s queue manually, or use a rate synchronizer in your .unit.yaml instead.
In the unit YAML, you can declare a rate-triggered handler that is automatically serialized:
handlers:
  TenHertzWithTopic:
    sync:
      rate: 10hz
    inputs:
      /topic_a:
        type: protobuf:A
See Synchronizers for more on rate-based firing.

TransportManager — the coordination layer

TransportManager is the central object that manages all publishers and subscribers for a Unit. Unit::Advertise() and Unit::Subscribe() are thin wrappers that delegate to the transport manager.
// Typed advertise — creates InprocPublisher + transport publishers
template <typename T_MSG, typename T_Serializer = ..., typename T_CONVERTABLE_INPROC = NoAdditionalInproc>
std::shared_ptr<Publisher<T_MSG, T_CONVERTABLE_INPROC>>
TransportManager::Advertise(std::string_view topic, ...);

// Typed subscribe — creates InprocSubscriber + transport subscribers
template <typename T_MSG, typename T_Serializer = ..., typename T_ADDITIONAL_INPROC = NoAdditionalInproc>
std::shared_ptr<Subscriber<T_MSG, T_ADDITIONAL_INPROC>>
TransportManager::Subscribe(std::string_view topic,
                             SubscriberCallback<T_MSG> callback, ...);
TransportManager::Update() is called inside Unit::Update(). It polls each registered transport for new connections, prunes expired publishers, and collects updated PublisherInfo to send to the coordinator. When the coordinator sends a NetworkInfo message, TransportManager::HandleNetworkInfo() iterates the reported publishers and calls Subscriber::HandlePublisherInfo() on each matching subscriber, which triggers TCP connection establishment.

Complete example

The basis_example.cpp file shows the full pub-sub pattern:
class ExampleUnit : public basis::SingleThreadedUnit {
public:
  ExampleUnit() : basis::SingleThreadedUnit("ExampleUnit") {}

  virtual void Initialize(const basis::UnitInitializeOptions& options = {}) override {
    using namespace std::placeholders;

    // Subscribe to /time_test with a member function callback
    time_test_sub = Subscribe<TimeTest>(
        "/time_test", std::bind(&ExampleUnit::OnTimeTest, this, _1));

    // Advertise on /time_test — we receive our own publishes via inproc
    time_test_pub = Advertise<TimeTest>("/time_test");
    time_test_pub_forwarded = Advertise<TimeTest>("/time_test_forwarded");

    // Publish at 1 Hz from a background timer
    one_hz_rate_subscriber = std::make_unique<basis::core::transport::RateSubscriber>(
        basis::core::Duration::FromSecondsNanoseconds(1, 0),
        std::bind(&ExampleUnit::EveryOneSecond, this, _1));
  }

protected:
  void EveryOneSecond(basis::core::MonotonicTime time) {
    auto msg = std::make_shared<TimeTest>();
    msg->set_time(time.ToSeconds());
    time_test_pub->Publish(msg);
  }

  void OnTimeTest(std::shared_ptr<const TimeTest> msg) {
    // Forward the message unchanged — zero copy if using inproc
    time_test_pub_forwarded->Publish(msg);
  }

  std::shared_ptr<basis::core::transport::Subscriber<TimeTest>> time_test_sub;
  std::unique_ptr<basis::core::transport::RateSubscriber> one_hz_rate_subscriber;
  std::shared_ptr<basis::core::transport::Publisher<TimeTest>> time_test_pub;
  std::shared_ptr<basis::core::transport::Publisher<TimeTest>> time_test_pub_forwarded;
};

Next steps

Synchronizers

Control when a handler fires by combining messages across multiple topics

Transport layers

How inproc and TCP transports are structured and configured

Units

The Unit class hierarchy, lifecycle, and generated handler structs

Serialization

Protobuf, ROS messages, raw buffers, and custom serializers

Build docs developers (and LLMs) love