Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/NikolayS/PgQue/llms.txt

Use this file to discover all available pages before exploring further.

pgque-go is a thin, idiomatic Go wrapper over the pgque.* SQL functions, built on pgx/v5. It satisfies the snapshot rule transparently via pgxpool — each Send, Receive, and Ack runs in its own implicit transaction.

Installation

go get github.com/NikolayS/pgque-go@v0.2.0-rc.1
Requires Go 1.21+ and PostgreSQL 14+ with the PgQue schema installed.

Database permissions

grant pgque_reader to your_app_user;
grant pgque_writer to your_app_user;

Quickstart

package main

import (
    "context"
    "log"

    pgque "github.com/NikolayS/pgque-go"
)

func main() {
    ctx := context.Background()

    client, err := pgque.Connect(ctx, "postgres://user:pass@localhost/mydb")
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Produce a single event
    _, err = client.Send(ctx, "orders", pgque.Event{
        Type:    "order.created",
        Payload: map[string]any{"order_id": 42},
    })
    if err != nil {
        log.Fatal(err)
    }

    // Produce a batch
    ids, err := client.SendBatch(ctx, "orders", "order.created", []any{
        map[string]any{"order_id": 43},
        map[string]any{"order_id": 44},
    })
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("published: %v", ids)

    // High-level consumer
    consumer := client.NewConsumer("orders", "order_worker",
        pgque.WithUnknownHandlerPolicy(pgque.NackUnknown),
    )
    consumer.Handle("order.created", func(ctx context.Context, msg pgque.Message) error {
        log.Printf("got %s: %s", msg.Type, msg.Payload)
        return nil
    })
    if err := consumer.Start(ctx); err != nil {
        log.Fatal(err)
    }
}

Client API

MethodDescription
Connect(ctx, dsn)Returns (*Client, error). Uses pgxpool.
client.Send(ctx, queue, Event{})Publish one event. Returns (int64, error).
client.SendBatch(ctx, queue, type, payloads)Publish a batch. Returns ([]int64, error).
client.Receive(ctx, queue, consumer, max)Fetch messages. Returns ([]Message, error).
client.Ack(ctx, batchID)Close the batch. Returns (int64, error): 1 success, 0 stale/not found.
client.Nack(ctx, batchID, msg, NackOptions{})Retry with delay or route to DLQ.
client.ForceNextTick(ctx, queue)For testing — force next tick to materialize.
client.Pool()Access the underlying *pgxpool.Pool.

Consumer options

OptionDefaultDescription
WithPollInterval(d)30sIdle backoff between polls when the queue is empty
WithMaxMessages(n)math.MaxInt32Per-Receive row cap
WithUnknownHandlerPolicy(p)NackUnknownAckUnknown to log and skip messages with no handler

Nack options

err := client.Nack(ctx, batchID, msg, pgque.NackOptions{
    RetryAfter: ptr(5 * time.Minute),  // nil = SQL default (60s)
    Reason:     ptr("payment-declined"),
})

Typed errors

_, err := client.Send(ctx, "orders", pgque.Event{Type: "x", Payload: nil})
switch {
case errors.Is(err, pgque.ErrQueueNotFound):
    // create the queue, retry
case errors.Is(err, pgque.ErrConsumerNotFound):
    // re-register the consumer
case errors.Is(err, pgque.ErrBatchNotFound):
    // batch already finished — usually safe to ignore
case errors.Is(err, pgque.ErrConnection):
    // pool closed, network drop
case err != nil:
    var sqlErr *pgque.SQLError
    if errors.As(err, &sqlErr) {
        log.Printf("pgque %s failed: %s [SQLSTATE %s]",
            sqlErr.Op, sqlErr.Err, sqlErr.SQLSTATE)
    }
}
context.Canceled and context.DeadlineExceeded are preserved through the error chain.

Cooperative consumers (experimental)

Cooperative consumers are experimental. APIs may change before promotion to stable.
// Run each worker in a separate goroutine or process
worker1 := client.NewConsumer("orders", "order_workers",
    pgque.WithSubconsumer("worker-1"),
    pgque.WithDeadInterval(2*time.Minute),
)
worker1.Handle("order.created", func(ctx context.Context, m pgque.Message) error {
    return processOrder(ctx, m)
})
go worker1.Start(ctx)

worker2 := client.NewConsumer("orders", "order_workers",
    pgque.WithSubconsumer("worker-2"))
worker2.Handle("order.created", processOrderHandler)
go worker2.Start(ctx)
Low-level cooperative methods: SubscribeSubconsumer, UnsubscribeSubconsumer, ReceiveCoop, TouchSubconsumer.

Transactions

pgxpool satisfies the snapshot rule transparently — each Send/Receive/Ack is its own implicit transaction. The footgun is Client.Pool(): wrapping pgque.send and pgque.receive in one shared pgx.Tx will cause the consumer to see zero events. See how PgQue works for the snapshot rule explanation.

Build docs developers (and LLMs) love