Skip to main content
A synchronizer sits between a set of subscribers and a handler function. Each time a message arrives on one of the handler’s input topics, it is handed to the synchronizer. The synchronizer decides whether the combination of buffered messages satisfies a condition. When it does, it consumes those messages and calls the handler. This model decouples receiving a message from acting on a message. You never write explicit buffering or synchronization code — the synchronizer does it for you.

How synchronizers work

Every synchronizer in Basis inherits from SynchronizerBase<T_MSG_CONTAINERs...>:
// From synchronizer_base.h
template <typename... T_MSG_CONTAINERs>
class SynchronizerBase : Synchronizer {
public:
  using MessageSumType = std::tuple<T_MSG_CONTAINERs...>;

  // Called by the subscriber callback for input INDEX
  template <size_t INDEX>
  bool OnMessage(auto msg, MessageSumType* out = nullptr);

  // Called by the rate subscriber thread to check and consume
  std::optional<MessageSumType> ConsumeIfReady();

  bool IsReady();

protected:
  virtual bool IsReadyNoLock() = 0;  // implemented by each sync type
};
Each input topic maps to one position in the MessageSumType tuple. When OnMessage<INDEX>() is called, the message is stored. IsReadyNoLock() is then checked. If ready, ConsumeMessagesNoLock() extracts all buffered messages and returns them as a MessageSumType tuple. Consumed messages are cleared from storage — unless the input is marked cached (see below).

Thread safety

All synchronizer operations are protected by an internal std::mutex. Messages can arrive concurrently from multiple subscriber callbacks; the synchronizer ensures correctness.

Input storage modifiers

Two per-input modifiers change how the synchronizer buffers messages.

optional

An optional input does not block the synchronizer from firing. The synchronizer checks readiness as if that input were always filled. If a message has arrived by the time the handler fires, it is included; otherwise the field is empty (nullptr for shared pointers).
// From synchronizer_base.h
struct MessageMetadata {
  bool is_optional = false;
  bool is_cached   = false;
};

bool Storage::operator bool() const {
  if (metadata.is_optional) {
    return true;  // always counts as filled
  }
  // ...
}
In the unit YAML:
inputs:
  /event_data:
    type: protobuf:Event
    optional: true

cached

A cached input is not cleared after the handler fires. The most recently received message is reused in every subsequent handler invocation until a new message arrives.
T_MSG_CONTAINER Storage::Consume() {
  if (metadata.is_cached) {
    return data;           // keep the value in storage
  } else {
    return std::move(data); // clear the value from storage
  }
}
In the unit YAML, cached: true is set per-input.

accumulate

When an input is declared with accumulate, its container type is a std::vector<std::shared_ptr<const T_MSG>> instead of a single std::shared_ptr<const T_MSG>. Messages accumulate in the vector between handler invocations.
inputs:
  /event_data:
    type: protobuf:Event
    optional: true
    accumulate: 10   # buffer up to 10 messages; true = unlimited
The vector is part of the MessageSumType tuple, so your handler receives all accumulated messages at once.

Synchronizer types

all — all inputs received

The All synchronizer fires when every non-optional input has at least one message buffered.
// From all.h
template <typename... T_MSG_CONTAINERs>
class All : public SynchronizerBase<T_MSG_CONTAINERs...> {
protected:
  virtual bool IsReadyNoLock() override {
    return Base::AreAllNonOptionalFieldsFilledNoLock();
  }
};
Use all when you want to process the latest message from each topic, regardless of timestamps. For a single-input handler this is equivalent to “fire on every message”.
handlers:
  HandleAll:
    sync:
      type: all
    inputs:
      /topic_c:
        type: protobuf:C
      /topic_d:
        type: protobuf:D

equal — exact field matching

The equal synchronizer (FieldSyncEqual) fires when a designated field value is identical across all synced inputs. Unsynced inputs (those with no sync_field) are matched after the synced inputs have found a match.
// From field.h
struct Equal {
  template <typename T1, typename T2>
  auto operator()(const T1& t1, const T2& t2) { return t1 == t2; }
};

template <typename... T_FIELD_SYNCs>
using FieldSyncEqual = FieldSync<Equal, T_FIELD_SYNCs...>;
The underlying FieldSync buffers messages for each synced input. When a message arrives on a synced input, it searches each other synced input’s buffer for a message whose field value satisfies the operator. If all required synced inputs have a match, the matching messages are consumed together.
handlers:
  StereoMatch:
    sync:
      type: equal
      buffer_size: 2
    inputs:
      /camera_left:
        type: rosmsg:sensor_msgs::Image
        sync_field: header.stamp()
      /camera_right:
        type: rosmsg:sensor_msgs::Image
        sync_field: header.stamp()
    outputs:
      /camera_stereo:
        type: rosmsg:example_msgs::StereoImage
The sync_field can be:
  • A member access expression on the raw message pointer: header.stamp() calls a method.
  • A raw field with the :: prefix: ::time accesses a direct struct field.

approximate — approximate equality within a threshold

The approximate synchronizer (FieldSyncApproximatelyEqual) fires when the synced fields of all inputs are within a configurable epsilon of each other.
// From field.h
template <auto EPSILON>
struct ApproximatelyEqual {
  template <typename T1, typename T2>
  auto operator()(const T1& t1, const T2& t2) {
    return std::abs(t1 - t2) <= EPSILON;
  }
};

