pgque-py is a thin wrapper over the pgque.* SQL functions for Python applications. It uses psycopg 3 and supports both a low-level client API and a high-level Consumer class with polling and optional LISTEN/NOTIFY wakeup.

Installation

pip install --pre pgque-py
# or pin to the exact version:
pip install "pgque-py==0.2.0rc1"
Requires Python 3.10+ and PostgreSQL 14+ with the PgQue schema installed.

Database permissions

Grant both roles to apps that produce and consume:
grant pgque_reader to your_app_user;
grant pgque_writer to your_app_user;

Low-level client

import pgque

with pgque.connect("postgresql://localhost/mydb") as client:
    # One-time setup (run in a migration)
    client.conn.execute("select pgque.subscribe('orders', 'order_worker')")
    client.conn.commit()

    # Producer: commit once to publish atomically
    event_id = client.send("orders", {"order_id": 42}, type="order.created")
    batch_ids = client.send_batch("orders", "order.created", [
        {"order_id": 43},
        {"order_id": 44},
    ])
    client.conn.commit()
pgque.connect() is non-autocommit by default, so you must call client.conn.commit() after producing events. The snapshot rule requires send and receive to run in separate committed transactions: do not wrap both in one begin/commit.
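The commit-once pattern above can be wrapped in a small helper. This is a minimal sketch, not part of pgque-py: `publish_orders` is a hypothetical name, and the rollback path assumes `client.conn` is a psycopg 3 connection (the source only shows `execute()` and `commit()` on it).

```python
from typing import Any, Iterable, List


def publish_orders(client: Any, queue: str, order_ids: Iterable[int]) -> List[int]:
    """Publish one 'order.created' event per order id in a single transaction.

    `client` is expected to behave like the object returned by pgque.connect().
    """
    payloads = [{"order_id": order_id} for order_id in order_ids]
    if not payloads:
        return []  # nothing to send
    try:
        event_ids = client.send_batch(queue, "order.created", payloads)
        client.conn.commit()  # one commit publishes the whole batch atomically
    except Exception:
        # Assumption: conn is a psycopg 3 connection, so rollback() exists.
        client.conn.rollback()
        raise
    return list(event_ids)
```

With a real connection this would be called as `publish_orders(client, "orders", [43, 44])` inside the `with pgque.connect(...)` block. Receiving those events belongs in a later, separate transaction.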

Client methods

| Method | Description |
| --- | --- |
| connect(dsn) | Returns a PgqueClient. Non-autocommit by default. |
| client.send(queue, payload, type=) | Publish one event. Returns event id (int). |
| client.send_batch(queue, type, payloads) | Publish a batch. Returns list of event ids. |
| client.receive(queue, consumer, max_messages=) | Fetch messages from the next batch. |
| client.ack(batch_id) | Close the batch and advance the consumer cursor. Returns rowcount (int): 1 success, 0 stale. |
| client.nack(batch_id, msg, retry_after=, reason=) | Re-queue with delay, or route to DLQ. |
| client.force_next_tick(queue) | Force the next ticker() to materialize a tick. For testing. |
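The consuming side of the methods above might be combined as follows. This is a sketch under stated assumptions rather than the library's prescribed loop: `process_next_batch` is a hypothetical helper, `receive()` is assumed to return an empty list when no batch is ready, every message is assumed to carry `batch_id` (as in `msgs[0].batch_id` in the cooperative example), and a nacked message is assumed to be re-queued individually while the batch itself is still closed with `ack()`. The accepted format of `retry_after` is not specified in this page, so the value is passed through unchanged.

```python
import logging
from typing import Any, Callable

log = logging.getLogger(__name__)


def process_next_batch(
    client: Any,
    queue: str,
    consumer: str,
    handler: Callable[[Any], None],
    retry_after: Any = None,
) -> int:
    """Receive one batch, run handler on each message, then ack the batch.

    Returns the number of messages the handler processed without raising.
    """
    msgs = client.receive(queue, consumer)
    if not msgs:
        client.conn.commit()  # close the receive transaction even when idle
        return 0

    batch_id = msgs[0].batch_id
    handled = 0
    for msg in msgs:
        try:
            handler(msg)
            handled += 1
        except Exception as exc:
            # Hand only the failed message back for retry (or the DLQ).
            nack_kwargs = {"reason": str(exc)}
            if retry_after is not None:
                nack_kwargs["retry_after"] = retry_after
            client.nack(batch_id, msg, **nack_kwargs)

    # ack() returns 1 on success and 0 when the batch is stale.
    if client.ack(batch_id) == 0:
        log.warning("batch %s was stale; the cursor did not advance", batch_id)
    client.conn.commit()
    return handled
```

Because `pgque.connect()` is non-autocommit, the final `commit()` is what makes the ack and any nacks durable.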

High-level Consumer

import pgque

consumer = pgque.Consumer(
    dsn="postgresql://localhost/mydb",
    queue="orders",
    name="order_worker",
)

@consumer.on("order.created")
def handle_order(msg: pgque.Message) -> None:
    print(f"got {msg.type}: {msg.payload}")

# Optional catch-all for unregistered types
@consumer.on("*")
def handle_unknown(msg: pgque.Message) -> None:
    print(f"unhandled type {msg.type!r}: {msg.payload}")

consumer.start()  # blocks until SIGTERM / SIGINT
The Consumer class handles transaction boundaries internally: it runs the connection in autocommit mode and wraps each receive/dispatch/ack cycle in an explicit conn.transaction() block. It also supports optional LISTEN/NOTIFY wakeup for lower-latency delivery.

Consumer options

| Option | Default | Description |
| --- | --- | --- |
| dsn | required | Database connection string |
| queue | required | Queue name |
| name | required | Consumer name |
| max_messages | int max | Per-receive row limit. Lower it only to a value that is still >= the queue's worst-case batch size. |
| unknown_handler_policy | "nack" | What to do with unregistered event types: "nack" (retry/DLQ) or "ack" (log and skip) |
| subconsumer | None | Enable cooperative consumer mode (experimental) |
| dead_interval | None | Stale-batch takeover window in cooperative mode |
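To make the interaction between registered handlers, the "*" catch-all, and unknown_handler_policy concrete, here is a toy model of type-based routing. It is not pgque-py's implementation: `TypeDispatcher` and its return values are hypothetical, and the precedence (exact type first, then "*", then the policy) is an assumption about how such routing would typically work.

```python
from typing import Any, Callable, Dict

Handler = Callable[[Any], None]


class TypeDispatcher:
    """Toy model of routing messages by type; illustration only."""

    def __init__(self, unknown_handler_policy: str = "nack") -> None:
        if unknown_handler_policy not in ("nack", "ack"):
            raise ValueError("unknown_handler_policy must be 'nack' or 'ack'")
        self.unknown_handler_policy = unknown_handler_policy
        self._handlers: Dict[str, Handler] = {}

    def on(self, event_type: str) -> Callable[[Handler], Handler]:
        """Decorator that registers a handler for one event type or '*'."""
        def register(func: Handler) -> Handler:
            self._handlers[event_type] = func
            return func
        return register

    def dispatch(self, msg: Any) -> str:
        """Route one message; return 'handled', or the policy outcome."""
        # Assumed precedence: exact type match, then the catch-all.
        handler = self._handlers.get(msg.type) or self._handlers.get("*")
        if handler is None:
            # No handler at all: fall back to the configured policy.
            return self.unknown_handler_policy
        handler(msg)
        return "handled"
```

In this model, registering a "*" handler means the policy is never consulted, which is one reason to pick either a catch-all or a policy deliberately rather than relying on both.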

Cooperative consumers (experimental)

Cooperative consumers are experimental. APIs may change before promotion to stable.
# worker-1 (run each worker in a separate process)
consumer = pgque.Consumer(
    dsn="postgresql://localhost/mydb",
    queue="orders",
    name="order_worker",
    subconsumer="worker-1",
    dead_interval="5 minutes",
)

@consumer.on("order.created")
def handle(msg):
    process(msg)

consumer.start()
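Since each cooperative worker runs in its own process, one way to start several from a single script is the standard-library multiprocessing module. The sketch below is an illustration, not a recommended deployment: `consumer_kwargs`, `run_worker`, and `launch` are hypothetical helpers, the DSN and names are copied from the example above, and `pgque` is imported inside the child so that each process builds its own Consumer and connection.

```python
import multiprocessing as mp
from typing import Any, Dict, List

DSN = "postgresql://localhost/mydb"


def consumer_kwargs(index: int) -> Dict[str, Any]:
    """Build Consumer arguments with a distinct subconsumer per worker."""
    return {
        "dsn": DSN,
        "queue": "orders",
        "name": "order_worker",
        "subconsumer": f"worker-{index}",
        "dead_interval": "5 minutes",
    }


def run_worker(index: int) -> None:
    """Child-process entry point: create a Consumer and block in start()."""
    import pgque  # imported here so each child owns its own consumer

    consumer = pgque.Consumer(**consumer_kwargs(index))

    @consumer.on("order.created")
    def handle(msg: Any) -> None:
        print(f"worker-{index} got {msg.type}: {msg.payload}")

    consumer.start()  # blocks until SIGTERM / SIGINT


def launch(worker_count: int) -> List[mp.Process]:
    """Start worker_count processes and return them to the caller."""
    procs = [
        mp.Process(target=run_worker, args=(i,))
        for i in range(1, worker_count + 1)
    ]
    for proc in procs:
        proc.start()
    return procs
```

A launcher script would call `launch(2)` under an `if __name__ == "__main__":` guard and then `join()` the returned processes. Running each worker under a process supervisor instead is equally valid.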
Low-level cooperative methods:
client.subscribe_subconsumer("orders", "order_worker", "worker-1")
msgs = client.receive_coop(
    "orders", "order_worker", "worker-1",
    max_messages=100, dead_interval="5 minutes",
)
client.ack(msgs[0].batch_id)
client.touch_subconsumer("orders", "order_worker", "worker-1")
client.unsubscribe_subconsumer(
    "orders", "order_worker", "worker-1", batch_handling=1,
)
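The low-level cooperative calls above could be arranged into a worker lifecycle like the following. Treat it as a sketch under assumptions: `run_coop_worker` is a hypothetical helper, `touch_subconsumer` is assumed to act as a liveness heartbeat, `receive_coop` is assumed to return an empty list when nothing is available, `client.conn` is assumed to be a non-autocommit psycopg 3 connection, and `batch_handling=1` is copied from the example without this page documenting its meaning.

```python
import time
from typing import Any, Callable


def run_coop_worker(
    client: Any,
    queue: str,
    consumer: str,
    subconsumer: str,
    handler: Callable[[Any], None],
    max_rounds: int,
    dead_interval: str = "5 minutes",
    idle_sleep: float = 0.0,
) -> int:
    """Subscribe, process up to max_rounds batches, then unsubscribe.

    Returns the number of messages handled.
    """
    client.subscribe_subconsumer(queue, consumer, subconsumer)
    client.conn.commit()
    handled = 0
    try:
        for _ in range(max_rounds):
            msgs = client.receive_coop(
                queue, consumer, subconsumer,
                max_messages=100, dead_interval=dead_interval,
            )
            if not msgs:
                # Idle round: assumed heartbeat so peers do not take over.
                client.touch_subconsumer(queue, consumer, subconsumer)
                client.conn.commit()
                time.sleep(idle_sleep)
                continue
            for msg in msgs:
                handler(msg)
                handled += 1
            client.ack(msgs[0].batch_id)
            client.conn.commit()
    except Exception:
        client.conn.rollback()  # discard the half-processed transaction
        raise
    finally:
        # batch_handling=1 is taken verbatim from the example above.
        client.unsubscribe_subconsumer(
            queue, consumer, subconsumer, batch_handling=1,
        )
        client.conn.commit()
    return handled
```

A long-running worker would replace the fixed `max_rounds` with a shutdown flag and use a non-zero `idle_sleep` to avoid busy polling.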
