Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/NikolayS/PgQue/llms.txt

Use this file to discover all available pages before exploring further.

PgQue’s fan-out model uses a shared event log with independent per-consumer cursors. Unlike competing-consumer designs (SKIP LOCKED), every subscribed consumer sees every event through its own cursor. This is closer to Kafka topics than to a traditional job queue.

Basic fan-out

Subscribe all consumers before producing. A new consumer starts from the most recent tick and will not see events sent before its subscribe call.
-- Subscribe three consumers to the same queue
select pgque.subscribe('orders', 'audit_logger');
select pgque.subscribe('orders', 'notification_sender');
select pgque.subscribe('orders', 'analytics_pipeline');

-- Send an event (separate transactions in psql autocommit)
select pgque.send('orders', 'order.created', '{"order_id": 1}'::jsonb);
select pgque.force_next_tick('orders');  -- separate transaction
select pgque.ticker();                   -- separate transaction

-- Each consumer independently receives the same event
select * from pgque.receive('orders', 'audit_logger', 100);
select * from pgque.receive('orders', 'notification_sender', 100);
select * from pgque.receive('orders', 'analytics_pipeline', 100);
Each consumer’s cursor advances independently. audit_logger acking its batch does not affect notification_sender’s position.

Exactly-once processing

Wrap receive, your side-effect writes, and ack in one transaction for exactly-once semantics on the same database:
begin;
  with msgs as (
    select * from pgque.receive('orders', 'processor', 100)
  ),
  inserted as (
    insert into processed_orders (order_id, status)
    select (payload::jsonb->>'order_id')::int, 'done'
    from msgs
  )
  select pgque.ack((select batch_id from msgs limit 1));
commit;
The inserted CTE executes even though the outer query doesn’t reference it directly (data-modifying CTEs always run). Every row in msgs carries the same batch_id, so the scalar subquery picks any one of them.
pgque.ack(batch_id) advances the consumer past the entire underlying batch, even if receive() returned fewer rows due to max_return. Either consume the full batch or use max_return >= ticker_max_count (default 500) to avoid silently dropping events.

Recurring jobs with pg_cron

Use pg_cron to enqueue scheduled events:
select cron.schedule('daily_report',
  '0 9 * * *',
  $$select pgque.send('jobs', 'report.generate',
      '{"type": "daily"}'::jsonb)$$);

Cooperative consumers (experimental)

Cooperative consumers are experimental. Function signatures and behavior may change before promotion to stable.
Cooperative consumers let multiple worker processes share one logical consumer cursor. This is useful for parallelizing processing of one consumer’s events across many workers without creating separate subscriptions.
-- Workers call receive_coop() — auto-registers consumer and subconsumer on first call
select * from pgque.receive_coop('orders', 'processor', 'worker-1', 100);
select * from pgque.receive_coop('orders', 'processor', 'worker-2', 100);

-- Each subconsumer gets its own active batch
-- ack() is called per subconsumer batch_id as usual
select pgque.ack(:batch_id);
Cooperative allocation serializes on a FOR UPDATE of the coop_main subscription row. Scale ticker_max_count and tick cadence together with worker count to keep batches large enough to amortize the lock overhead.

Stale batch takeover

Pass dead_interval to take over batches from inactive workers:
select * from pgque.receive_coop(
    'orders', 'processor', 'worker-1', 100,
    interval '5 minutes'   -- dead_interval: take over batches idle > 5 min
);
Takeover allocates a fresh batch_id; old tokens cannot ack/nack the new owner’s state.

Explicit registration

For converting an existing normal consumer, register explicitly:
select pgque.register_subconsumer('orders', 'processor', 'worker-1');
select pgque.register_subconsumer('orders', 'processor', 'worker-2');

-- To convert an existing normal consumer (must not have active batches)
select pgque.register_subconsumer('orders', 'processor', 'worker-1', convert_normal => true);

Build docs developers (and LLMs) love