template <auto EPSILON, typename... T_FIELD_SYNCs>
using FieldSyncApproximatelyEqual = FieldSync<ApproximatelyEqual<EPSILON>, T_FIELD_SYNCs...>;
In C++ (from basis_example.cpp):
// Sync lidar (1 Hz) and vector data (10 Hz) within 50ms
std::unique_ptr<basis::synchronizers::FieldSyncApproximatelyEqual<
    0.05,  // epsilon in seconds
    basis::synchronizers::Field<
        std::shared_ptr<const ExampleStampedVector>,
        &ExampleStampedVector::time>,   // pointer-to-member
    basis::synchronizers::Field<
        std::shared_ptr<const sensor_msgs::PointCloud2>,
        [](const sensor_msgs::PointCloud2* msg) {
          return msg->header.stamp.toSec(); // lambda accessor
        }>>>
    vector_lidar_sync;
In the unit YAML, specify the epsilon as a duration string:
handlers:
  OnLidarSync:
    sync:
      type:
        approximate: 5ms
      buffer_size: 5
    inputs:
      /lidar_data:
        type: rosmsg:sensor_msgs::PointCloud2
        sync_field: header.stamp.toSecs()
      /vector_data:
        type: protobuf:StampedVectorData
        sync_field: ::time
      /event_data:
        type: protobuf:Event
        optional: true
        accumulate: 10
    outputs:
      /lidar_sync_event:
        type: protobuf:LidarSyncEvent
The buffer_size field caps how many messages per input channel are held before older messages are dropped.

rate — time-based firing

The rate synchronizer fires at a fixed frequency regardless of message arrival. It is driven by a RateSubscriber timer thread rather than by incoming messages. When the timer fires, the synchronizer calls ConsumeIfReady(). If all non-optional inputs have at least one cached message, the handler fires with those messages. The inputs act as a cache — their last received values are forwarded to the handler each tick.
handlers:
  TenHertzWithTopic:
    sync:
      rate: 10hz
    inputs:
      /topic_a:
        type: protobuf:A
      /topic_b:
        type: protobuf:B
        optional: true
    outputs:
      /topic_c:
        type: protobuf:C
        optional: true
In this example the handler fires 10 times per second as long as /topic_a has been received at least once. /topic_b is passed along if available.
Rate is declared in the handler’s sync block, not in the input. The rate determines when the synchronizer is polled — the inputs still accumulate between polls.
Supported rate formats in YAML: 10hz, 100ms, 0.1s.

external — externally triggered

An external synchronizer has no automatic trigger. The handler is called only when something outside the Basis pub-sub system explicitly requests it. This is useful for Units that bridge external sensors or callbacks (for example, a camera frame callback from an SDK).
handlers:
  External:
    sync:
      external
    outputs:
      /some_image:
        type: rosmsg:sensor_msgs::Image
An external handler has no inputs declared in YAML. Your C++ code calls the generated RunHandlerAndPublish() directly when data is available.

buffer_size

The buffer_size field on a sync block caps the per-channel message buffer used by equal and approximate synchronizers. When the buffer is full, incoming messages overwrite the oldest entry.
sync:
  type: equal
  buffer_size: 2
For the all synchronizer, buffer_size is not applicable — all holds only the most recent message per channel.

Using synchronizers directly in C++

You can instantiate and use synchronizers directly without code generation. The basis_example.cpp example shows the approximate synchronizer used manually:
void ExampleUnit::Initialize(const basis::UnitInitializeOptions& options) {
  vector_lidar_sync =
      std::make_unique<decltype(vector_lidar_sync)::element_type>();

  auto CheckLidarSync = [this]() {
    auto maybe_msgs = vector_lidar_sync->ConsumeIfReady();
    if (maybe_msgs) {
      OnSyncedVectorLidar(std::get<0>(*maybe_msgs),
                          std::get<1>(*maybe_msgs));
    }
  };

  vector_sub = Subscribe<ExampleStampedVector>(
      "/stamped_vector",
      [this, &CheckLidarSync](auto m) {
        vector_lidar_sync->OnMessage<0>(m);
        CheckLidarSync();
      });

  pc2_sub = Subscribe<sensor_msgs::PointCloud2>(
      "/point_cloud",
      [this, &CheckLidarSync](auto m) {
        vector_lidar_sync->OnMessage<1>(m);
        CheckLidarSync();
      });
}
OnMessage<INDEX>() returns true if the synchronizer became ready. You can use that return value instead of (or in addition to) calling ConsumeIfReady().

Synchronizer type summary

YAML typeC++ classTrigger condition
allbasis::synchronizers::All<...>All non-optional inputs have a message
equalbasis::synchronizers::FieldSyncEqual<...>Designated fields are exactly equal
approximate: Nbasis::synchronizers::FieldSyncApproximatelyEqual<EPSILON, ...>Designated fields are within epsilon
rate: NhzAll<...> polled by RateSubscriberFixed-period timer fires and all required inputs are cached
external(none — handler called directly)Triggered by external code

Next steps

Pub-sub messaging

How publishers and subscribers deliver messages to the synchronizer

Units

How HandlerPubSub integrates the synchronizer with publishers and subscribers

Unit YAML schema

Full reference for all sync fields, input options, and output options

Transport layers

How messages reach the synchronizer from inproc and network transports

Build docs developers (and LLMs) love