Background Job Processing

The @go-go-scope/scheduler package provides a distributed job scheduler with cron support, worker threads for CPU-intensive tasks, and multiple persistence backends.

Quick Start

1. Install the scheduler

npm install go-go-scope @go-go-scope/scheduler
npm install @go-go-scope/persistence-postgres  # or redis, mysql, etc.

2. Create a scheduler instance

import { scope } from 'go-go-scope'
import { Scheduler, SQLJobStorage } from '@go-go-scope/scheduler'
import { PostgresAdapter } from '@go-go-scope/persistence-postgres'

await using s = scope()

const storage = new SQLJobStorage(
  'postgresql://localhost:5432/jobs',
  new PostgresAdapter()
)

const scheduler = new Scheduler({
  scope: s,
  storage,
  enableWebUI: true  // Optional management UI
})

3. Define schedules

// Daily report at 9 AM
await scheduler.createSchedule('daily-report', {
  cron: '0 9 * * *',
  timezone: 'America/New_York',
  concurrent: false  // One at a time
})

// Process webhooks every 5 seconds
await scheduler.createSchedule('process-webhooks', {
  interval: 5000,
  concurrent: true  // Multiple in parallel
})

4. Register handlers

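scheduler.onSchedule('daily-report', async (job, scope) => {
  const [err, users] = await scope.task(() => fetchActiveUsers())
  if (err) throw err

  // scope.task resolves to an [error, value] tuple, so inspect each result
  const results = await Promise.all(users.map(user =>
    scope.task(() => sendReport(user))
  ))
  const failed = results.filter(([sendErr]) => sendErr)
  if (failed.length > 0) {
    throw new Error(`${failed.length} of ${users.length} reports failed to send`)
  }
})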

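scheduler.onSchedule('process-webhooks', async (job, scope) => {
  const [err, res] = await scope.task(() =>
    fetch(job.payload.url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(job.payload.data)
    })
  )
  // fetch only rejects on network errors, so check the HTTP status as well
  if (err) console.error(`Webhook failed: ${err.message}`)
  else if (!res.ok) console.error(`Webhook failed: HTTP ${res.status}`)
})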

5. Start the scheduler

await scheduler.start()
console.log('Scheduler running!')
console.log('Web UI:', scheduler.getWebUIUrl())
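
The `concurrent` flag in step 3 controls whether a new run may start while the previous one is still in flight. How the scheduler enforces this internally is not shown here. As a rough sketch of one plausible approach, the guard below skips a tick while a run is active. `oneAtATime` is a hypothetical helper for illustration, not part of the package, and the real scheduler may queue rather than skip.

```typescript
// Hypothetical helper for illustration; not part of @go-go-scope/scheduler.
// Wraps an async function so overlapping calls are skipped rather than stacked.
function oneAtATime<T>(run: () => Promise<T>): () => Promise<T | undefined> {
  let busy = false
  return async () => {
    if (busy) return undefined // previous run still in flight: skip this tick
    busy = true
    try {
      return await run()
    } finally {
      busy = false
    }
  }
}

let runs = 0
const slowJob = () =>
  new Promise<void>((resolve) => {
    runs++
    setTimeout(resolve, 50)
  })

const guardedJob = oneAtATime(slowJob)
guardedJob() // starts the job
guardedJob() // skipped, because the first call has not finished
console.log(runs) // logs 1
```

With `concurrent: true`, the equivalent would be to call `slowJob` directly on every tick and let runs overlap.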

CPU-Intensive Jobs with Worker Threads

For CPU-heavy tasks, run handlers in worker threads to avoid blocking the event loop.

import { Scheduler, InMemoryAdapter } from '@go-go-scope/scheduler'

const scheduler = new Scheduler({
  persistence: new InMemoryAdapter(),
  pollInterval: 1000
})

// ML analysis job
await scheduler.createSchedule('ml-analysis', {
  interval: 60000,  // Every minute
  data: { 
    model: 'recommendation-v2',
    dataset: 'user-behavior'
  }
})

// Run in worker thread with { worker: true }
scheduler.onSchedule('ml-analysis', async (job) => {
  console.log(`Starting ML analysis...`)
  console.log(`Model: ${job.data.model}`)
  console.log(`Dataset: ${job.data.dataset}`)

  const start = Date.now()

  // Heavy computation (runs in worker thread)
  function processData() {
    let result = 0
    for (let i = 0; i < 10000000; i++) {
      result += Math.sin(i) * Math.cos(i)
    }
    return result
  }

  const result = processData()
  const duration = Date.now() - start

  console.log(`ML analysis complete in ${duration}ms`)
  console.log(`Result: ${result.toFixed(4)}`)

  return { processed: 10000000, duration }
}, { worker: true })  // ← Runs in worker thread!

// Data aggregation job (also in worker), every 45 seconds
await scheduler.createSchedule('analytics-report', {
  interval: 45000,
  data: {
    reportType: 'analytics',
    metrics: ['views', 'clicks', 'conversions']
  }
})

scheduler.onSchedule('analytics-report', async (job) => {
  console.log(`Generating ${job.data.reportType} report...`)

  const start = Date.now()

  // Simulate report generation for the metrics named in the schedule data
  function aggregateMetrics() {
    const metrics = {}
    for (const metric of job.data.metrics) {
      let sum = 0
      for (let i = 0; i < 5000000; i++) {
        sum += Math.random()
      }
      metrics[metric] = sum
    }
    return metrics
  }

  const metrics = aggregateMetrics()
  const duration = Date.now() - start

  console.log(`Report generated in ${duration}ms`)
  console.log('Metrics:', metrics)

  return { metrics, generatedAt: new Date().toISOString() }
}, { worker: true })

await scheduler.start()

Worker threads provide true parallelism for CPU-bound tasks, while main-thread handlers remain the better choice for I/O-bound work.
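
The `{ worker: true }` option hides the thread handling. For readers unfamiliar with the underlying mechanism, here is a minimal sketch that uses Node's built-in `node:worker_threads` module directly. It does not show how the scheduler itself spawns workers; `sumInWorker` and `workerSource` are hypothetical names used only for illustration.

```typescript
import { Worker } from 'node:worker_threads'

// Worker body as plain JavaScript, evaluated inside the worker thread.
const workerSource = `
  const { parentPort, workerData } = require('node:worker_threads')
  let result = 0
  for (let i = 0; i < workerData.iterations; i++) {
    result += Math.sin(i) * Math.cos(i)
  }
  parentPort.postMessage(result)
`

// Runs the loop off the main thread and resolves with its result.
function sumInWorker(iterations: number): Promise<number> {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, {
      eval: true,
      workerData: { iterations }
    })
    worker.once('message', resolve)
    worker.once('error', reject)
    worker.once('exit', (code) => {
      if (code !== 0) reject(new Error(`Worker exited with code ${code}`))
    })
  })
}

// The main thread stays free to handle I/O while the worker computes.
sumInWorker(1_000_000).then((result) => {
  console.log(`Result: ${result.toFixed(4)}`)
})
```

Note that the worker receives its input through `workerData` rather than through closures, which is why worker handlers are easiest to reason about when they are self-contained.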

Cron Schedules

Use cron expressions for complex scheduling patterns.

import { CronPresets } from '@go-go-scope/scheduler'

// Every day at midnight
await scheduler.createSchedule('cleanup', {
  cron: CronPresets.DAILY_MIDNIGHT
})

// Every Monday at 9 AM
await scheduler.createSchedule('weekly-report', {
  cron: '0 9 * * 1',
  timezone: 'America/New_York'
})

// Every hour at minute 30
await scheduler.createSchedule('hourly-sync', {
  cron: '30 * * * *'
})

// First day of month at 2 AM
await scheduler.createSchedule('monthly-billing', {
  cron: '0 2 1 * *'
})
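
The expressions above follow the standard five-field cron layout: minute, hour, day of month, month, day of week. As a reading aid, the sketch below splits an expression into named fields. `parseCronFields` is a hypothetical helper written for this page, not an export of the scheduler package, and it does not validate ranges or step syntax.

```typescript
// Hypothetical helper for illustration; not part of @go-go-scope/scheduler.
interface CronFields {
  minute: string
  hour: string
  dayOfMonth: string
  month: string
  dayOfWeek: string
}

// Splits a five-field cron expression into named fields.
function parseCronFields(expression: string): CronFields {
  const parts = expression.trim().split(/\s+/)
  if (parts.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}: "${expression}"`)
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] =
    parts as [string, string, string, string, string]
  return { minute, hour, dayOfMonth, month, dayOfWeek }
}

// '0 9 * * 1' reads as minute 0, hour 9, any day of month, any month, Monday
console.log(parseCronFields('0 9 * * 1'))
```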

Persistence Backends

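Choose a storage backend based on your infrastructure. For example, with Redis:

import { Scheduler } from '@go-go-scope/scheduler'
import { RedisAdapter } from '@go-go-scope/persistence-redis'
import { Redis } from 'ioredis'

// Fall back to a local Redis when REDIS_URL is not set
const redis = new Redis(process.env.REDIS_URL ?? 'redis://localhost:6379')
const adapter = new RedisAdapter(redis)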

const scheduler = new Scheduler({
  persistence: adapter,
  pollInterval: 1000
})

Event Monitoring

Track job execution with event listeners.

scheduler.on('job:started', (event) => {
  console.log(`Job ${event.jobId} started (${event.scheduleName})`)
})

scheduler.on('job:completed', (event) => {
  console.log(`Job ${event.jobId} completed in ${event.duration}ms`)
})

scheduler.on('job:failed', (event) => {
  console.error(`Job ${event.jobId} failed:`, event.error.message)
  
  // Send alert, log to monitoring service, etc. (sendAlert is your own function)
  sendAlert({
    type: 'job_failure',
    jobId: event.jobId,
    schedule: event.scheduleName,
    error: event.error.message
  })
})

scheduler.on('schedule:created', (event) => {
  console.log(`Schedule created: ${event.scheduleName}`)
})

scheduler.on('schedule:deleted', (event) => {
  console.log(`Schedule deleted: ${event.scheduleName}`)
})

Distributed High Availability

Run multiple scheduler instances for fault tolerance. They coordinate automatically via the shared storage backend.

// Instance 1 (server-1)
const scheduler1 = new Scheduler({
  storage: sharedStorage,
  pollInterval: 1000
})
await scheduler1.start()

// Instance 2 (server-2)
const scheduler2 = new Scheduler({
  storage: sharedStorage,
  pollInterval: 1000
})
await scheduler2.start()

// Instance 3 (server-3)
const scheduler3 = new Scheduler({
  storage: sharedStorage,
  pollInterval: 1000
})
await scheduler3.start()

Jobs are distributed across instances using distributed locks. If one instance fails, the others continue processing.
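
To make the locking idea concrete, the sketch below models a lock table with a time-to-live, so that a lock held by a crashed instance eventually expires. It is an in-memory stand-in written for illustration. `LockTable` is a hypothetical class, and a real backend would need an atomic operation in the shared store (for example Redis `SET` with the `NX` and `PX` options) rather than a local `Map`.

```typescript
// Illustrative only: an in-memory stand-in for a shared lock table.
class LockTable {
  private locks = new Map<string, { owner: string; expiresAt: number }>()

  // Returns true when `owner` holds the lock for `jobId` after the call.
  tryAcquire(jobId: string, owner: string, ttlMs: number, now: number): boolean {
    const current = this.locks.get(jobId)
    if (current && current.expiresAt > now && current.owner !== owner) {
      return false // another live instance holds the job
    }
    this.locks.set(jobId, { owner, expiresAt: now + ttlMs })
    return true
  }

  // Releases the lock only if `owner` still holds it.
  release(jobId: string, owner: string): void {
    if (this.locks.get(jobId)?.owner === owner) this.locks.delete(jobId)
  }
}

const lockTable = new LockTable()

// server-1 claims the job at t = 0 with a 30 second lease
console.log(lockTable.tryAcquire('job-1', 'server-1', 30_000, 0)) // true
// server-2 polls one second later and is turned away
console.log(lockTable.tryAcquire('job-1', 'server-2', 30_000, 1_000)) // false
// server-1 crashed without releasing; after the lease expires server-2 takes over
console.log(lockTable.tryAcquire('job-1', 'server-2', 30_000, 31_000)) // true
```

Passing the clock in as `now` keeps the example deterministic; production code would read the time from the storage backend or the local clock.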

Graceful Shutdown

Handle shutdown signals to allow in-flight jobs to complete.

import { Scheduler } from '@go-go-scope/scheduler'

const scheduler = new Scheduler({ storage })
await scheduler.start()

process.on('SIGINT', async () => {
  console.log('\nShutting down scheduler...')
  
  // Stop accepting new jobs and wait for current jobs
  await scheduler.stop()
  
  console.log('Scheduler stopped gracefully')
  process.exit(0)
})

process.on('SIGTERM', async () => {
  console.log('SIGTERM received, shutting down...')
  await scheduler.stop()
  process.exit(0)
})
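
`scheduler.stop()` is described above as refusing new jobs and waiting for current ones. The sketch below shows that drain pattern in isolation, as a way to reason about what a graceful stop involves. `InFlightJobs` is a hypothetical class for illustration; the scheduler's internals may differ.

```typescript
// Hypothetical illustration of a drain-on-shutdown pattern.
class InFlightJobs {
  private running = new Set<Promise<unknown>>()
  private accepting = true

  // Registers a job; returns false once draining has begun.
  track(job: Promise<unknown>): boolean {
    if (!this.accepting) return false
    this.running.add(job)
    const forget = () => {
      this.running.delete(job)
    }
    job.then(forget, forget) // remove the job whether it succeeds or fails
    return true
  }

  get size(): number {
    return this.running.size
  }

  // Stops accepting new jobs, then waits for the running ones to settle.
  async drain(): Promise<void> {
    this.accepting = false
    const settled = Array.from(this.running, (job) => job.catch(() => undefined))
    await Promise.all(settled)
  }
}

const inFlight = new InFlightJobs()
inFlight.track(new Promise((resolve) => setTimeout(resolve, 20)))

inFlight.drain().then(() => {
  console.log('in flight after drain:', inFlight.size)
})
// Work submitted after drain() has started is refused
console.log('accepted during drain:', inFlight.track(Promise.resolve()))
```

A failed job still counts as settled here, so one rejected promise does not prevent shutdown from completing.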

Web UI Management

Enable the built-in web UI to manage schedules visually.

const scheduler = new Scheduler({
  storage,
  enableWebUI: true,
  webUIPort: 3001  // Optional, defaults to 3000
})

await scheduler.start()

console.log('Web UI available at:', scheduler.getWebUIUrl())
// http://localhost:3001

The Web UI lets you:
  • View all schedules
  • Create/edit/delete schedules
  • View job execution history
  • Monitor job status in real-time
  • Trigger manual job executions
