Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/NikolayS/PgQue/llms.txt

Use this file to discover all available pages before exploring further.

The pgque npm package is a thin, idiomatic TypeScript wrapper over the pgque.* SQL functions, built on node-postgres (pg). Each send, receive, and ack call runs in its own implicit transaction via pg.Pool, satisfying the snapshot rule automatically.

Installation

npm install pgque@rc
Requires Node.js 20+ and PostgreSQL 14+ with the PgQue schema installed.

Database permissions

grant pgque_reader to your_app_user;
grant pgque_writer to your_app_user;

Quickstart

import { connect } from 'pgque';

const client = await connect(process.env.DATABASE_URL!);
try {
  // One-time setup (run in a migration)
  await client.subscribe('orders', 'order_worker');

  // Produce
  const eventId = await client.send('orders', {
    type: 'order.created',
    payload: { id: 42 },
  });
  const batchIds = await client.sendBatch('orders', 'order.created', [
    { id: 43 },
    { id: 44 },
  ]);
  console.log('published', eventId, batchIds);

  // High-level consumer
  const consumer = client.newConsumer('orders', 'order_worker');
  consumer.handle('order.created', async (msg) => {
    const data = JSON.parse(msg.payload) as { id: number };
    console.log('got', msg.type, data);
  });

  const ac = new AbortController();
  process.on('SIGINT', () => ac.abort());
  await consumer.start(ac.signal);
} finally {
  await client.close();
}

Client API

MethodReturnsDescription
connect(dsn, poolOptions?)Promise<PgqueClient>Connect via pg.Pool.
client.send(queue, event)Promise<bigint>Publish one event.
client.sendBatch(queue, type, payloads)Promise<bigint[]>Publish a batch atomically.
client.receive(queue, consumer, max?)Promise<Message[]>Fetch up to max messages (default 100).
client.ack(batchId)Promise<number>Finish the batch. Returns 1 success, 0 stale.
client.nack(batchId, msg, opts?)Promise<void>Retry with delay or route to DLQ.
client.subscribe(queue, consumer)Promise<number>Register consumer.
client.unsubscribe(queue, consumer)Promise<number>Remove consumer.
client.ticker(queue)Promise<bigint | null>Tick one queue.
client.tickerAll()Promise<number>Tick all queues.
client.forceNextTick(queue)Promise<bigint | null>For testing.
client.newConsumer(queue, name, opts?)ConsumerCreate a high-level poll loop.
client.close()Promise<void>Drain the pool.

bigint IDs

Message.msgId, Message.batchId, and the return values of send(), sendBatch(), ticker(), and forceNextTick() are JavaScript bigint to match PostgreSQL bigint without precision loss. The int8 → bigint parser is registered only on pgque’s internal pool — it does not affect other pg.Pool or pg.Client instances in the same process.

Typed errors

import { PgqueQueueNotFoundError, PgqueConsumerNotFoundError } from 'pgque';

try {
  await client.send('unknown-queue', { type: 'x', payload: {} });
} catch (err) {
  if (err instanceof PgqueQueueNotFoundError) {
    // create the queue, then retry
  } else if (err instanceof PgqueConsumerNotFoundError) {
    // re-subscribe the consumer
  } else {
    throw err;
  }
}
All errors derive from PgqueError. Types: PgqueConnectionError, PgqueQueueNotFoundError, PgqueConsumerNotFoundError, PgqueSqlError (with cause).

Cooperative consumers (experimental)

Cooperative consumers are experimental. APIs may change before promotion to stable.
await client.subscribeSubconsumer('orders', 'billing', 'worker-1');

const consumer = client.newConsumer('orders', 'billing', {
  subconsumer: 'worker-1',
  deadInterval: '5 minutes',
});
consumer.handle('order.created', async (msg) => { /* ... */ });
await consumer.start(ac.signal);
Low-level cooperative methods: subscribeSubconsumer, unsubscribeSubconsumer, receiveCoop, touchSubconsumer.

Transactions

pg.Pool.query satisfies the snapshot rule transparently — each send/receive/ack runs in its own implicit transaction. The footgun is client.rawPool: do not wrap pgque.send and pgque.receive in one shared pool client transaction. See how PgQue works for the snapshot rule.

Build docs developers (and LLMs) love