pgque-py is a thin wrapper over the pgque.* SQL functions for Python applications. It uses psycopg 3 and supports both a low-level client API and a high-level Consumer class with polling and optional LISTEN/NOTIFY wakeup.
Installation
pip install --pre pgque-py
# or pin to the exact version:
pip install "pgque-py==0.2.0rc1"
Requires Python 3.10+ and PostgreSQL 14+ with the PgQue schema installed.
Database permissions
Grant both roles to apps that produce and consume:
grant pgque_reader to your_app_user;
grant pgque_writer to your_app_user;
Low-level client
import pgque

with pgque.connect("postgresql://localhost/mydb") as client:
    # One-time setup (run in a migration)
    client.conn.execute("select pgque.subscribe('orders', 'order_worker')")
    client.conn.commit()

    # Producer: commit once to publish atomically
    event_id = client.send("orders", {"order_id": 42}, type="order.created")
    batch_ids = client.send_batch("orders", "order.created", [
        {"order_id": 43},
        {"order_id": 44},
    ])
    client.conn.commit()
pgque.connect() returns a non-autocommit connection by default, so you must call client.conn.commit() after producing events. The snapshot rule requires send and receive to run in separate committed transactions; do not wrap both in a single begin/commit.
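The snapshot rule can be sketched as two helpers that each end with their own commit. This is a minimal illustration, assuming `client` is the object returned by pgque.connect() and that it exposes the send, receive, ack, and conn.commit() calls listed on this page. The helper names `publish` and `consume_once` are hypothetical, and treating an empty result from receive as "no batch ready" is an assumption.

```python
from typing import Any, Callable


def publish(client: Any, queue: str, payload: dict, type: str) -> int:
    """Send one event and commit so it becomes visible to consumers."""
    event_id = client.send(queue, payload, type=type)
    client.conn.commit()  # transaction 1 ends here
    return event_id


def consume_once(client: Any, queue: str, consumer: str,
                 handler: Callable[[Any], None]) -> int:
    """Receive one batch in a separate transaction, handle it, ack, commit."""
    msgs = client.receive(queue, consumer, max_messages=100)
    if not msgs:  # assumption: an empty result means no batch is ready
        client.conn.commit()
        return 0
    for msg in msgs:
        handler(msg)
    # Assumption: every message returned by one receive shares a batch_id.
    client.ack(msgs[0].batch_id)
    client.conn.commit()  # transaction 2 ends here
    return len(msgs)
```

Calling `publish(client, "orders", {"order_id": 42}, "order.created")` and later `consume_once(client, "orders", "order_worker", print)` keeps the two steps in separate committed transactions. A tick generally has to occur between them before the event is delivered.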
Client methods
| Method | Description |
|---|---|
| connect(dsn) | Returns a PgqueClient. Non-autocommit by default. |
| client.send(queue, payload, type=) | Publish one event. Returns event id (int). |
| client.send_batch(queue, type, payloads) | Publish a batch. Returns list of event ids. |
| client.receive(queue, consumer, max_messages=) | Fetch messages from the next batch. |
| client.ack(batch_id) | Close the batch and advance the consumer cursor. Returns rowcount (int): 1 success, 0 stale. |
| client.nack(batch_id, msg, retry_after=, reason=) | Re-queue with delay, or route to DLQ. |
| client.force_next_tick(queue) | Force the next ticker() to materialize a tick. For testing. |
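To show how receive, nack, and ack might fit together, here is a hedged sketch of a single batch-processing step with per-message error handling. The function name `process_batch` is hypothetical. The accepted type of retry_after is not stated on this page, so the caller's value is passed through unchanged, and nacking failed messages before acking the batch is an assumption about the intended call order.

```python
import logging
from typing import Any, Callable

log = logging.getLogger("pgque.example")


def process_batch(client: Any, queue: str, consumer: str,
                  handler: Callable[[Any], None], retry_after: Any) -> bool:
    """Handle one batch; return True if the ack advanced the cursor."""
    msgs = client.receive(queue, consumer, max_messages=100)
    if not msgs:  # assumption: empty result when no batch is available
        client.conn.commit()
        return False
    batch_id = msgs[0].batch_id
    for msg in msgs:
        try:
            handler(msg)
        except Exception as exc:
            # Failed messages are re-queued (or routed to the DLQ) one by one.
            client.nack(batch_id, msg, retry_after=retry_after, reason=str(exc))
    rowcount = client.ack(batch_id)  # 1 = success, 0 = stale batch
    client.conn.commit()
    if rowcount == 0:
        log.warning("batch %s was stale; the cursor did not advance", batch_id)
    return rowcount == 1
```

Checking the rowcount returned by ack lets the caller notice a stale batch instead of silently assuming the cursor moved.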
High-level Consumer
import pgque

consumer = pgque.Consumer(
    dsn="postgresql://localhost/mydb",
    queue="orders",
    name="order_worker",
)

@consumer.on("order.created")
def handle_order(msg: pgque.Message) -> None:
    print(f"got {msg.type}: {msg.payload}")

# Optional catch-all for unregistered types
@consumer.on("*")
def handle_unknown(msg: pgque.Message) -> None:
    print(f"unhandled type {msg.type!r}: {msg.payload}")

consumer.start()  # blocks until SIGTERM / SIGINT
The Consumer class handles transaction boundaries internally (autocommit + explicit conn.transaction() around receive/dispatch/ack). It also supports optional LISTEN/NOTIFY wakeup for lower-latency delivery.
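For readers who want to build a similar loop on the low-level client, the polling-until-signal behaviour can be approximated as follows. This is a generic sketch, not the Consumer class's actual implementation: `install_stop_handlers`, `poll_loop`, and the `poll_once` callback (which should process one batch and return the number of messages handled) are all hypothetical names.

```python
import signal
import threading
from typing import Callable


def install_stop_handlers(stop: threading.Event) -> None:
    """Set `stop` on SIGTERM or SIGINT. Must be called from the main thread."""
    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, lambda signum, frame: stop.set())


def poll_loop(poll_once: Callable[[], int], stop: threading.Event,
              idle_sleep: float = 1.0) -> int:
    """Call poll_once until `stop` is set; return total messages handled."""
    total = 0
    while not stop.is_set():
        handled = poll_once()
        total += handled
        if handled == 0:
            # Back off while idle, but wake immediately on shutdown.
            stop.wait(idle_sleep)
    return total
```

Waiting on the event rather than calling time.sleep() means a shutdown signal interrupts the idle pause instead of waiting for it to expire.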
Consumer options
| Option | Default | Description |
|---|---|---|
| dsn | required | Database connection string |
| queue | required | Queue name |
| name | required | Consumer name |
| max_messages | int max | Per-receive row limit. Lower it only to a value that is still >= the queue's worst-case batch size. |
| unknown_handler_policy | "nack" | What to do with unregistered event types: "nack" (retry/DLQ) or "ack" (log and skip) |
| subconsumer | None | Enable cooperative consumer mode (experimental) |
| dead_interval | None | Stale-batch takeover window in cooperative mode |
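The interaction between registered handlers, the "*" catch-all, and unknown_handler_policy can be illustrated with a small stand-alone registry. This is a hypothetical model written for this page, not pgque's internal code: the `Dispatcher` class and its return values are assumptions chosen to make the routing order explicit.

```python
from typing import Any, Callable, Dict

Handler = Callable[[Any], None]


class Dispatcher:
    """Hypothetical type-to-handler registry, not pgque's own implementation."""

    def __init__(self, unknown_handler_policy: str = "nack") -> None:
        if unknown_handler_policy not in ("nack", "ack"):
            raise ValueError("unknown_handler_policy must be 'nack' or 'ack'")
        self.policy = unknown_handler_policy
        self.handlers: Dict[str, Handler] = {}

    def on(self, event_type: str) -> Callable[[Handler], Handler]:
        """Decorator that registers a handler for one event type or '*'."""
        def register(fn: Handler) -> Handler:
            self.handlers[event_type] = fn
            return fn
        return register

    def dispatch(self, msg: Any) -> str:
        """Return 'handled', or the policy ('nack' / 'ack') if nothing matched."""
        fn = self.handlers.get(msg.type) or self.handlers.get("*")
        if fn is None:
            return self.policy  # no specific handler and no catch-all
        fn(msg)
        return "handled"
```

In this model an exact type match wins, the catch-all runs next, and the policy only applies when neither is registered.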
Cooperative consumers (experimental)
Cooperative consumers are experimental. APIs may change before promotion to stable.
# worker-1 (run each worker in a separate process)
import pgque

consumer = pgque.Consumer(
    dsn="postgresql://localhost/mydb",
    queue="orders",
    name="order_worker",
    subconsumer="worker-1",
    dead_interval="5 minutes",
)

@consumer.on("order.created")
def handle(msg):
    process(msg)  # process() stands in for your application logic

consumer.start()
Low-level cooperative methods:
client.subscribe_subconsumer("orders", "order_worker", "worker-1")
msgs = client.receive_coop(
    "orders", "order_worker", "worker-1",
    max_messages=100, dead_interval="5 minutes",
)
client.ack(msgs[0].batch_id)
client.touch_subconsumer("orders", "order_worker", "worker-1")
client.unsubscribe_subconsumer(
    "orders", "order_worker", "worker-1", batch_handling=1,
)
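The low-level cooperative calls above could be combined into a worker lifecycle along these lines. This is a sketch under several assumptions: `run_coop_worker` and its `rounds` parameter are hypothetical, touch_subconsumer is assumed to act as a liveness signal while idle, and batch_handling=1 is copied from the example above without its meaning being documented here.

```python
from typing import Any, Callable


def run_coop_worker(client: Any, queue: str, consumer: str, subconsumer: str,
                    handler: Callable[[Any], None], rounds: int,
                    dead_interval: str = "5 minutes") -> int:
    """Register a subconsumer, process up to `rounds` batches, then unregister."""
    client.subscribe_subconsumer(queue, consumer, subconsumer)
    client.conn.commit()
    handled = 0
    try:
        for _ in range(rounds):
            msgs = client.receive_coop(queue, consumer, subconsumer,
                                       max_messages=100,
                                       dead_interval=dead_interval)
            if msgs:
                for msg in msgs:
                    handler(msg)
                client.ack(msgs[0].batch_id)
                handled += len(msgs)
            else:
                # Assumption: touch marks this subconsumer as alive while idle.
                client.touch_subconsumer(queue, consumer, subconsumer)
            client.conn.commit()
    except Exception:
        client.conn.rollback()  # clear the failed transaction before cleanup
        raise
    finally:
        client.unsubscribe_subconsumer(queue, consumer, subconsumer,
                                       batch_handling=1)
        client.conn.commit()
    return handled
```

Each round commits on its own, so a batch acked in one round is not rolled back by a failure in a later one.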