pgque-go is a thin, idiomatic Go wrapper over the pgque.* SQL functions, built on pgx/v5. It satisfies the snapshot rule transparently via pgxpool — each Send, Receive, and Ack runs in its own implicit transaction.
Installation
go get github.com/NikolayS/pgque-go@v0.2.0-rc.1
Requires Go 1.21+ and PostgreSQL 14+ with the PgQue schema installed.
Database permissions
grant pgque_reader to your_app_user;
grant pgque_writer to your_app_user;
Quickstart
package main

import (
	"context"
	"log"

	pgque "github.com/NikolayS/pgque-go"
)

func main() {
	ctx := context.Background()

	client, err := pgque.Connect(ctx, "postgres://user:pass@localhost/mydb")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Produce a single event
	_, err = client.Send(ctx, "orders", pgque.Event{
		Type:    "order.created",
		Payload: map[string]any{"order_id": 42},
	})
	if err != nil {
		log.Fatal(err)
	}

	// Produce a batch
	ids, err := client.SendBatch(ctx, "orders", "order.created", []any{
		map[string]any{"order_id": 43},
		map[string]any{"order_id": 44},
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("published: %v", ids)

	// High-level consumer
	consumer := client.NewConsumer("orders", "order_worker",
		pgque.WithUnknownHandlerPolicy(pgque.NackUnknown),
	)
	consumer.Handle("order.created", func(ctx context.Context, msg pgque.Message) error {
		log.Printf("got %s: %s", msg.Type, msg.Payload)
		return nil
	})
	if err := consumer.Start(ctx); err != nil {
		log.Fatal(err)
	}
}
Client API
| Method | Description |
|---|---|
| Connect(ctx, dsn) | Returns (*Client, error). Uses pgxpool. |
| client.Send(ctx, queue, Event{}) | Publish one event. Returns (int64, error). |
| client.SendBatch(ctx, queue, type, payloads) | Publish a batch. Returns ([]int64, error). |
| client.Receive(ctx, queue, consumer, max) | Fetch messages. Returns ([]Message, error). |
| client.Ack(ctx, batchID) | Close the batch. Returns (int64, error): 1 on success, 0 if the batch is stale or not found. |
| client.Nack(ctx, batchID, msg, NackOptions{}) | Retry with delay or route to DLQ. |
| client.ForceNextTick(ctx, queue) | For testing: force the next tick to materialize. |
| client.Pool() | Access the underlying *pgxpool.Pool. |
Consumer options
| Option | Default | Description |
|---|---|---|
| WithPollInterval(d) | 30s | Idle backoff between polls when the queue is empty |
| WithMaxMessages(n) | math.MaxInt32 | Per-Receive row cap |
| WithUnknownHandlerPolicy(p) | NackUnknown | Policy for messages with no registered handler. Use AckUnknown to log and skip them |
Nack options
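// ptr is not defined in this snippet; it is assumed to be a local helper
// that returns a pointer to its argument.
err := client.Nack(ctx, batchID, msg, pgque.NackOptions{
	RetryAfter: ptr(5 * time.Minute), // nil = SQL default (60s)
	Reason:     ptr("payment-declined"),
})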
Typed errors
_, err := client.Send(ctx, "orders", pgque.Event{Type: "x", Payload: nil})
switch {
case errors.Is(err, pgque.ErrQueueNotFound):
	// create the queue, retry
case errors.Is(err, pgque.ErrConsumerNotFound):
	// re-register the consumer
case errors.Is(err, pgque.ErrBatchNotFound):
	// batch already finished; usually safe to ignore
case errors.Is(err, pgque.ErrConnection):
	// pool closed, network drop
case err != nil:
	var sqlErr *pgque.SQLError
	if errors.As(err, &sqlErr) {
		log.Printf("pgque %s failed: %s [SQLSTATE %s]",
			sqlErr.Op, sqlErr.Err, sqlErr.SQLSTATE)
	}
}
context.Canceled and context.DeadlineExceeded are preserved through the error chain.
Cooperative consumers (experimental)
Cooperative consumers are experimental. APIs may change before promotion to stable.
// Run each worker in a separate goroutine or process.
// processOrder and processOrderHandler are application-defined.
worker1 := client.NewConsumer("orders", "order_workers",
	pgque.WithSubconsumer("worker-1"),
	pgque.WithDeadInterval(2*time.Minute),
)
worker1.Handle("order.created", func(ctx context.Context, m pgque.Message) error {
	return processOrder(ctx, m)
})
go worker1.Start(ctx) // the returned error is dropped here for brevity

worker2 := client.NewConsumer("orders", "order_workers",
	pgque.WithSubconsumer("worker-2"))
worker2.Handle("order.created", processOrderHandler)
go worker2.Start(ctx)
Low-level cooperative methods: SubscribeSubconsumer, UnsubscribeSubconsumer, ReceiveCoop, TouchSubconsumer.
Transactions
pgxpool satisfies the snapshot rule transparently: each Send, Receive, and Ack runs in its own implicit transaction. The footgun is Client.Pool(). If you take the pool and call pgque.send and pgque.receive inside one shared pgx.Tx, the consumer will see zero events. See how PgQue works for the snapshot rule explanation.