Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/NikolayS/PgQue/llms.txt

Use this file to discover all available pages before exploring further.

The consuming API wraps PgQ’s lower-level primitives (next_batch, get_batch_events, finish_batch, event_retry) into a concise receive → process → ack (or nack) loop. All functions in this section require the pgque_reader role. Note that pgque_writer does not inherit pgque_reader — applications that both produce and consume must hold both roles explicitly.
Snapshot rule. pgque.sendpgque.tickerpgque.receive must each run in their own committed transactions. The ticker’s snapshot must be taken after send commits, and receive only sees what committed before it. The same rule applies to pgque.maint_retry_eventspgque.tickerpgque.receive. Pool-based drivers (Go pgxpool, TypeScript pg.Pool) satisfy this automatically via autocommit. Python’s non-autocommit default requires explicit commit boundaries; the high-level Python Consumer handles this internally.

pgque.message composite type

pgque.receive returns a set of pgque.message rows. pgque.nack consumes the same composite.
msg_id
bigint
required
Event id (ev_id). The stable identifier for this event across retries.
batch_id
bigint
required
Batch containing this message. Pass this to ack or nack.
type
text
Event type (ev_type). May be null; 'default' for events sent without an explicit type.
payload
text
Event data (ev_data). Cast to jsonb for JSON access: msg.payload::jsonb.
retry_count
int4
NULL on first delivery; incremented by one on each retry redelivery.
created_at
timestamptz
Original event insertion time (ev_time).
extra1
text
ev_extra1. May be null.
extra2
text
ev_extra2. May be null.
extra3
text
ev_extra3. May be null.
extra4
text
ev_extra4. May be null.

Functions

pgque.receive(queue text, consumer text, max_return int default 100) → setof pgque.message

Pulls the next batch for consumer on queue and streams up to max_return messages. max_return must be >= 1; passing 0 or a negative value raises an error. Returns an empty set if no batch is available. Grant: pgque_reader. Source: sql/pgque-api/receive.sql.
queue
text
required
Name of the queue to consume from.
consumer
text
required
Name of the registered consumer. The consumer must have been subscribed with pgque.subscribe or pgque.register_consumer.
max_return
int
default:"100"
Maximum number of message rows returned to the caller. Must be >= 1.
Batch-ownership caveat. max_return limits the number of rows returned to the caller, but ack(batch_id) advances the consumer cursor past the entire underlying batch. If max_return < ticker_max_count, calling ack() after a partial receive will silently drop the unreturned rows from the consumer’s perspective. Either consume the full batch before acking, or use max_return >= ticker_max_count for safe pagination.
select * from pgque.receive('orders', 'processor', 100);

pgque.ack(batch_id bigint) → integer

Closes the batch and advances the consumer position. Modern alias for pgque.finish_batch. Returns 1 on success, 0 if the batch was not found. Grant: pgque_reader. Source: sql/pgque-api/receive.sql.
batch_id
bigint
required
The batch_id returned by pgque.receive. Closes the batch and moves the consumer cursor past it.
select pgque.ack(1);

pgque.nack(batch_id bigint, msg pgque.message, retry_after interval default '60 seconds', reason text default null) → integer

Negative-acknowledges one message from the active batch. Schedules it for redelivery or routes it to the dead letter queue when retries are exhausted. Only msg.msg_id (together with batch_id) is used from the composite — type, payload, retry_count, created_at, and the extra* fields are ignored. nack re-queries the canonical event from the active batch and uses those server-side values for all decisions and writes. Routing logic:
  • If the canonical ev_retry is below the queue’s max_retries, re-queues the event after retry_after (via pgque.event_retry).
  • If ev_retry >= max_retries, routes the event to pgque.dead_letter (via pgque.event_dead). This routing is idempotent: repeated calls for the same terminal message produce exactly one DLQ row.
  • If msg.msg_id is not present in the active batch — including a NULL msg_id or a msg_id from a different batch — raises msg_id % not found in batch %.
Grant: pgque_reader. Source: sql/pgque-api/receive.sql.
batch_id
bigint
required
The batch id from the active receive.
msg
pgque.message
required
The full message row returned by pgque.receive. Only msg.msg_id is read; other fields are ignored.
retry_after
interval
default:"'60 seconds'"
How long to wait before redelivering the message. Ignored when the message is being routed to the DLQ.
reason
text
Optional failure reason stored in pgque.dead_letter.dl_reason when the message is routed to the DLQ.
nack schedules a per-event retry (or DLQ routing), but it does not advance the consumer cursor. Always call ack(batch_id) after nacking all failed messages in a batch to finalize it and allow the consumer to proceed.
do $$
declare
    v_msg pgque.message;
begin
    select * into v_msg
    from pgque.receive('orders', 'processor', 1) limit 1;

    perform pgque.nack(v_msg.batch_id, v_msg, interval '5 minutes', 'validation failed');
    perform pgque.ack(v_msg.batch_id);
end $$;

pgque.subscribe(queue text, consumer text) → integer

Registers consumer on queue. Modern alias for pgque.register_consumer. Returns 1 on new registration, 0 if already registered. Grant: pgque_reader. Source: sql/pgque-api/send.sql.
queue
text
required
Name of the queue to subscribe to.
consumer
text
required
Name to give this consumer. Free-form text; used as the cursor identity.
select pgque.subscribe('orders', 'processor');

pgque.unsubscribe(queue text, consumer text) → integer

Removes the consumer and its retry-queue entries from queue. Modern alias for pgque.unregister_consumer. Returns the number of subscriptions removed. Grant: pgque_reader. Source: sql/pgque-api/send.sql.
queue
text
required
Name of the queue.
consumer
text
required
Name of the consumer to remove.
select pgque.unsubscribe('orders', 'processor');

Build docs developers (and LLMs) love