Basis separates message routing logic from message delivery. The TransportManager is responsible for creating publishers and subscribers and for connecting them together. The actual byte-level delivery is handled by interchangeable transport plugins.

Architecture overview

Every Unit has one TransportManager. The transport manager maintains:
  • An InprocTransport instance — always present, handles same-process delivery.
  • A map of named Transport plugins — typically just TcpTransport, registered as "tcp".
When you call Advertise<T>(), the transport manager creates:
  • An InprocPublisher<T> (for same-process subscribers).
  • One TransportPublisher per registered transport plugin (for cross-process subscribers).
When you call Subscribe<T>(), the transport manager creates:
  • An InprocSubscriber<T> (to receive messages from same-process publishers).
  • One TransportSubscriber per registered transport plugin (to receive messages from cross-process publishers).
The coordinator acts as a rendezvous point. It receives PublisherInfo from each unit and distributes NetworkInfo to all subscribers. When a subscriber learns about a new publisher, its HandlePublisherInfo() method calls Connect() on the appropriate TransportSubscriber, which establishes the connection.
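The fan-out described above can be sketched with a toy model. This is not the Basis implementation: ToyTransport, ToyTransportManager, and AdvertisedEndpoints() are hypothetical names, and each publisher is reduced to a descriptive string.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a transport plugin (the real interface is Transport).
struct ToyTransport {};

// Toy model of the manager: one built-in inproc path plus named plugins.
class ToyTransportManager {
public:
  void RegisterTransport(const std::string& name,
                         std::unique_ptr<ToyTransport> transport) {
    assert(transport != nullptr);
    transports_[name] = std::move(transport);
  }

  // Lists the publishers that would be created for a topic:
  // always the inproc publisher, then one per registered plugin.
  std::vector<std::string> AdvertisedEndpoints(const std::string& topic) const {
    std::vector<std::string> endpoints{"inproc:" + topic};
    for (const auto& entry : transports_) {
      endpoints.push_back(entry.first + ":" + topic);
    }
    return endpoints;
  }

private:
  std::map<std::string, std::unique_ptr<ToyTransport>> transports_;
};
```

With only the built-in inproc path, AdvertisedEndpoints("/camera") yields a single entry. After registering a plugin under "tcp", it yields two.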

TransportManager

// From transport_manager.h
class TransportManager {
public:
  // Create a typed publisher on all registered transports
  template <typename T_MSG, typename T_Serializer = ...,
            typename T_CONVERTABLE_INPROC = NoAdditionalInproc>
  std::shared_ptr<Publisher<T_MSG, T_CONVERTABLE_INPROC>>
  Advertise(std::string_view topic, ...);

  // Create a typed subscriber on all registered transports
  template <typename T_MSG, typename T_Serializer = ...,
            typename T_ADDITIONAL_INPROC = NoAdditionalInproc>
  std::shared_ptr<Subscriber<T_MSG, T_ADDITIONAL_INPROC>>
  Subscribe(std::string_view topic,
            SubscriberCallback<T_MSG> callback, ...);

  // Register a transport plugin by name
  void RegisterTransport(std::string_view transport_name,
                         std::unique_ptr<Transport> transport);

  // Poll transports and collect updated publisher info
  void Update();

  // Apply incoming coordinator network info to subscribers
  void HandleNetworkInfo(const proto::NetworkInfo& network_info);
};
TransportManager::Update() is called by Unit::Update() on every iteration. It polls each registered transport (for example, TcpTransport::Update() checks for new subscriber connections) and regenerates the publisher summary sent to the coordinator.

Inproc transport (zero-copy)

The inproc transport passes std::shared_ptr<const T_MSG> directly between publishers and subscribers in the same process. There is no serialization, no copy, no queue, and no system call.
Publisher<T>::Publish(msg)
  └── InprocPublisher<T>::Publish(msg)
        └── InprocConnector<T>::Publish(topic, msg, ...)
              └── InprocSubscriber<T>::OnMessage(msg)   // for each subscriber
                    └── callback(msg)                   // user callback, same shared_ptr

InprocConnector<T>

Each message type has a static InprocConnector<T> instance. It holds a map from topic name to a list of weak pointers to InprocSubscriber<T> instances. Expired weak pointers are cleaned up on the next publish.
// From inproc.h — simplified
template <typename T_MSG>
class InprocConnector : public InprocConnectorInterface<T_MSG> {
public:
  std::shared_ptr<InprocPublisher<T_MSG>>
  Advertise(std::string_view topic,
            InprocConnectorBase* ignore_if_primary_connector);

  std::shared_ptr<InprocSubscriber<T_MSG>>
  Subscribe(std::string_view topic,
            std::function<void(MessageEvent<T_MSG>)> callback,
            InprocConnectorBase* primary_inproc_connector);

  bool HasSubscribersFast(const std::string& topic);
};
HasSubscribersFast() is a cheap check for inproc subscribers on a topic. Combined with the network subscriber count, it lets Publisher<T>::Publish() skip work when nobody is listening, for example during early startup before any other unit has subscribed.
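The weak-pointer bookkeeping can be illustrated with a minimal sketch. ToyConnector and ToySubscriber are hypothetical names. The sketch is a plain object rather than a static per-type instance, and it is single-threaded.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical subscriber handle: holding it keeps the subscription alive.
template <typename T>
struct ToySubscriber {
  std::function<void(std::shared_ptr<const T>)> callback;
};

// Toy connector: topic name -> weak references to subscribers.
template <typename T>
class ToyConnector {
public:
  std::shared_ptr<ToySubscriber<T>> Subscribe(
      const std::string& topic,
      std::function<void(std::shared_ptr<const T>)> callback) {
    assert(callback && "callback must be callable");
    auto subscriber = std::make_shared<ToySubscriber<T>>();
    subscriber->callback = std::move(callback);
    subscribers_[topic].push_back(subscriber);
    return subscriber;
  }

  // Hands the same shared_ptr to every live subscriber and erases
  // expired entries on the way. Returns the number of deliveries.
  std::size_t Publish(const std::string& topic, std::shared_ptr<const T> msg) {
    auto found = subscribers_.find(topic);
    if (found == subscribers_.end()) return 0;
    auto& list = found->second;
    std::size_t delivered = 0;
    for (auto it = list.begin(); it != list.end();) {
      if (auto strong = it->lock()) {
        strong->callback(msg);
        ++delivered;
        ++it;
      } else {
        it = list.erase(it);  // subscriber was destroyed: drop the entry
      }
    }
    return delivered;
  }

  // Number of entries currently tracked, including not-yet-pruned ones.
  std::size_t TrackedCount(const std::string& topic) const {
    auto found = subscribers_.find(topic);
    return found == subscribers_.end() ? 0 : found->second.size();
  }

private:
  std::map<std::string, std::vector<std::weak_ptr<ToySubscriber<T>>>> subscribers_;
};
```

Because the connector stores only weak pointers, dropping the handle returned by Subscribe() is enough to unsubscribe. The stale entry disappears on the next Publish() to that topic.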

Alternate inproc types

A publisher can carry an alternate inproc type alongside the primary message type. This allows a publisher to deliver a compact sensor-native format directly to co-located subscribers while converting to a standard format for network consumers. The T_CONVERTABLE_INPROC template parameter on Publisher enables this.
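As a rough illustration of the idea, the sketch below delivers a native type to co-located subscribers and converts to a standard type only when someone needs it. DualFormatPublisher and its methods are hypothetical and do not mirror the real Publisher template. The "standard" callbacks stand in for the network path.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Toy publisher with a native (inproc) type and a standard (network) type.
template <typename Native, typename Standard>
class DualFormatPublisher {
public:
  using Converter = std::function<Standard(const Native&)>;
  using NativeCallback = std::function<void(std::shared_ptr<const Native>)>;
  using StandardCallback = std::function<void(const Standard&)>;

  explicit DualFormatPublisher(Converter convert) : convert_(std::move(convert)) {
    assert(convert_ && "a converter is required");
  }

  void AddNativeSubscriber(NativeCallback cb) { native_.push_back(std::move(cb)); }
  void AddStandardSubscriber(StandardCallback cb) { standard_.push_back(std::move(cb)); }

  void Publish(std::shared_ptr<const Native> msg) {
    // Co-located subscribers share the native message as-is.
    for (const auto& cb : native_) cb(msg);
    // Convert at most once, and only if a standard consumer exists.
    if (standard_.empty()) return;
    const Standard converted = convert_(*msg);
    for (const auto& cb : standard_) cb(converted);
  }

private:
  Converter convert_;
  std::vector<NativeCallback> native_;
  std::vector<StandardCallback> standard_;
};
```

For example, a range sensor could publish integer millimetres natively while standard consumers receive metres as a double. The conversion cost is paid only once a standard consumer is attached.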

TCP transport

The TCP transport (TcpTransport) handles cross-process delivery. It serializes messages into MessagePacket byte buffers and sends them over TCP connections.

Publisher side: TcpPublisher

// From tcp.h
class TcpPublisher : public core::transport::TransportPublisher {
public:
  // Opens a listen socket; port 0 (the default) lets the OS pick a free port
  static nonstd::expected<std::shared_ptr<TcpPublisher>, ...> Create(uint16_t port = 0);

  // Accepts new subscriber connections (called by TcpTransport::Update())
  size_t CheckForNewSubscriptions();

  // Sends a serialized packet to every connected subscriber (one TcpSender each)
  virtual void SendMessage(std::shared_ptr<core::transport::MessagePacket> message) override;

  virtual void SetMaxQueueSize(size_t max_queue_size) override;

  virtual size_t GetSubscriberCount() override;
};
TcpPublisher holds a TcpListenSocket. On each Update() call, CheckForNewSubscriptions() accepts pending connections and creates a TcpSender for each one. Each TcpSender runs a dedicated background thread that drains an outbound queue. When the queue is full (controlled by SetMaxQueueSize()), older messages are dropped. A queue size of 0 means unlimited.
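The drop-oldest policy can be sketched as follows. DropOldestQueue is a hypothetical, single-threaded illustration. The real TcpSender drains its queue from a background thread, so it would also need locking, which is omitted here.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <utility>

// Hypothetical bounded queue: when full, the oldest item is discarded.
// A max size of 0 means the queue is unbounded.
template <typename T>
class DropOldestQueue {
public:
  explicit DropOldestQueue(std::size_t max_size = 0) : max_size_(max_size) {}

  void SetMaxSize(std::size_t max_size) {
    max_size_ = max_size;
    Trim();
  }

  // Appends an item and returns how many older items were dropped.
  std::size_t Push(T item) {
    items_.push_back(std::move(item));
    return Trim();
  }

  // Moves the oldest remaining item into `out`; returns false if empty.
  bool Pop(T& out) {
    if (items_.empty()) return false;
    out = std::move(items_.front());
    items_.pop_front();
    return true;
  }

  std::size_t Size() const { return items_.size(); }

private:
  std::size_t Trim() {
    std::size_t dropped = 0;
    while (max_size_ != 0 && items_.size() > max_size_) {
      items_.pop_front();
      ++dropped;
    }
    assert(max_size_ == 0 || items_.size() <= max_size_);
    return dropped;
  }

  std::size_t max_size_;
  std::deque<T> items_;
};
```

Dropping from the front favours fresh data over complete data, which is usually the right trade-off for sensor streams where a slow subscriber should see the latest state rather than a growing backlog.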

Subscriber side: TcpSubscriber

// From tcp_subscriber.h
class TcpSubscriber : public core::transport::TransportSubscriber {
public:
  // Connects to a publisher's host and port
  virtual bool Connect(std::string_view host,
                       std::string_view endpoint,
                       __uint128_t publisher_id) override;

  virtual size_t GetPublisherCount() override;
};
TcpSubscriber holds a map of TcpReceiver instances keyed by (address, port). When the coordinator delivers a NetworkInfo update, SubscriberBase::HandlePublisherInfo() calls Connect() on the matching TcpSubscriber, which establishes a TCP connection to the publisher. Inbound data is received via a shared Epoll instance and dispatched to a ThreadPool for deserialization. The deserialized MessagePacket is handed to the type-erased subscriber callback, which then invokes the user’s typed callback.

Transport plugin registration

TcpTransport implements the Transport interface:
class Transport {
public:
  virtual std::shared_ptr<TransportPublisher>
  Advertise(std::string_view topic,
            core::serialization::MessageTypeInfo type_info) = 0;

  virtual std::shared_ptr<TransportSubscriber>
  Subscribe(std::string_view topic,
            TypeErasedSubscriberCallback callback,
            basis::core::threading::ThreadPool* work_thread_pool,
            core::serialization::MessageTypeInfo type_info) = 0;

  virtual void Update() = 0;
};
You register a transport with the manager using:
transport_manager->RegisterTransport("tcp",
    std::make_unique<basis::plugins::transport::TcpTransport>());
CreateStandardTransportManager() (called by Unit::CreateTransportManager()) does this automatically and also enables the inproc transport.

On-demand serialization

Serialization only happens when there is at least one network subscriber. This is enforced inside Publisher<T_MSG>::Publish():
void Publish(std::shared_ptr<const T_MSG> msg) {
  // Always deliver inproc — no serialization
  if (inproc) {
    inproc->Publish(msg);
  }

  // Early exit if no network subscribers
  if (!GetTransportSubscriberCount()) {
    return;
  }

  // Serialize only now
  const size_t payload_size = get_message_size_cb(*msg);
  auto packet = std::make_shared<MessagePacket>(
      MessageHeader::DataType::MESSAGE, payload_size);
  write_message_to_span_cb(*msg, packet->GetMutablePayload());
  // `now` is the publish timestamp, defined outside this excerpt
  PublishRaw(std::move(packet), now);
}
This means that a single-process system incurs no serialization overhead at all. When a second process subscribes over the network, serialization begins automatically, with no code change required.
The same principle applies to recording. If a RecorderInterface is attached to the transport manager, messages are written to disk only if the recorder accepted the topic during Advertise(). Raw struct types (using RawSerializer) are never recorded.
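The early exit in Publish() can be modelled with a toy publisher that counts how often it serializes. LazyPublisher and its methods are hypothetical. The message type is fixed to std::string, "serialization" is a plain byte copy, and recording is left out.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical publisher that serializes only when a network subscriber exists.
class LazyPublisher {
public:
  using InprocCallback = std::function<void(std::shared_ptr<const std::string>)>;

  void AddInprocSubscriber(InprocCallback callback) {
    inproc_.push_back(std::move(callback));
  }
  void SetNetworkSubscriberCount(std::size_t count) { network_subscribers_ = count; }

  void Publish(std::shared_ptr<const std::string> msg) {
    assert(msg != nullptr);
    // Inproc delivery shares the pointer: no bytes are produced.
    for (const auto& callback : inproc_) callback(msg);
    // Nobody on the network: skip serialization entirely.
    if (network_subscribers_ == 0) return;
    // Serialize once, however many network subscribers there are.
    last_packet_.assign(msg->begin(), msg->end());
    ++serialize_count_;
  }

  std::size_t SerializeCount() const { return serialize_count_; }
  const std::vector<char>& LastPacket() const { return last_packet_; }

private:
  std::vector<InprocCallback> inproc_;
  std::size_t network_subscribers_ = 0;
  std::size_t serialize_count_ = 0;
  std::vector<char> last_packet_;
};
```

Publishing with only inproc subscribers leaves SerializeCount() at zero, and each callback receives the same pointer the caller passed in. Raising the network subscriber count makes the next Publish() serialize exactly once.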

Controlling transports per input

In the unit YAML, each input can restrict which transports it accepts:
inputs:
  /topic_a:
    type: protobuf:A
    # Only receive via inproc — co-located publisher required
    allow_transports:
      - inproc

  /topic_b:
    type: protobuf:B
    # Refuse inproc delivery — forces network serialization even in-process
    deny_transports:
      - inproc
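allow_transports is an allow-list: only the listed transports may deliver to the input. deny_transports is a deny-list: every transport except the listed ones may deliver. If neither is specified, the input accepts delivery from any available transport.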
Use allow_transports: [inproc] for high-frequency, large-payload topics (for example, camera frames) that you want to keep zero-copy within a process. Use deny_transports: [inproc] on test inputs where you want to exercise the full serialization path.
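One way to express the filtering rule in code is sketched below. InputTransportPolicy and AcceptsTransport() are hypothetical names. The behaviour when both lists are set (deny wins) is an assumption of this sketch, since the text above does not specify it.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Hypothetical mirror of one input's transport settings from the unit YAML.
struct InputTransportPolicy {
  std::vector<std::string> allow_transports;  // empty: no allow-list given
  std::vector<std::string> deny_transports;   // empty: no deny-list given
};

// Returns true if the input may receive messages over `transport`.
// Assumption: a transport on the deny-list is rejected even if also allowed.
bool AcceptsTransport(const InputTransportPolicy& policy,
                      const std::string& transport) {
  assert(!transport.empty());
  const auto contains = [&](const std::vector<std::string>& names) {
    return std::find(names.begin(), names.end(), transport) != names.end();
  };
  if (contains(policy.deny_transports)) return false;
  return policy.allow_transports.empty() || contains(policy.allow_transports);
}
```

Applied to the YAML above, /topic_a accepts "inproc" and rejects "tcp", while /topic_b does the opposite. An input with neither list accepts both.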

QoS: queue depth

The unit YAML does not yet have a top-level QoS block, but queue depth is configurable in two places:
  • Publisher side: call Publisher::SetMaxQueueSize(n) after Advertise(). When the outbound TCP send queue exceeds n packets, older packets are dropped.
  • Subscriber side: pass queue_depth to SingleThreadedUnit::Subscribe(). This controls the depth of the per-subscriber SubscriberQueue that buffers callbacks waiting to be executed on the main thread.
// Subscriber with a callback queue of depth 5
time_test_sub = Subscribe<TimeTest>("/time_test", callback, /*queue_depth=*/5);
A queue_depth of 0 means the queue is unbounded.

Schema management

TransportManager includes a SchemaManager that tracks the message schemas for all advertised topics. When a new message type is first advertised, SchemaManager::RegisterType<T_MSG, T_Serializer>() serializes the schema (for example, the protobuf FileDescriptorProto) and queues it to be sent to the coordinator. The coordinator then makes schemas available to tools and recorders.
class SchemaManager {
public:
  template <typename T_MSG, typename T_Serializer>
  serialization::MessageSchema* RegisterType(
      const serialization::MessageTypeInfo& type_info);

  const serialization::MessageSchema* TryGetSchema(std::string schema) const;
};
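The register-once-then-queue behaviour might look like the sketch below. ToySchema, ToySchemaManager, TakePending(), and the "serializer:name" key format are all hypothetical. The real class is templated on the message and serializer types and returns serialization::MessageSchema.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical schema record.
struct ToySchema {
  std::string serializer;  // e.g. "protobuf"
  std::string name;        // e.g. "TimeTest"
  std::string definition;  // serialized schema bytes
};

class ToySchemaManager {
public:
  // Stores a schema the first time a type is seen and queues it for the
  // coordinator. Later calls for the same type return the existing record.
  const ToySchema* Register(ToySchema schema) {
    assert(!schema.name.empty());
    const std::string key = schema.serializer + ":" + schema.name;
    auto result = schemas_.emplace(key, std::move(schema));
    if (result.second) pending_.push_back(key);
    return &result.first->second;
  }

  // Returns nullptr when the key is unknown.
  const ToySchema* TryGetSchema(const std::string& key) const {
    auto found = schemas_.find(key);
    return found == schemas_.end() ? nullptr : &found->second;
  }

  // Hands over the keys not yet sent to the coordinator and clears the queue.
  std::vector<std::string> TakePending() { return std::exchange(pending_, {}); }

private:
  std::map<std::string, ToySchema> schemas_;  // std::map keeps element addresses stable
  std::vector<std::string> pending_;
};
```

Registering the same type twice returns the same pointer and queues the schema only once, so advertising many topics of one type does not resend its schema.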

Transport summary

| Transport | Delivery | Serialization | Cross-process | Use case |
| --- | --- | --- | --- | --- |
| Inproc | std::shared_ptr | None | No | Same-process, zero-copy |
| TCP | Byte stream | On demand | Yes | Cross-process, cross-host |

Next steps

  • Pub-sub messaging: How publishers and subscribers use transport under the hood
  • Units: How a Unit creates and owns its TransportManager
  • Synchronizers: How the transport layer feeds messages into synchronizers
  • Recording: How messages are captured to MCAP files via the recorder interface
