@go-go-scope/scheduler
Distributed job scheduler for go-go-scope with cron expression support, Web UI for management, and multiple storage backends for high availability deployments.
Installation
npm install @go-go-scope/scheduler
Quick Start
import { Scheduler } from '@go-go-scope/scheduler'
import { scope } from 'go-go-scope'

type AppSchedules = {
  'send-email': { to: string; subject: string; body: string }
  'daily-report': { date: string }
}

await using s = scope()
const scheduler = new Scheduler<AppSchedules>({
  scope: s,
  enableWebUI: true,
  webUIPort: 8080
})

// Create a schedule
await scheduler.createSchedule('daily-report', {
  cron: '0 9 * * *',
  timezone: 'America/New_York'
})

// Register handler
scheduler.onSchedule('daily-report', async (job, scope) => {
  console.log('Generating report for:', job.payload.date)
  // Generate report logic
})

scheduler.start()
console.log('Web UI:', scheduler.getWebUIUrl())
Architecture
The scheduler uses a mandatory Admin + Workers pattern:
- Admin Instance: Creates and manages schedules, serves Web UI
- Worker Instances: Load schedules from storage and execute jobs
- Storage Layer: Handles recurring scheduling automatically via completeJobAndScheduleNext()
This design enables:
- Multiple admin instances for HA (no leader election needed)
- Multiple worker instances for distributed execution
- Automatic job scheduling driven by the storage backend
- Duplicate-free execution through distributed locking
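To make the storage-driven recurrence concrete, here is a minimal in-memory sketch. Only the name completeJobAndScheduleNext comes from the text above; the record shape, the signature, and the SketchStorage class are assumptions made for illustration and are not the library's actual API.

```typescript
// Hypothetical shapes for illustration; the real storage types may differ.
type JobStatus = "pending" | "running" | "completed";

interface JobRecord {
  id: number;
  scheduleName: string;
  runAt: number; // epoch milliseconds
  status: JobStatus;
}

class SketchStorage {
  private jobs: JobRecord[] = [];
  private nextId = 1;

  // Insert a pending job for a schedule.
  enqueue(scheduleName: string, runAt: number): JobRecord {
    const job: JobRecord = { id: this.nextId++, scheduleName, runAt, status: "pending" };
    this.jobs.push(job);
    return job;
  }

  // Mark the job completed and create its successor in the same call, so no
  // separate coordinator has to remember to enqueue the next run.
  completeJobAndScheduleNext(jobId: number, intervalMs: number): JobRecord {
    const job = this.jobs.find((candidate) => candidate.id === jobId);
    if (!job) throw new Error(`Unknown job ${jobId}`);
    job.status = "completed";
    return this.enqueue(job.scheduleName, job.runAt + intervalMs);
  }

  pending(): JobRecord[] {
    return this.jobs.filter((job) => job.status === "pending");
  }
}

const sketchStorage = new SketchStorage();
const firstRun = sketchStorage.enqueue("daily-report", 0);
const secondRun = sketchStorage.completeJobAndScheduleNext(firstRun.id, 86_400_000);
console.log(secondRun.runAt, sketchStorage.pending().length);
```

Because the next run is written by whichever instance finishes the current one, any number of admin or worker instances can come and go without a dedicated scheduling process.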
Scheduler Class
Constructor
const scheduler = new Scheduler<Schedules>(options)
options (SchedulerOptions): Scheduler configuration options
SchedulerOptions
- scope: Parent scope for structured concurrency. If not provided, an internal scope is created.
- storage: Storage backend for jobs and schedules. Defaults to InMemoryJobStorage.
- Polling interval in milliseconds (default: 1000)
- Enable metrics collection (default: false)
- enableWebUI: Enable Web UI server (default: false)
- webUIPort: Web UI port (default: 8080)
- Web UI host (default: '0.0.0.0')
- Web UI base path (default: '/')
- Optional API key for Web UI authentication
- Auto-start scheduler on creation (default: true)
- Time in ms before a job is considered deadlocked (optional)
- enableLeaderElection: Enable leader election for admin instances (default: false)
- workerPool ({ size?: number; idleTimeout?: number }): Worker pool configuration for CPU-intensive schedules
Methods
start()
Start the scheduler polling loop.
Workers will start checking for due jobs. Admin can create schedules after starting.
stop()
Stop the scheduler and cancel all running jobs.
createSchedule()
Create a new schedule (admin operation).
await scheduler.createSchedule('schedule-name', {
  cron: '0 * * * *',
  timezone: 'America/New_York',
  maxRetries: 3
})
options (CreateScheduleOptions, required): Schedule configuration
CreateScheduleOptions
- cron: Cron expression (e.g., '0 9 * * *')
- interval: Interval in milliseconds (alternative to cron)
- timezone: IANA timezone (e.g., 'America/New_York')
- End date; no new jobs are created after this date
- maxRetries: Maximum retry attempts (default: 3)
- retryDelay: Delay between retries in ms (default: 1000)
- timeout: Job timeout in ms (default: 30000)
- Allow concurrent execution (default: false)
- Random jitter in ms to prevent thundering herd (default: 0)
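The interval, jitter, and end-date options interact when the next run time is computed. The helper below is a sketch of one plausible reading, assuming jitter is a non-negative random offset in [0, jitter) and that a run falling after the end date is simply not created. The function name nextIntervalRun and its parameters are hypothetical and not part of the library.

```typescript
// Hypothetical helper: next run time for an interval-based schedule.
// `random` is injectable so the jitter can be made deterministic in tests.
function nextIntervalRun(
  lastRunAt: number,
  intervalMs: number,
  jitterMs = 0,
  endAt?: number,
  random: () => number = Math.random,
): number | null {
  // Spread instances out so they do not all fire at the same instant.
  const offset = jitterMs > 0 ? Math.floor(random() * jitterMs) : 0;
  const next = lastRunAt + intervalMs + offset;
  // Past the end date: no further job is created.
  return endAt !== undefined && next > endAt ? null : next;
}

console.log(nextIntervalRun(0, 60_000));
console.log(nextIntervalRun(0, 60_000, 1_000, undefined, () => 0.5));
console.log(nextIntervalRun(0, 60_000, 0, 30_000));
```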
onSchedule()
Register handler for a schedule (worker operation).
scheduler.onSchedule('schedule-name', async (job, scope) => {
  // Job handler logic
  console.log('Processing:', job.payload)
}, { worker: false })
Handler function for jobs
ScheduleHandler
type ScheduleHandler = (
  job: Job,
  scope: Scope
) => Promise<void>
Scope for the job execution
triggerSchedule()
Manually trigger a schedule to create a job.
await scheduler.triggerSchedule('send-email', {
  to: 'user@example.com',
  subject: 'Hello',
  body: 'World'
})
Payload argument: Job payload (typed based on the schedule definition)
Returns: Result containing the job ID and run time
updateSchedule()
Update an existing schedule.
await scheduler.updateSchedule('schedule-name', {
  cron: '0 10 * * *',
  maxRetries: 5
})
options (UpdateScheduleOptions, required): Options to update
deleteSchedule()
Delete a schedule and its pending jobs.
await scheduler.deleteSchedule('schedule-name')
pauseSchedule()
Pause a schedule (no new jobs created).
await scheduler.pauseSchedule('schedule-name')
resumeSchedule()
Resume a paused schedule.
await scheduler.resumeSchedule('schedule-name')
getScheduleStats()
Get statistics for a schedule.
const stats = await scheduler.getScheduleStats('schedule-name')
console.log('Success rate:', stats.successRate)
Returns: Statistics including success rate, job counts, and timing info
getWebUIUrl()
Get the Web UI URL if enabled.
const url = scheduler.getWebUIUrl()
console.log('Web UI:', url) // http://localhost:8080/
Returns: Web UI URL, or null if the Web UI is not enabled
Events
on()
Subscribe to scheduler events.
const unsubscribe = scheduler.on('jobCompleted', ({ job }) => {
  console.log('Job completed:', job.id)
})
// Later: unsubscribe()
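The subscribe-then-unsubscribe pattern shown above can be sketched as a small typed emitter. This is an illustration of the pattern only, not the scheduler's implementation; SketchEmitter, SketchEvents, and the simplified payloads (a jobId string instead of a full Job) are assumptions.

```typescript
// Simplified, hypothetical event map; the real payloads carry a Job object.
type SketchEvents = {
  jobCompleted: { jobId: string };
  jobFailed: { jobId: string; error: Error };
};

type Listener = (payload: unknown) => void;

class SketchEmitter<E extends Record<string, unknown>> {
  private listeners = new Map<keyof E, Set<Listener>>();

  // Register a listener and return a function that removes it again.
  on<K extends keyof E>(event: K, listener: (payload: E[K]) => void): () => void {
    const set = this.listeners.get(event) ?? new Set<Listener>();
    this.listeners.set(event, set);
    set.add(listener as Listener);
    return () => {
      set.delete(listener as Listener);
    };
  }

  // Call every listener currently registered for the event.
  emit<K extends keyof E>(event: K, payload: E[K]): void {
    for (const listener of this.listeners.get(event) ?? []) listener(payload);
  }
}

const emitter = new SketchEmitter<SketchEvents>();
const completed: string[] = [];
const stopListening = emitter.on("jobCompleted", ({ jobId }) => completed.push(jobId));

emitter.emit("jobCompleted", { jobId: "job-1" });
stopListening();
emitter.emit("jobCompleted", { jobId: "job-2" }); // no listener left
console.log(completed);
```

Returning the unsubscribe function from on() keeps the listener reference private to the caller, so there is no need for a separate off() method that takes the same callback again.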
Available Events
- jobCompleted: Job completed successfully
- jobFailed ({ job: Job; error: Error }): Job failed
- jobRetrying ({ job: Job; attempt: number }): Job is retrying
- Instance became leader (HA mode)
Storage Backends
InMemoryJobStorage
Simple in-memory storage for single-node deployments.
import { InMemoryJobStorage, Scheduler } from '@go-go-scope/scheduler'
const scheduler = new Scheduler({
  storage: new InMemoryJobStorage()
})
RedisJobStorage
Redis-backed storage for distributed deployments.
import { RedisJobStorage, Scheduler } from '@go-go-scope/scheduler'
import { RedisAdapter } from '@go-go-scope/persistence-redis'
import Redis from 'ioredis'
const redis = new Redis()
const adapter = new RedisAdapter(redis)
const storage = new RedisJobStorage(redis, adapter)
const scheduler = new Scheduler({ storage })
SQLJobStorage
SQL database storage (PostgreSQL, MySQL, SQLite).
import { SQLJobStorage, Scheduler } from '@go-go-scope/scheduler'
import { PostgresAdapter } from '@go-go-scope/persistence-postgres'
import pg from 'pg'
const pool = new pg.Pool({ connectionString: 'postgresql://...' })
const adapter = new PostgresAdapter(pool)
const storage = new SQLJobStorage(adapter)
const scheduler = new Scheduler({ storage })
Cron Support
parseCron()
Parse a cron expression.
import { parseCron } from '@go-go-scope/scheduler'
const cron = parseCron('0 9 * * *', 'America/New_York')
const next = cron.next() // Next occurrence
Returns: Parsed cron expression with a .next() method
CronPresets
Common cron presets.
import { CronPresets } from '@go-go-scope/scheduler'
await scheduler.createSchedule('hourly-task', {
  cron: CronPresets.HOURLY // '0 * * * *'
})
Available presets:
EVERY_MINUTE: '* * * * *'
HOURLY: '0 * * * *'
DAILY: '0 0 * * *'
WEEKLY: '0 0 * * 0'
MONTHLY: '0 0 1 * *'
YEARLY: '0 0 1 1 *'
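To show what these five-field expressions mean in practice, the sketch below computes the next matching minute for an expression. It is not the library's parser: it works in UTC only (no timezone argument), supports just `*`, single values, ranges, lists, and steps, accepts 0 to 6 for day of week, and all function names are hypothetical.

```typescript
// Expand one cron field ("*", "5", "1-5", "*/15", "1,15") into its allowed values.
function expandField(field: string, min: number, max: number): Set<number> {
  const values = new Set<number>();
  for (const part of field.split(",")) {
    const match = /^(?:\*|(\d+)(?:-(\d+))?)(?:\/(\d+))?$/.exec(part);
    if (!match) throw new Error(`Invalid cron field: ${field}`);
    const [, startText, endText, stepText] = match;
    const step = stepText === undefined ? 1 : Number(stepText);
    const start = startText === undefined ? min : Number(startText);
    // "*" and "n/step" run to the field maximum; a bare number is a single value.
    const open = startText === undefined || stepText !== undefined;
    const end = endText !== undefined ? Number(endText) : open ? max : start;
    if (step < 1 || start < min || end > max || start > end) {
      throw new Error(`Out-of-range cron field: ${field}`);
    }
    for (let value = start; value <= end; value += step) values.add(value);
  }
  return values;
}

interface CronSets {
  minutes: Set<number>;
  hours: Set<number>;
  daysOfMonth: Set<number>;
  months: Set<number>;
  daysOfWeek: Set<number>;
  bothDayFieldsRestricted: boolean;
}

function parseFiveFields(expression: string): CronSets {
  const fields = expression.trim().split(/\s+/);
  if (fields.length !== 5) throw new Error(`Expected 5 fields, got ${fields.length}`);
  const [minute, hour, dayOfMonth, month, dayOfWeek] = fields as [string, string, string, string, string];
  return {
    minutes: expandField(minute, 0, 59),
    hours: expandField(hour, 0, 23),
    daysOfMonth: expandField(dayOfMonth, 1, 31),
    months: expandField(month, 1, 12),
    daysOfWeek: expandField(dayOfWeek, 0, 6),
    bothDayFieldsRestricted: dayOfMonth !== "*" && dayOfWeek !== "*",
  };
}

// Walk forward minute by minute (UTC) until every field matches.
function nextOccurrenceUtc(expression: string, after: Date): Date {
  const cron = parseFiveFields(expression);
  const candidate = new Date(Math.floor(after.getTime() / 60_000) * 60_000 + 60_000);
  const maxSteps = 5 * 366 * 24 * 60; // give up after roughly five years
  for (let i = 0; i < maxSteps; i++) {
    const domOk = cron.daysOfMonth.has(candidate.getUTCDate());
    const dowOk = cron.daysOfWeek.has(candidate.getUTCDay());
    // Classic cron rule: when both day fields are restricted, either may match.
    const dayOk = cron.bothDayFieldsRestricted ? domOk || dowOk : domOk && dowOk;
    if (
      dayOk &&
      cron.minutes.has(candidate.getUTCMinutes()) &&
      cron.hours.has(candidate.getUTCHours()) &&
      cron.months.has(candidate.getUTCMonth() + 1)
    ) {
      return candidate;
    }
    candidate.setTime(candidate.getTime() + 60_000);
  }
  throw new Error(`No occurrence found for: ${expression}`);
}

// Local copy of the preset values listed above.
const presetSketch = {
  EVERY_MINUTE: "* * * * *",
  HOURLY: "0 * * * *",
  DAILY: "0 0 * * *",
  WEEKLY: "0 0 * * 0",
  MONTHLY: "0 0 1 * *",
  YEARLY: "0 0 1 1 *",
};

const reference = new Date("2024-01-01T08:30:00Z");
for (const [name, expression] of Object.entries(presetSketch)) {
  console.log(name, nextOccurrenceUtc(expression, reference).toISOString());
}
```

A production parser would jump directly to the next candidate instead of stepping one minute at a time and would evaluate fields in the schedule's timezone; the brute-force walk is used here only because it is easy to verify.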
describeCron()
Get human-readable description of cron expression.
import { describeCron } from '@go-go-scope/scheduler'
const description = describeCron('0 9 * * 1-5')
// "At 9:00 AM, Monday through Friday"
Type Safety
Define schedule types for autocomplete and type checking:
type AppSchedules = {
  'send-email': {
    to: string
    subject: string
    body: string
  }
  'process-payment': {
    amount: number
    currency: string
  }
  'cleanup-temp-files': {
    maxAge: number
  }
}

const scheduler = new Scheduler<AppSchedules>({ storage })

// Autocomplete for schedule names!
scheduler.onSchedule('send-email', async (job) => {
  // job.payload is fully typed
  const { to, subject, body } = job.payload
  await sendEmail({ to, subject, body })
})

// Type checking works!
await scheduler.triggerSchedule('send-email', {
  to: 'user@example.com',
  subject: 'Hello',
  body: 'World'
})
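The typing shown above relies on a generic map from schedule name to payload type. A minimal sketch of that technique, independent of the library, might look as follows; TypedRegistry, SketchHandler, and SketchSchedules are hypothetical names, and the job object is reduced to its payload.

```typescript
type SketchSchedules = {
  "send-email": { to: string; subject: string; body: string };
  "cleanup-temp-files": { maxAge: number };
};

type SketchHandler<P> = (job: { payload: P }) => Promise<void>;

class TypedRegistry<S extends Record<string, unknown>> {
  private handlers = new Map<keyof S, SketchHandler<unknown>>();

  // K is inferred from the name, which in turn fixes the payload type S[K].
  on<K extends keyof S>(name: K, handler: SketchHandler<S[K]>): void {
    this.handlers.set(name, handler as SketchHandler<unknown>);
  }

  async trigger<K extends keyof S>(name: K, payload: S[K]): Promise<void> {
    const handler = this.handlers.get(name);
    if (!handler) throw new Error(`No handler registered for ${String(name)}`);
    await handler({ payload });
  }
}

const registry = new TypedRegistry<SketchSchedules>();
const recipients: string[] = [];

registry.on("send-email", async (job) => {
  // job.payload is { to: string; subject: string; body: string } here.
  recipients.push(job.payload.to);
});

// registry.trigger("send-email", { maxAge: 5 }) would fail to compile,
// because the payload does not match the "send-email" entry.
registry
  .trigger("send-email", { to: "user@example.com", subject: "Hello", body: "World" })
  .then(() => console.log(recipients));
```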
Job Object
Reference to parent schedule
Current status: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled'
Priority (higher = runs first)
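As an illustration of "higher = runs first", the sketch below picks the next job from a list of due jobs. The QueuedJob shape and the tie-break on earliest run time are assumptions; the source only states that higher priority runs first.

```typescript
// Hypothetical job shape, reduced to the fields needed for ordering.
interface QueuedJob {
  id: string;
  priority: number;
  runAt: number; // epoch milliseconds
}

// Among jobs that are due, prefer the highest priority, then the earliest runAt.
function pickNext(jobs: QueuedJob[], now: number): QueuedJob | undefined {
  return jobs
    .filter((job) => job.runAt <= now) // filter() copies, so the input is not reordered
    .sort((a, b) => b.priority - a.priority || a.runAt - b.runAt)[0];
}

const queue: QueuedJob[] = [
  { id: "low", priority: 1, runAt: 100 },
  { id: "high", priority: 10, runAt: 200 },
  { id: "future", priority: 99, runAt: 9_999 },
];
console.log(pickNext(queue, 500)?.id);
```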
Examples
Email Scheduler
type EmailSchedules = {
  'welcome-email': { userId: string }
  'daily-digest': { emails: string[] }
}

const scheduler = new Scheduler<EmailSchedules>({
  storage: new RedisJobStorage(redis, redisAdapter)
})

// Daily digest at 8 AM
await scheduler.createSchedule('daily-digest', {
  cron: '0 8 * * *',
  timezone: 'America/New_York'
})

scheduler.onSchedule('daily-digest', async (job) => {
  const { emails } = job.payload
  for (const email of emails) {
    await sendDigest(email)
  }
})
Retry with Backoff
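await scheduler.createSchedule('api-sync', {
  interval: 60000, // 1 minute
  maxRetries: 5,
  retryDelay: 5000, // 5 seconds
  timeout: 30000 // 30 second timeout
})

scheduler.onSchedule('api-sync', async (job, scope) => {
  // Pass the scope's signal so the request is aborted if the job is cancelled
  const response = await fetch('https://api.example.com/data', {
    signal: scope.signal
  })
  // fetch() does not reject on HTTP error statuses, so throw to trigger a retry
  if (!response.ok) {
    throw new Error(`api-sync failed with HTTP ${response.status}`)
  }
  const data = await response.json()
  await saveToDatabase(data)
})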
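How maxRetries, retryDelay, and timeout might combine can be sketched with a standalone helper. This is an assumption-laden illustration, not the scheduler's retry engine: runWithRetry is a hypothetical name, the delay between attempts is fixed rather than growing, maxRetries is read as "retries after the first attempt", and the timeout only takes effect if the task honors the AbortSignal it receives.

```typescript
interface RetryOptions {
  maxRetries: number; // retries after the initial attempt
  retryDelay: number; // ms to wait between attempts
  timeout: number; // ms before the attempt's signal is aborted
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function runWithRetry<T>(
  task: (signal: AbortSignal) => Promise<T>,
  options: RetryOptions,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= options.maxRetries; attempt++) {
    // Each attempt gets its own signal that fires when the timeout elapses.
    const controller = new AbortController();
    const timer = setTimeout(() => controller.abort(new Error("Job timed out")), options.timeout);
    try {
      return await task(controller.signal);
    } catch (error) {
      lastError = error;
    } finally {
      clearTimeout(timer);
    }
    if (attempt < options.maxRetries) await sleep(options.retryDelay);
  }
  throw lastError;
}

// Usage: a task that fails twice before succeeding.
let attempts = 0;
runWithRetry(
  async () => {
    attempts++;
    if (attempts < 3) throw new Error("transient failure");
    return "synced";
  },
  { maxRetries: 5, retryDelay: 10, timeout: 1_000 },
).then((result) => console.log(result, "after", attempts, "attempts"));
```

Creating a fresh AbortController per attempt matters: a signal that has already fired stays aborted, so reusing it would make every later attempt fail immediately.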
Multi-Instance HA Setup
// Admin instance (creates schedules, serves Web UI)
const admin = new Scheduler({
storage: sqlStorage,
enableWebUI: true,
webUIPort: 8080,
enableLeaderElection: true
})
await admin.createSchedule('backup', {
cron: '0 2 * * *',
timezone: 'UTC'
})
// Worker instance 1
const worker1 = new Scheduler({ storage: sqlStorage })
worker1.onSchedule('backup', async (job) => {
await performBackup()
})
// Worker instance 2
const worker2 = new Scheduler({ storage: sqlStorage })
worker2.onSchedule('backup', async (job) => {
await performBackup()
})
// Only one worker will execute each job (distributed locking)
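The "only one worker executes each job" guarantee comes down to an atomic claim step. The sketch below models that step in memory; SketchLockStore and runIfClaimed are hypothetical, and how the real storage backends implement the claim is not stated in this document. A shared backend would typically need an atomic primitive for it, such as Redis `SET` with the `NX` option or a conditional SQL `UPDATE`.

```typescript
// Hypothetical lock table: job id -> id of the worker that claimed it.
class SketchLockStore {
  private owners = new Map<string, string>();

  // Succeeds only for the first caller. The check and the write happen in one
  // synchronous step, which stands in for an atomic operation on a real backend.
  tryClaim(jobId: string, workerId: string): boolean {
    if (this.owners.has(jobId)) return false;
    this.owners.set(jobId, workerId);
    return true;
  }
}

// Run the handler only if this worker won the claim; report whether it ran.
async function runIfClaimed(
  store: SketchLockStore,
  jobId: string,
  workerId: string,
  handler: () => Promise<void>,
): Promise<boolean> {
  if (!store.tryClaim(jobId, workerId)) return false;
  await handler();
  return true;
}

const lockStore = new SketchLockStore();
const executedBy: string[] = [];
const backupAs = (workerId: string) => async () => {
  executedBy.push(workerId);
};

// Both workers race for the same job; exactly one of them runs it.
Promise.all([
  runIfClaimed(lockStore, "backup-job-1", "worker1", backupAs("worker1")),
  runIfClaimed(lockStore, "backup-job-1", "worker2", backupAs("worker2")),
]).then((results) => console.log(results, executedBy));
```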
CPU-Intensive Schedules
const scheduler = new Scheduler({
  storage,
  workerPool: { size: 4, idleTimeout: 30000 }
})

scheduler.onSchedule('heavy-computation', async (job) => {
  // Runs in worker thread
  const result = heavyCalculation(job.payload.data)
  await saveResult(result)
}, { worker: true })