Basis uses a typed publish-subscribe model where publishers and subscribers are connected by topic name and message type. Inproc transport passes shared pointers with zero copy; network transport serializes on demand.
Basis uses a publish-subscribe (pub-sub) model for all inter-component communication. Publishers write messages to named topics; subscribers receive them. The framework automatically connects publishers and subscribers, within a process by passing shared pointers and across processes via TCP, with no manual wiring required.
Topics are strings that identify a data channel, for example /time_test or /camera/rgb. Both publishers and subscribers declare the C++ type of the message they expect. Basis enforces type compatibility at subscription time using serialization metadata registered with the coordinator.
Message types are not limited to a particular serialization format. Basis supports protobuf, ROS1 messages, and raw byte buffers through its serialization plugin system, all on the same topic namespace.
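To make the idea of a type check at subscription time concrete, here is a minimal, self-contained sketch. It is not the Basis coordinator: SketchTypeInfo, SketchTopicRegistry, and the rule that an unknown topic accepts any type are all assumptions made for illustration.

```cpp
#include <map>
#include <string>

// Hypothetical stand-in for serialization metadata: which serializer
// produced the type, and the type's name within that serializer.
struct SketchTypeInfo {
    std::string serializer;
    std::string name;

    friend bool operator==(const SketchTypeInfo& a, const SketchTypeInfo& b) {
        return a.serializer == b.serializer && a.name == b.name;
    }
};

// Hypothetical registry keyed by topic name.
class SketchTopicRegistry {
public:
    // Records the type a publisher advertises on a topic.
    // Returns false if the topic is already bound to a different type.
    bool Advertise(const std::string& topic, const SketchTypeInfo& type) {
        auto [it, inserted] = types_.emplace(topic, type);
        return inserted || it->second == type;
    }

    // Assumed rule: a subscription is accepted when the topic has no
    // advertised type yet, or when the advertised type matches exactly.
    bool CanSubscribe(const std::string& topic, const SketchTypeInfo& type) const {
        const auto it = types_.find(topic);
        return it == types_.end() || it->second == type;
    }

private:
    std::map<std::string, SketchTypeInfo> types_;
};
```

With this sketch, a subscriber asking for a different type name on /time_test than the one advertised would be rejected, while a subscriber on a topic nobody has advertised yet would be accepted and checked later.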
A publisher is created by calling Advertise<T>() on your Unit (or directly on the TransportManager). The return value is a std::shared_ptr<Publisher<T_MSG>>; keep it alive for as long as you need to publish on the topic.
    // From basis/unit.h: the Unit helper
    template <typename T_MSG, typename T_Serializer = SerializationHandler<T_MSG>::type>
    [[nodiscard]] std::shared_ptr<core::transport::Publisher<T_MSG>>
    Advertise(std::string_view topic, core::serialization::MessageTypeInfo message_type = ...);
Calling Publish() on the returned publisher sends the message to all current subscribers:
    // Advertise once in Initialize()
    time_test_pub = Advertise<TimeTest>("/time_test");

    // Publish whenever you have data
    auto msg = std::make_shared<TimeTest>();
    msg->set_time(time.ToSeconds());
    time_test_pub->Publish(msg);
Publisher<T_MSG>::Publish() performs two separate operations:
Inproc delivery — if there are any inproc subscribers, the std::shared_ptr<const T_MSG> is handed directly to each subscriber’s callback. No copy, no serialization.
Network delivery — if there are any TCP (or other transport) subscribers, the message is serialized into a MessagePacket using the configured serializer, then sent to each transport publisher.
Serialization only happens when there is at least one network subscriber. If all subscribers are within the same process, the message is never serialized.
    // Simplified from publisher.h
    void Publish(std::shared_ptr<const T_MSG> msg) {
        // Zero-copy inproc delivery
        if (inproc) {
            inproc->Publish(msg);
        }

        // Only serialize if network subscribers exist
        if (!GetTransportSubscriberCount()) {
            return;
        }

        const size_t payload_size = get_message_size_cb(*msg);
        auto packet = std::make_shared<MessagePacket>(
            MessageHeader::DataType::MESSAGE, payload_size);
        write_message_to_span_cb(*msg, packet->GetMutablePayload());
        PublishRaw(std::move(packet), now);
    }
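The lazy-serialization behavior can be tried in isolation with a small stand-alone sketch. Everything here is hypothetical (Reading, SketchPublisher, the string-based "wire format"); it only mirrors the shape of the logic described above and does not use any Basis types.

```cpp
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical message type used only for this illustration.
struct Reading {
    double value = 0.0;
};

// Toy publisher: inproc callbacks receive the shared pointer itself, and
// the serializer runs only when at least one network sink is attached.
class SketchPublisher {
public:
    using InprocCallback = std::function<void(std::shared_ptr<const Reading>)>;
    using NetworkSink = std::function<void(const std::string&)>;

    void AddInprocSubscriber(InprocCallback cb) { inproc_.push_back(std::move(cb)); }
    void AddNetworkSink(NetworkSink sink) { network_.push_back(std::move(sink)); }

    void Publish(std::shared_ptr<const Reading> msg) {
        for (const auto& cb : inproc_) {
            cb(msg);  // same object for every subscriber, no serialization
        }
        if (network_.empty()) {
            return;  // nobody needs bytes, so skip serialization entirely
        }
        const std::string bytes = Serialize(*msg);  // once per publish
        for (const auto& sink : network_) {
            sink(bytes);
        }
    }

    // Number of times the serializer has run so far.
    std::size_t serialize_calls() const { return serialize_calls_; }

private:
    // Placeholder wire format: the decimal text of the value.
    std::string Serialize(const Reading& msg) {
        ++serialize_calls_;
        return std::to_string(msg.value);
    }

    std::vector<InprocCallback> inproc_;
    std::vector<NetworkSink> network_;
    std::size_t serialize_calls_ = 0;
};
```

Publishing with only inproc subscribers leaves serialize_calls() at zero; attaching two network sinks and publishing once raises it to one, because the serialized bytes are shared between the sinks.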
Each TransportPublisher has a configurable maximum queue size. When the queue is full, older messages are dropped. Set it via Publisher::SetMaxQueueSize().
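The drop-oldest policy can be illustrated with a generic bounded queue. This is a sketch under assumptions, not the TransportPublisher implementation: DropOldestQueue and its methods are hypothetical names, and the sketch requires a maximum size of at least one.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <utility>

// Hypothetical bounded FIFO: when full, the oldest entry is discarded
// to make room for the newest one.
template <typename T>
class DropOldestQueue {
public:
    explicit DropOldestQueue(std::size_t max_size) : max_size_(max_size) {
        assert(max_size_ > 0);  // this sketch does not model an unbounded queue
    }

    // Appends an item. Returns true if an older item had to be dropped.
    bool Push(T item) {
        bool dropped = false;
        if (items_.size() == max_size_) {
            items_.pop_front();
            dropped = true;
        }
        items_.push_back(std::move(item));
        return dropped;
    }

    // Removes the oldest remaining item into `out`. Returns false if empty.
    bool Pop(T& out) {
        if (items_.empty()) {
            return false;
        }
        out = std::move(items_.front());
        items_.pop_front();
        return true;
    }

    // Shrinking the limit discards the oldest surplus entries immediately.
    void SetMaxSize(std::size_t max_size) {
        assert(max_size > 0);
        max_size_ = max_size;
        while (items_.size() > max_size_) {
            items_.pop_front();
        }
    }

    std::size_t size() const { return items_.size(); }

private:
    std::size_t max_size_;
    std::deque<T> items_;
};
```

A small limit favors freshness (a slow consumer sees recent data and loses history), while a large limit favors completeness at the cost of latency and memory.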
A subscriber is created by calling Subscribe<T>(). The return value is a std::shared_ptr<Subscriber<T_MSG>>. You must keep this shared pointer alive — dropping it unsubscribes.
Basis maintains two separate delivery paths for each topic: inproc (zero-copy) and TCP (network).
The inproc transport (InprocTransport) passes std::shared_ptr<const T_MSG> directly between publishers and subscribers in the same process. There is no serialization, no copy of the message, and no memory allocation: delivery only increments a reference count.
This is the default path for any publisher-subscriber pair that lives in the same process. It is always enabled when you call CreateTransportManager().
    // InprocPublisher delivers directly to all registered InprocSubscribers
    void InprocPublisher<T_MSG>::Publish(std::shared_ptr<const T_MSG> msg) {
        connector->Publish(this->topic, std::move(msg), ignore_if_primary_connector);
    }
The InprocConnector<T_MSG> is keyed by topic name and message type. Each connector holds a list of weak pointers to its subscribers; expired pointers are pruned on the next publish.
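The weak-pointer bookkeeping described above, together with the rule that dropping a subscriber's shared pointer unsubscribes it, can be sketched in a few lines. SketchConnector and SketchSubscriber are hypothetical names; the real InprocConnector is also keyed by topic, which this single-topic sketch leaves out.

```cpp
#include <cstddef>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical subscriber: owns the user callback.
template <typename T_MSG>
class SketchSubscriber {
public:
    using Callback = std::function<void(std::shared_ptr<const T_MSG>)>;

    explicit SketchSubscriber(Callback cb) : callback_(std::move(cb)) {}
    void OnMessage(std::shared_ptr<const T_MSG> msg) const { callback_(std::move(msg)); }

private:
    Callback callback_;
};

// Hypothetical single-topic connector holding weak references only, so the
// caller's shared_ptr is the sole owner of each subscriber.
template <typename T_MSG>
class SketchConnector {
public:
    std::shared_ptr<SketchSubscriber<T_MSG>> Subscribe(
        typename SketchSubscriber<T_MSG>::Callback cb) {
        auto sub = std::make_shared<SketchSubscriber<T_MSG>>(std::move(cb));
        subscribers_.push_back(sub);
        return sub;
    }

    // Delivers to live subscribers and prunes expired ones along the way.
    // Note: callbacks must not call Subscribe() on this connector during
    // delivery, since that would invalidate the iterator in this sketch.
    void Publish(const std::shared_ptr<const T_MSG>& msg) {
        auto it = subscribers_.begin();
        while (it != subscribers_.end()) {
            if (auto sub = it->lock()) {
                sub->OnMessage(msg);
                ++it;
            } else {
                it = subscribers_.erase(it);
            }
        }
    }

    // Entries currently tracked, including any not yet pruned.
    std::size_t tracked() const { return subscribers_.size(); }

private:
    std::vector<std::weak_ptr<SketchSubscriber<T_MSG>>> subscribers_;
};
```

Resetting a subscriber's shared pointer stops delivery immediately, but its entry stays in the list until the next Publish() call prunes it, which matches the lazy pruning described above.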
The TCP transport (TcpTransport / TcpPublisher / TcpSubscriber) handles cross-process communication. When a Unit connects to the coordinator, it advertises its publisher endpoints. Other Units learn about those endpoints and establish TCP connections.
TcpPublisher opens a listen socket on a random port and accepts one TcpSender connection per subscriber. Each TcpSender runs a dedicated send thread with a configurable outbound queue.
TcpSubscriber connects to the publisher’s address and port as reported by the coordinator. It uses epoll to multiplex receive events across potentially many connections, dispatching to a worker thread pool for deserialization.
Messages are serialized once and sent to all connected senders via TcpPublisher::SendMessage().
The transport selection is automatic. If a subscriber is in the same process as the publisher, it receives messages via inproc. If it is in another process, it receives via TCP. You do not have to choose explicitly in most cases.
In the unit YAML, you can restrict which transports a given input may use with allow_transports and deny_transports:
    inputs:
      /topic_a:
        type: protobuf:A
        # Only receive this topic via inproc (same-process only)
        allow_transports:
          - inproc
      /topic_b:
        type: protobuf:B
        # Never receive this topic via inproc
        deny_transports:
          - inproc
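One plausible way to read these two lists is as a filter applied to each candidate transport. The sketch below encodes that reading; the precedence rules (deny wins, an empty allow list means "allow everything") are assumptions, and TransportFilter, IsTransportPermitted, and the transport name "tcp" are hypothetical.

```cpp
#include <algorithm>
#include <string>
#include <vector>

// Hypothetical in-memory form of allow_transports / deny_transports.
struct TransportFilter {
    std::vector<std::string> allow;  // empty means "no allow restriction"
    std::vector<std::string> deny;
};

inline bool Contains(const std::vector<std::string>& list, const std::string& name) {
    return std::find(list.begin(), list.end(), name) != list.end();
}

// Assumed semantics: a denied transport is always rejected; otherwise the
// transport passes if there is no allow list or it appears on the allow list.
inline bool IsTransportPermitted(const TransportFilter& filter,
                                 const std::string& transport) {
    if (Contains(filter.deny, transport)) {
        return false;
    }
    return filter.allow.empty() || Contains(filter.allow, transport);
}
```

Under these assumptions, /topic_a above would accept only "inproc", while /topic_b would accept any transport except "inproc".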
See Transport layers for a full explanation of the transport architecture.
RateSubscriber is not a topic subscriber. It is a timer that fires a callback at a fixed rate on a background thread. Use it to drive periodic behavior such as publishing at a regular frequency.
    // From subscriber.h
    class RateSubscriber {
    public:
        RateSubscriber(const Duration& tick_length, std::function<void(MonotonicTime)> callback);
        ~RateSubscriber(); // stops the thread

        void Start();
        void Stop();
    };
The callback receives the current MonotonicTime at the moment it fires. The thread sleeps using MonotonicTime::SleepUntil() with a run token to detect simulated time jumps (used in deterministic replay).
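For readers who want to see the general shape of a fixed-rate timer thread, here is a standard-library sketch. It is not RateSubscriber: it swaps MonotonicTime::SleepUntil() and the run token for std::chrono::steady_clock and a condition variable, it has no notion of simulated time, and SketchRateTimer is a hypothetical name.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

class SketchRateTimer {
public:
    using Clock = std::chrono::steady_clock;

    SketchRateTimer(Clock::duration tick_length,
                    std::function<void(Clock::time_point)> callback)
        : tick_length_(tick_length), callback_(std::move(callback)) {}

    ~SketchRateTimer() { Stop(); }  // stops and joins the thread

    void Start() {
        if (thread_.joinable()) {
            return;  // already running
        }
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = false;
        }
        thread_ = std::thread([this] { Run(); });
    }

    // Must not be called from inside the callback (it would join itself).
    void Stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        wake_.notify_all();
        if (thread_.joinable()) {
            thread_.join();
        }
    }

    std::size_t tick_count() const { return ticks_.load(); }

private:
    void Run() {
        auto next = Clock::now() + tick_length_;
        std::unique_lock<std::mutex> lock(mutex_);
        while (!stop_) {
            // Returns true if Stop() was requested before the deadline.
            if (wake_.wait_until(lock, next, [this] { return stop_; })) {
                break;
            }
            lock.unlock();
            callback_(Clock::now());
            ++ticks_;
            lock.lock();
            // Absolute deadlines keep callback run time from accumulating as drift.
            next += tick_length_;
        }
    }

    Clock::duration tick_length_;
    std::function<void(Clock::time_point)> callback_;
    std::mutex mutex_;
    std::condition_variable wake_;
    bool stop_ = false;
    std::atomic<std::size_t> ticks_{0};
    std::thread thread_;
};
```

Sleeping until an absolute deadline, rather than sleeping for a fixed duration after each callback, is what keeps the average rate stable when the callback itself takes time.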
RateSubscriber runs on its own thread. When using SingleThreadedUnit, the callback fires on that background thread and is not serialized through the main callback queue. If you need serialized execution, push work onto the unit’s queue manually, or use a rate synchronizer in your .unit.yaml instead.
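Pushing work onto a queue so that it runs on one thread can be sketched as follows. This is a generic pattern, not the unit's actual callback queue: SketchCallbackQueue, Push(), and Drain() are hypothetical names.

```cpp
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical work queue: any thread may Push(); a single owner thread
// calls Drain(), so the queued work never runs concurrently with itself.
class SketchCallbackQueue {
public:
    void Push(std::function<void()> work) {
        std::lock_guard<std::mutex> lock(mutex_);
        pending_.push(std::move(work));
    }

    // Runs everything queued so far on the calling thread, in FIFO order.
    // Returns the number of work items executed.
    std::size_t Drain() {
        std::queue<std::function<void()>> batch;
        {
            // Swap under the lock so work items run without holding it.
            std::lock_guard<std::mutex> lock(mutex_);
            std::swap(batch, pending_);
        }
        std::size_t executed = 0;
        while (!batch.empty()) {
            batch.front()();
            batch.pop();
            ++executed;
        }
        return executed;
    }

private:
    std::mutex mutex_;
    std::queue<std::function<void()>> pending_;
};
```

In this pattern the timer callback only captures the tick time and pushes a closure; the thread that owns the unit's state calls Drain() from its own loop, so handlers never need their own locking.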
In the unit YAML, you can declare a rate-triggered handler that is automatically serialized.
TransportManager is the central object that manages all publishers and subscribers for a Unit. Unit::Advertise() and Unit::Subscribe() are thin wrappers that delegate to the transport manager.
TransportManager::Update() is called inside Unit::Update(). It polls each registered transport for new connections, prunes expired publishers, and collects updated PublisherInfo to send to the coordinator.
When the coordinator sends a NetworkInfo message, TransportManager::HandleNetworkInfo() iterates the reported publishers and calls Subscriber::HandlePublisherInfo() on each matching subscriber, which triggers TCP connection establishment.
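The matching step can be pictured with a simplified model. The types and fields below (SketchPublisherInfo, SketchSubscriberState, a host and port per publisher) are assumptions for illustration and do not reflect the real PublisherInfo or NetworkInfo layouts; the sketch records the connections it would open instead of opening sockets.

```cpp
#include <cstddef>
#include <cstdint>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, reduced view of what the coordinator reports per publisher.
struct SketchPublisherInfo {
    std::string topic;
    std::string host;
    std::uint16_t port;
};

// Hypothetical subscriber state: remembers which endpoints it already uses.
class SketchSubscriberState {
public:
    explicit SketchSubscriberState(std::string topic) : topic_(std::move(topic)) {}

    // Returns true if this info matches the topic and names a new endpoint,
    // which is the point where a real subscriber would start a connection.
    bool HandlePublisherInfo(const SketchPublisherInfo& info) {
        if (info.topic != topic_) {
            return false;
        }
        return connected_.insert({info.host, info.port}).second;
    }

    std::size_t connection_count() const { return connected_.size(); }

private:
    std::string topic_;
    std::set<std::pair<std::string, std::uint16_t>> connected_;
};

// Offers every reported publisher to every local subscriber.
// Returns how many new connections would be opened.
inline std::size_t HandleNetworkInfo(const std::vector<SketchPublisherInfo>& publishers,
                                     std::vector<SketchSubscriberState>& subscribers) {
    std::size_t opened = 0;
    for (const auto& info : publishers) {
        for (auto& sub : subscribers) {
            if (sub.HandlePublisherInfo(info)) {
                ++opened;
            }
        }
    }
    return opened;
}
```

Tracking known endpoints makes the handler idempotent in this sketch: receiving the same network snapshot twice opens no additional connections.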