Distributed Systems Patterns
go-go-scope provides distributed primitives for building reliable multi-node systems with Redis, PostgreSQL, and other persistence backends.Distributed Locks
Coordinate access to shared resources across multiple processes or servers.Create a distributed lock
import { scope, Lock } from 'go-go-scope'
import { RedisAdapter } from '@go-go-scope/persistence-redis'
import { Redis } from 'ioredis'
const redis = new Redis(process.env.REDIS_URL)
const adapter = new RedisAdapter(redis, { keyPrefix: 'myapp:' })
await using s = scope()
// Create distributed lock with 30 second TTL
const lock = new Lock(s.signal, {
provider: adapter,
key: 'resource:user:123',
ttl: 30000
})
// Acquire lock (blocks until available)
await using guard = await lock.acquire({ timeout: 5000 })
// Critical section - only one process can execute this
console.log('Lock acquired, performing operation...')
await performCriticalOperation()
// Lock automatically released when guard is disposed
Use in production scenarios
// Example: Ensure only one instance processes a payment
async function processPayment(orderId: string) {
await using s = scope()
const lock = new Lock(s.signal, {
provider: adapter,
key: `payment:${orderId}`,
ttl: 60000 // 1 minute
})
// Try to acquire lock with timeout
const [err] = await s.task(
async () => {
await using guard = await lock.acquire({ timeout: 2000 })
// Check if already processed
const payment = await db.query(
'SELECT * FROM payments WHERE order_id = ?',
[orderId]
)
if (payment.status === 'completed') {
console.log('Payment already processed')
return
}
// Process payment
await chargeCard(payment.amount, payment.cardToken)
await db.query(
'UPDATE payments SET status = ? WHERE order_id = ?',
['completed', orderId]
)
}
)
if (err) {
console.error('Failed to acquire lock:', err.message)
throw new Error('Payment is being processed by another instance')
}
}
Read-Write Locks
Allow multiple readers or a single writer for efficient shared resource access.import { scope, Lock } from 'go-go-scope'
import { RedisAdapter } from '@go-go-scope/persistence-redis'
const redis = new Redis()
const adapter = new RedisAdapter(redis)
await using s = scope()
// Create read-write lock
const rwlock = new Lock(s.signal, {
provider: adapter,
key: 'config:app-settings',
ttl: 30000,
allowMultipleReaders: true // Enable read-write mode
})
// Multiple readers can acquire simultaneously
await using readGuard1 = await rwlock.read()
await using readGuard2 = await rwlock.read()
await using readGuard3 = await rwlock.read()
const config = await loadConfig()
console.log('Config:', config)
// Write lock is exclusive
await using writeGuard = await rwlock.write({ timeout: 5000 })
await updateConfig({ theme: 'dark' })
Distributed Circuit Breaker
Share circuit breaker state across multiple instances.import { scope, CircuitBreaker } from 'go-go-scope'
import { RedisAdapter } from '@go-go-scope/persistence-redis'
const redis = new Redis()
const adapter = new RedisAdapter(redis)
// Circuit breaker with distributed state
const breaker = new CircuitBreaker({
failureThreshold: 5,
resetTimeout: 60000,
halfOpenRequests: 3,
stateProvider: adapter, // Share state across instances
key: 'breaker:external-api'
})
await using s = scope()
// All instances share the same circuit breaker state
const [err, result] = await s.task(
() => fetch('https://external-api.com/data').then(r => r.json()),
{ circuitBreaker: breaker, timeout: 5000 }
)
if (err) {
console.error('Circuit breaker is open or request failed')
}
Multi-Tier Caching
Implement distributed caching with automatic warming and TTL management.import { CacheWarmer } from 'go-go-scope'
import { RedisAdapter } from '@go-go-scope/persistence-redis'
const redis = new Redis()
const cacheProvider = new RedisAdapter(redis)
await using s = scope()
const cacheWarmer = new CacheWarmer(s, cacheProvider, {
defaultTTL: 60000,
defaultRefreshThreshold: 0.2,
checkInterval: 10000
})
// Register warmed cache entry
const userCache = cacheWarmer.register('user:123', {
fetcher: async () => {
const [err, user] = await s.task(() =>
db.query('SELECT * FROM users WHERE id = ?', [123])
)
if (err) throw err
return user[0]
},
ttl: 60000,
refreshThreshold: 0.2, // Refresh at 80% TTL
backgroundRefresh: true,
onRefresh: (user) => console.log('User cache refreshed:', user.id),
onError: (err) => console.error('Cache refresh error:', err)
})
// Get from cache (auto-refreshes if stale)
const [err, user] = await userCache.get()
Persistence Backends
go-go-scope supports multiple persistence backends for distributed primitives.import { RedisAdapter } from '@go-go-scope/persistence-redis'
import { Redis } from 'ioredis'
const redis = new Redis({
host: 'localhost',
port: 6379,
password: process.env.REDIS_PASSWORD,
db: 0
})
const adapter = new RedisAdapter(redis, {
keyPrefix: 'myapp:' // Optional namespace
})
// Use for locks, circuit breakers, cache
await adapter.connect()
Idempotency
Ensure operations are executed only once, even with retries.import { IdempotencyProvider } from 'go-go-scope'
import { RedisIdempotencyAdapter } from '@go-go-scope/persistence-redis'
import { Redis } from 'ioredis'
const redis = new Redis()
const idempotency = new RedisIdempotencyAdapter(redis)
await using s = scope()
// Register idempotent operation
const [err, result] = await s.task(
async () => {
// This function will only execute once per requestId
await chargeCard(orderId, amount)
return { charged: true, amount }
},
{
idempotency: {
provider: idempotency,
key: `payment:${requestId}`,
ttl: 86400000 // 24 hours
}
}
)
if (err) {
console.error('Payment failed:', err)
} else {
console.log('Payment result:', result)
}
Checkpointing for Long-Running Tasks
Save progress and resume after failures.import { scope, Checkpoint } from 'go-go-scope'
import { RedisAdapter } from '@go-go-scope/persistence-redis'
const redis = new Redis()
const adapter = new RedisAdapter(redis)
await using s = scope()
async function processLargeDataset(datasetId: string) {
// Try to load existing checkpoint
const checkpoint = await adapter.loadLatestCheckpoint(datasetId)
let startIndex = checkpoint?.state?.lastProcessedIndex ?? 0
console.log(`Resuming from index ${startIndex}`)
const items = await loadDataset(datasetId)
for (let i = startIndex; i < items.length; i++) {
await processItem(items[i])
// Save checkpoint every 100 items
if (i % 100 === 0) {
await adapter.saveCheckpoint({
taskId: datasetId,
sequence: i,
timestamp: Date.now(),
state: { lastProcessedIndex: i }
})
console.log(`Checkpoint saved at index ${i}`)
}
}
console.log('Processing complete')
// Cleanup checkpoints
await adapter.deleteAllCheckpoints(datasetId)
}
Leader Election
Implement leader election for coordinated tasks.import { scope, Lock } from 'go-go-scope'
import { RedisAdapter } from '@go-go-scope/persistence-redis'
class LeaderElection {
private isLeader = false
constructor(
private adapter: RedisAdapter,
private instanceId: string
) {}
async runAsLeader(task: () => Promise<void>) {
await using s = scope()
const lock = new Lock(s.signal, {
provider: this.adapter,
key: 'leader:election',
ttl: 10000, // 10 second TTL
owner: this.instanceId
})
// Try to acquire leadership
const [err] = await s.task(
async () => {
await using guard = await lock.acquire({ timeout: 1000 })
this.isLeader = true
console.log(`${this.instanceId} became leader`)
// Execute leader task
await task()
this.isLeader = false
}
)
if (err) {
console.log(`${this.instanceId} is follower`)
}
}
}
// Use in multiple instances
const election = new LeaderElection(adapter, process.env.INSTANCE_ID)
setInterval(() => {
election.runAsLeader(async () => {
console.log('Running leader tasks...')
await performLeaderTasks()
})
}, 5000)
Distributed locks use TTL-based expiry to prevent deadlocks. Always set appropriate TTL values based on your operation duration.
Distributed Rate Limiting
Share rate limits across multiple instances.import { Semaphore } from 'go-go-scope'
import { RedisAdapter } from '@go-go-scope/persistence-redis'
const redis = new Redis()
const adapter = new RedisAdapter(redis)
// Distributed semaphore limiting API calls across all instances
const apiSemaphore = new Semaphore(100, {
provider: adapter,
key: 'ratelimit:api'
})
await using s = scope()
const [err, result] = await s.task(
async ({ signal }) => {
// Acquire permit (blocks if limit reached globally)
await using permit = await apiSemaphore.acquire(signal)
// Make API call
return await fetch('https://api.example.com/data')
},
{ timeout: 10000 }
)