Skip to main content

Overview

The PostgreSQL persistence adapter provides distributed locks, circuit breaker state management, caching, and checkpoint support using PostgreSQL as the backend. It uses application-level locks with TTL tracking and automatic table creation.

Installation

npm install @go-go-scope/persistence-postgres pg

Features

  • Distributed Locks: Application-level locks with TTL and automatic expiration
  • Circuit Breaker State: Persistent failure tracking with JSONB storage
  • Caching: Database-backed cache with TTL support
  • Checkpoints: Full checkpoint support for task recovery
  • Idempotency: Prevent duplicate operations (via PostgresIdempotencyAdapter)
  • Auto Schema: Automatic table creation on connect
  • Transactional Safety: ACID guarantees for lock operations

Basic Usage

import { Pool } from 'pg'
import { PostgresAdapter } from '@go-go-scope/persistence-postgres'
import { scope } from 'go-go-scope'

const pool = new Pool({
  connectionString: process.env.DATABASE_URL,
})

const persistence = new PostgresAdapter(pool, { keyPrefix: 'myapp:' })

// Initialize tables
await persistence.connect()

await using s = scope({ persistence })

Configuration

pool
Pool
required
pg Pool instance for PostgreSQL connections
options
PersistenceAdapterOptions
Configuration options

Database Schema

The adapter automatically creates the following tables on connect():

go_goscope_locks

CREATE TABLE IF NOT EXISTS go_goscope_locks (
  key TEXT PRIMARY KEY,
  owner TEXT NOT NULL,
  expires_at TIMESTAMP NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_locks_expires ON go_goscope_locks(expires_at);

go_goscope_circuit

CREATE TABLE IF NOT EXISTS go_goscope_circuit (
  key TEXT PRIMARY KEY,
  state TEXT NOT NULL,
  failure_count INTEGER NOT NULL DEFAULT 0,
  last_failure_time TIMESTAMP,
  last_success_time TIMESTAMP,
  updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_circuit_updated ON go_goscope_circuit(updated_at);

go_goscope_cache

CREATE TABLE IF NOT EXISTS go_goscope_cache (
  key TEXT PRIMARY KEY,
  value JSONB NOT NULL,
  expires_at TIMESTAMP,
  accessed_at TIMESTAMP DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_cache_expires ON go_goscope_cache(expires_at);

go_goscope_checkpoints

CREATE TABLE IF NOT EXISTS go_goscope_checkpoints (
  id TEXT PRIMARY KEY,
  task_id TEXT NOT NULL,
  sequence INTEGER NOT NULL,
  timestamp BIGINT NOT NULL,
  progress INTEGER NOT NULL DEFAULT 0,
  data JSONB NOT NULL,
  estimated_time_remaining BIGINT
);

Lock Provider Methods

acquire(key, ttl, owner?)

Acquires a distributed lock with automatic TTL-based expiration. Uses transactions with FOR UPDATE for strong consistency.
key
string
required
Lock identifier
ttl
number
required
Time-to-live in milliseconds
owner
string
Optional lock owner identifier (auto-generated if not provided)
Returns: Promise<LockHandle | null> - Lock handle or null if already locked

extend(key, ttl, owner)

Extends the TTL of an existing lock.
key
string
required
Lock identifier
ttl
number
required
New time-to-live in milliseconds
owner
string
required
Lock owner identifier
Returns: Promise<boolean> - True if extended successfully

forceRelease(key)

Forces release of a lock regardless of owner.
key
string
required
Lock identifier

Cache Provider Methods

get(key)

Retrieves a cached value using JSONB storage.
key
string
required
Cache key
Returns: Promise<T | null> - Cached value or null if not found/expired

set(key, value, ttl?)

Stores a value in the cache using JSONB.
key
string
required
Cache key
value
T
required
Value to cache (must be JSON-serializable)
ttl
number
Time-to-live in milliseconds (optional)

delete(key)

Removes a cached value.
key
string
required
Cache key

clear()

Clears all cached values (respects key prefix if set).

keys(pattern?)

Lists all cache keys matching an optional pattern.
pattern
string
Optional SQL LIKE pattern (e.g., 'user:%')
Returns: Promise<string[]> - Array of matching keys

Checkpoint Provider Methods

save(checkpoint)

Saves a checkpoint for task recovery.
checkpoint
Checkpoint<T>
required
Checkpoint data including taskId, sequence, progress, and data

load(checkpointId)

Loads a specific checkpoint by ID.
checkpointId
string
required
Checkpoint identifier
Returns: Promise<Checkpoint<T> | undefined>

loadLatest(taskId)

Loads the most recent checkpoint for a task.
taskId
string
required
Task identifier
Returns: Promise<Checkpoint<T> | undefined>

list(taskId)

Lists all checkpoints for a task.
taskId
string
required
Task identifier
Returns: Promise<Checkpoint<unknown>[]>

cleanup(taskId, keepCount)

Deletes old checkpoints, keeping only the most recent N.
taskId
string
required
Task identifier
keepCount
number
required
Number of checkpoints to keep

deleteAll(taskId)

Deletes all checkpoints for a task.
taskId
string
required
Task identifier

Connection Example

import { Pool } from 'pg'
import { PostgresAdapter } from '@go-go-scope/persistence-postgres'

const pool = new Pool({
  host: process.env.POSTGRES_HOST || 'localhost',
  port: parseInt(process.env.POSTGRES_PORT || '5432'),
  database: process.env.POSTGRES_DB || 'myapp',
  user: process.env.POSTGRES_USER,
  password: process.env.POSTGRES_PASSWORD,
  max: 20, // Maximum pool size
})

const persistence = new PostgresAdapter(pool, {
  keyPrefix: 'myapp:',
})

// Initialize tables
await persistence.connect()

// Use with scope
await using s = scope({ persistence })

// Cleanup
await persistence.disconnect()

Best Practices

Call connect() once at application startup to create tables. The adapter uses IF NOT EXISTS so it’s safe to call multiple times.
The adapter uses client-side time for lock expiration to avoid clock skew issues. Ensure servers are synchronized via NTP.
Lock acquisition uses FOR UPDATE which provides strong consistency but may serialize concurrent lock attempts. This is ideal for critical sections.

Performance Considerations

  • Lock operations use transactions with row-level locking
  • Expired locks are cleaned up on each acquire() call
  • JSONB indexes recommended for cache queries on large datasets
  • Connection pooling is handled by the pg Pool

Redis Adapter

Redis-based persistence

MySQL Adapter

MySQL-based persistence

Build docs developers (and LLMs) love