Background Job Processing
The @go-go-scope/scheduler package provides a distributed job scheduler with cron support, worker threads for CPU-intensive tasks, and multiple persistence backends.
Quick Start
Install the scheduler
npm install @go-go-scope/scheduler
npm install @go-go-scope/persistence-postgres # or redis, mysql, etc.
Create a scheduler instance
import { scope } from 'go-go-scope'
import { Scheduler, SQLJobStorage } from '@go-go-scope/scheduler'
import { PostgresAdapter } from '@go-go-scope/persistence-postgres'

await using s = scope()

const storage = new SQLJobStorage(
  'postgresql://localhost:5432/jobs',
  new PostgresAdapter()
)

const scheduler = new Scheduler({
  scope: s,
  storage,
  enableWebUI: true // Optional management UI
})
Define schedules
// Daily report at 9 AM
await scheduler.createSchedule('daily-report', {
  cron: '0 9 * * *',
  timezone: 'America/New_York',
  concurrent: false // One at a time
})

// Process webhooks every 5 seconds
await scheduler.createSchedule('process-webhooks', {
  interval: 5000,
  concurrent: true // Multiple in parallel
})
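The `concurrent` flag controls what happens when a tick fires while the previous run is still in flight. The sketch below shows one plausible reading of `concurrent: false`, where the overlapping tick is simply skipped. It is an illustration only: `makeNonOverlapping` is a hypothetical helper, not part of the scheduler, and the real implementation may queue the run instead of dropping it.

```typescript
// Hypothetical helper (not part of @go-go-scope/scheduler).
// Wraps a handler so that a call made while a previous call is still
// running returns undefined instead of starting a second run.
function makeNonOverlapping<T>(
  handler: () => Promise<T>
): () => Promise<T | undefined> {
  let running = false
  return async () => {
    if (running) return undefined // previous run still in flight: skip this tick
    running = true
    try {
      return await handler()
    } finally {
      running = false // allow the next tick to run, even after a failure
    }
  }
}
```

With `concurrent: true`, no such guard applies and overlapping runs proceed in parallel.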
Register handlers
scheduler.onSchedule('daily-report', async (job, scope) => {
  const [err, users] = await scope.task(() => fetchActiveUsers())
  if (err) throw err

  await Promise.all(users.map(user =>
    scope.task(() => sendReport(user))
  ))
})

scheduler.onSchedule('process-webhooks', async (job, scope) => {
  const [err] = await scope.task(() =>
    fetch(job.payload.url, {
      method: 'POST',
      body: JSON.stringify(job.payload.data)
    })
  )
  if (err) console.error(`Webhook failed: ${err.message}`)
})
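The handlers above destructure `scope.task(...)` into an `[err, value]` pair rather than wrapping the call in `try`/`catch`. As a rough mental model, and assuming that `scope.task` resolves to such a tuple instead of rejecting, the pattern can be sketched with a small standalone helper. `toResult` and `Result` are hypothetical names used only for this illustration.

```typescript
// Hypothetical stand-in for the tuple-returning style used by scope.task.
type Result<T> = [Error, undefined] | [undefined, T]

// Runs fn and converts a thrown or rejected value into the first tuple slot.
async function toResult<T>(fn: () => Promise<T>): Promise<Result<T>> {
  try {
    return [undefined, await fn()]
  } catch (e) {
    // Normalize non-Error throwables so callers can rely on err.message
    return [e instanceof Error ? e : new Error(String(e)), undefined]
  }
}
```

A handler can then decide per call whether to rethrow (failing the job, as in `daily-report`) or just log the error (as in `process-webhooks`).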
Start the scheduler
await scheduler.start()
console.log('Scheduler running!')
console.log('Web UI:', scheduler.getWebUIUrl())
CPU-Intensive Jobs with Worker Threads
For CPU-heavy tasks, run handlers in worker threads to avoid blocking the event loop.
import { Scheduler, InMemoryAdapter } from '@go-go-scope/scheduler'

const scheduler = new Scheduler({
  persistence: new InMemoryAdapter(),
  pollInterval: 1000
})

// ML analysis job
await scheduler.createSchedule('ml-analysis', {
  interval: 60000, // Every minute
  data: {
    model: 'recommendation-v2',
    dataset: 'user-behavior'
  }
})

// Run in worker thread with { worker: true }
scheduler.onSchedule('ml-analysis', async (job) => {
  console.log(`Starting ML analysis...`)
  console.log(`Model: ${job.data.model}`)
  console.log(`Dataset: ${job.data.dataset}`)

  const start = Date.now()

  // Heavy computation (runs in worker thread)
  function processData() {
    let result = 0
    for (let i = 0; i < 10000000; i++) {
      result += Math.sin(i) * Math.cos(i)
    }
    return result
  }

  const result = processData()
  const duration = Date.now() - start

  console.log(`ML analysis complete in ${duration}ms`)
  console.log(`Result: ${result.toFixed(4)}`)

  return { processed: 10000000, duration }
}, { worker: true }) // ← Runs in worker thread!

// Data aggregation job (also in worker)
await scheduler.createSchedule('daily-report', {
  interval: 45000, // Every 45 seconds
  data: {
    reportType: 'analytics',
    metrics: ['views', 'clicks', 'conversions']
  }
})

scheduler.onSchedule('daily-report', async (job) => {
  console.log(`Generating ${job.data.reportType} report...`)

  const start = Date.now()

  // Simulate report generation
  function aggregateMetrics() {
    // Typed as a record so the indexed assignment below type-checks
    const metrics: Record<string, number> = {}
    for (const metric of ['views', 'clicks', 'conversions']) {
      let sum = 0
      for (let i = 0; i < 5000000; i++) {
        sum += Math.random()
      }
      metrics[metric] = sum
    }
    return metrics
  }

  const metrics = aggregateMetrics()
  const duration = Date.now() - start

  console.log(`Report generated in ${duration}ms`)
  console.log('Metrics:', metrics)

  return { metrics, generatedAt: new Date().toISOString() }
}, { worker: true })

await scheduler.start()
Worker threads provide true parallelism for CPU-bound tasks. Main thread handlers are better for I/O operations.
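To make the distinction concrete, here is a minimal sketch of offloading the same kind of CPU-bound loop using Node's built-in `node:worker_threads` module directly. It does not show how the scheduler implements `{ worker: true }`; `runInWorker` and `workerSource` are hypothetical names, and the worker body is passed as a source string via the `eval: true` option purely to keep the example in one file.

```typescript
import { Worker } from 'node:worker_threads'

// Worker body as plain JavaScript source. It runs on a separate thread
// and receives its input through workerData.
const workerSource = `
  const { parentPort, workerData } = require('node:worker_threads')
  let result = 0
  for (let i = 0; i < workerData.iterations; i++) {
    result += Math.sin(i) * Math.cos(i)
  }
  parentPort.postMessage(result)
`

// Runs the loop off the main thread and resolves with its result.
function runInWorker(iterations: number): Promise<number> {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, {
      eval: true,
      workerData: { iterations }
    })
    worker.once('message', resolve)
    worker.once('error', reject)
    worker.once('exit', (code) => {
      // No effect if the promise has already settled via 'message'
      if (code !== 0) reject(new Error(`Worker exited with code ${code}`))
    })
  })
}
```

While the worker computes, the main thread's event loop stays free to serve timers and I/O. Data crosses the thread boundary by message passing, so inputs and results need to be serializable.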
Cron Schedules
Use cron expressions for complex scheduling patterns.
Common Patterns
Cron Presets
import { CronPresets } from '@go-go-scope/scheduler'

// Every day at midnight
await scheduler.createSchedule('cleanup', {
  cron: CronPresets.DAILY_MIDNIGHT
})

// Every Monday at 9 AM
await scheduler.createSchedule('weekly-report', {
  cron: '0 9 * * 1',
  timezone: 'America/New_York'
})

// Every hour at minute 30
await scheduler.createSchedule('hourly-sync', {
  cron: '30 * * * *'
})

// First day of month at 2 AM
await scheduler.createSchedule('monthly-billing', {
  cron: '0 2 1 * *'
})
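The expressions above use the standard five-field layout: minute, hour, day of month, month, day of week. The sketch below is a deliberately simplified matcher that shows how such an expression maps onto a date. It is not the scheduler's parser: `matchesCron` and `fieldMatches` are hypothetical helpers, dates are evaluated in UTC (the `timezone` option is not modeled), all five fields are combined with AND, and names such as `MON` or `7` for Sunday are not supported.

```typescript
// Checks one cron field ("*", "5", "1-5", "*/15", "0,30") against a value.
// `min` is the lowest legal value of the field (0 for minutes, 1 for months).
function fieldMatches(field: string, value: number, min: number): boolean {
  return field.split(',').some((part) => {
    const [range, stepText] = part.split('/')
    const step = stepText ? Number(stepText) : 1
    let lo = min
    let hi = Number.POSITIVE_INFINITY
    if (range !== '*') {
      const bounds = range.split('-').map(Number)
      lo = bounds[0]
      hi = bounds.length > 1 ? bounds[1] : bounds[0]
    }
    return value >= lo && value <= hi && (value - lo) % step === 0
  })
}

// Returns true if `date` (read in UTC) satisfies the five-field expression.
function matchesCron(expr: string, date: Date): boolean {
  const fields = expr.trim().split(/\s+/)
  if (fields.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${fields.length}`)
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = fields
  return (
    fieldMatches(minute, date.getUTCMinutes(), 0) &&
    fieldMatches(hour, date.getUTCHours(), 0) &&
    fieldMatches(dayOfMonth, date.getUTCDate(), 1) &&
    fieldMatches(month, date.getUTCMonth() + 1, 1) &&
    fieldMatches(dayOfWeek, date.getUTCDay(), 0) // 0 = Sunday
  )
}
```

For example, `matchesCron('0 9 * * 1', new Date(Date.UTC(2024, 0, 1, 9, 0)))` is true because 1 January 2024 was a Monday.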
Persistence Backends
Choose a storage backend based on your infrastructure.
Supported backends are Redis, PostgreSQL, MySQL, and In-Memory (for development). The example below uses Redis.
import { Scheduler } from '@go-go-scope/scheduler'
import { RedisAdapter } from '@go-go-scope/persistence-redis'
import { Redis } from 'ioredis'

const redis = new Redis(process.env.REDIS_URL)
const adapter = new RedisAdapter(redis)

const scheduler = new Scheduler({
  persistence: adapter,
  pollInterval: 1000
})
Event Monitoring
Track job execution with event listeners.
scheduler.on('job:started', (event) => {
  console.log(`Job ${event.jobId} started (${event.scheduleName})`)
})

scheduler.on('job:completed', (event) => {
  console.log(`Job ${event.jobId} completed in ${event.duration}ms`)
})

scheduler.on('job:failed', (event) => {
  console.error(`Job ${event.jobId} failed:`, event.error.message)

  // Send alert, log to monitoring service, etc.
  sendAlert({
    type: 'job_failure',
    jobId: event.jobId,
    schedule: event.scheduleName,
    error: event.error.message
  })
})

scheduler.on('schedule:created', (event) => {
  console.log(`Schedule created: ${event.scheduleName}`)
})

scheduler.on('schedule:deleted', (event) => {
  console.log(`Schedule deleted: ${event.scheduleName}`)
})
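Beyond logging, the same events can feed simple per-schedule statistics. The sketch below uses Node's `EventEmitter` as a stand-in for the scheduler so that it runs on its own. `trackStats` and `ScheduleStats` are hypothetical names, and the sketch assumes that `job:completed` events carry `scheduleName` alongside `duration`, which the examples above do not confirm.

```typescript
import { EventEmitter } from 'node:events'

interface ScheduleStats {
  completed: number
  failed: number
  totalDurationMs: number
}

// Subscribes to job events and keeps running counters per schedule name.
function trackStats(emitter: EventEmitter): Map<string, ScheduleStats> {
  const stats = new Map<string, ScheduleStats>()

  // Returns the stats entry for a schedule, creating it on first use
  const entry = (name: string): ScheduleStats => {
    let s = stats.get(name)
    if (!s) {
      s = { completed: 0, failed: 0, totalDurationMs: 0 }
      stats.set(name, s)
    }
    return s
  }

  // Assumption: completed events include scheduleName and duration (ms)
  emitter.on('job:completed', (event: { scheduleName: string; duration: number }) => {
    const s = entry(event.scheduleName)
    s.completed += 1
    s.totalDurationMs += event.duration
  })

  emitter.on('job:failed', (event: { scheduleName: string }) => {
    entry(event.scheduleName).failed += 1
  })

  return stats
}
```

The returned map can be read periodically to derive failure rates or average durations, for example before pushing them to a metrics service.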
Distributed High Availability
Run multiple scheduler instances for fault tolerance. They coordinate automatically via the shared storage backend.
// Instance 1 (server-1)
const scheduler1 = new Scheduler({
  storage: sharedStorage,
  pollInterval: 1000
})
await scheduler1.start()

// Instance 2 (server-2)
const scheduler2 = new Scheduler({
  storage: sharedStorage,
  pollInterval: 1000
})
await scheduler2.start()

// Instance 3 (server-3)
const scheduler3 = new Scheduler({
  storage: sharedStorage,
  pollInterval: 1000
})
await scheduler3.start()
Jobs are distributed across instances using distributed locks. If one instance fails, others continue processing.
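The general idea behind lock-based coordination can be sketched as follows: before running a job, an instance tries to claim a lock with an expiry, so a crashed instance cannot hold a job forever. This is a conceptual illustration under stated assumptions, not the scheduler's actual locking code. `LockStore` is a hypothetical in-memory stand-in for the shared backend, and a real backend would need an atomic compare-and-set operation rather than a plain `Map`.

```typescript
interface LockRecord {
  owner: string
  expiresAt: number // epoch milliseconds
}

// Hypothetical in-memory stand-in for the shared storage backend.
class LockStore {
  private readonly locks = new Map<string, LockRecord>()

  // Claims jobId for owner. Succeeds when the lock is free, expired,
  // or already held by the same owner (which renews it).
  tryAcquire(jobId: string, owner: string, ttlMs: number, now: number = Date.now()): boolean {
    const current = this.locks.get(jobId)
    if (current && current.expiresAt > now && current.owner !== owner) {
      return false // another live instance holds the job
    }
    this.locks.set(jobId, { owner, expiresAt: now + ttlMs })
    return true
  }

  // Releases the lock, but only if owner still holds it.
  release(jobId: string, owner: string): void {
    if (this.locks.get(jobId)?.owner === owner) {
      this.locks.delete(jobId)
    }
  }
}
```

If `server-1` takes a lock and then crashes without releasing it, `server-2` is refused until the TTL elapses and can take over afterwards. Choosing the TTL is a trade-off: too short and a slow job may be picked up twice, too long and failover is delayed.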
Graceful Shutdown
Handle shutdown signals to allow in-flight jobs to complete.
import { Scheduler } from '@go-go-scope/scheduler'

const scheduler = new Scheduler({ storage })
await scheduler.start()

process.on('SIGINT', async () => {
  console.log('\nShutting down scheduler...')

  // Stop accepting new jobs and wait for current jobs
  await scheduler.stop()

  console.log('Scheduler stopped gracefully')
  process.exit(0)
})

process.on('SIGTERM', async () => {
  console.log('SIGTERM received, shutting down...')
  await scheduler.stop()
  process.exit(0)
})
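What "stop accepting new jobs and wait for current jobs" means can be illustrated with a small drain pattern. The sketch below is not the implementation of `scheduler.stop()`; `InFlightTracker` is a hypothetical class that shows the two steps involved: refusing new work once shutdown begins, then awaiting everything already running.

```typescript
// Hypothetical illustration of a drain-on-shutdown pattern.
class InFlightTracker {
  private readonly running = new Set<Promise<unknown>>()
  private accepting = true

  // Starts a job unless shutdown has begun. Returns false if it was refused.
  run(job: () => Promise<unknown>): boolean {
    if (!this.accepting) return false
    const p: Promise<unknown> = job()
      .catch(() => undefined) // a failed job must not abort the drain
      .finally(() => this.running.delete(p))
    this.running.add(p)
    return true
  }

  // Stops accepting new jobs and resolves once in-flight jobs have settled.
  async drain(): Promise<void> {
    this.accepting = false
    await Promise.all(this.running)
  }

  get inFlight(): number {
    return this.running.size
  }
}
```

In a signal handler, `await tracker.drain()` would play the role that `await scheduler.stop()` plays above, with `process.exit(0)` called only after it resolves.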
Web UI Management
Enable the built-in web UI to manage schedules visually.
const scheduler = new Scheduler({
  storage,
  enableWebUI: true,
  webUIPort: 3001 // Optional, defaults to 3000
})

await scheduler.start()
console.log('Web UI available at:', scheduler.getWebUIUrl())
// http://localhost:3001
The Web UI lets you:
View all schedules
Create/edit/delete schedules
View job execution history
Monitor job status in real-time
Trigger manual job executions