Documentation Index
Fetch the complete documentation index at: https://mintlify.com/platforma-dev/platforma/llms.txt
Use this file to discover all available pages before exploring further.
The queue package provides a flexible job processing system with typed job handlers, worker pools, and graceful shutdown.
Processor Type
The Processor[T] type manages a pool of workers to process jobs from a queue.
type Processor[T any] struct {
handler Handler[T]
queue Provider[T]
wg sync.WaitGroup
workersAmount int
shutdownTimeout time.Duration
}
Constructor
New[T](handler, queue, workersAmount, shutdownTimeout)
Creates a new Processor with the specified configuration.Parameters:
handler (Handler[T]): The handler to process jobs
queue (Provider[T]): The queue provider implementation
workersAmount (int): Number of concurrent workers
shutdownTimeout (time.Duration): Time to wait for in-progress jobs during shutdown
processor := queue.New(
queue.HandlerFunc[Job](handleJob),
queue.NewChanQueue[Job](100, 3*time.Second),
5, // 5 workers
10*time.Second, // 10s shutdown timeout
)
Processor Methods
Enqueue
Enqueue(ctx context.Context, job T)
Adds a job to the queue for processing.err := processor.Enqueue(ctx, Job{UserID: 123, Action: "send_email"})
if err != nil {
log.ErrorContext(ctx, "failed to enqueue", "error", err)
}
Run
Starts the queue processor and blocks until all workers complete.Lifecycle:
- Opens the queue provider
- Starts the configured number of workers
- Workers process jobs until context is canceled
- Attempts to drain remaining jobs within shutdown timeout
- Closes the queue provider
if err := processor.Run(ctx); err != nil {
log.ErrorContext(ctx, "processor error", "error", err)
}
Handler Interface
The Handler interface defines how jobs are processed.
type Handler[T any] interface {
Handle(ctx context.Context, job T)
}
Handle
func(ctx context.Context, job T)
Processes a single job. This method is called by workers for each job in the queue.
HandlerFunc
A function type that implements the Handler interface.
type HandlerFunc[T any] func(ctx context.Context, job T)
func (f HandlerFunc[T]) Handle(ctx context.Context, job T) {
f(ctx, job)
}
Provider Interface
The Provider interface defines queue implementations.
type Provider[T any] interface {
Open(ctx context.Context) error
Close(ctx context.Context) error
EnqueueJob(ctx context.Context, job T) error
GetJobChan(ctx context.Context) (chan T, error)
}
Open
func(ctx context.Context) error
Initializes the queue provider (e.g., connects to Redis, opens channels).
Close
func(ctx context.Context) error
Closes the queue provider and cleans up resources.
EnqueueJob
func(ctx context.Context, job T) error
Adds a job to the queue.
GetJobChan
func(ctx context.Context) (chan T, error)
Returns a channel that workers can receive jobs from.
Built-in Queue Provider
ChanQueue
Platforma provides a channel-based queue implementation for simple use cases:
func NewChanQueue[T any](capacity int, shutdownTimeout time.Duration) Provider[T]
Example:
queue := queue.NewChanQueue[Job](100, 3*time.Second)
Example Usage
Basic Job Processing
package main
import (
"context"
"time"
"github.com/platforma-dev/platforma/log"
"github.com/platforma-dev/platforma/queue"
)
type EmailJob struct {
To string
Subject string
Body string
}
func handleEmail(ctx context.Context, job EmailJob) {
log.InfoContext(ctx, "sending email",
"to", job.To,
"subject", job.Subject,
)
// Send email logic here
time.Sleep(100 * time.Millisecond) // Simulate work
log.InfoContext(ctx, "email sent", "to", job.To)
}
func main() {
ctx := context.Background()
// Create queue and processor
q := queue.NewChanQueue[EmailJob](100, 3*time.Second)
processor := queue.New(
queue.HandlerFunc[EmailJob](handleEmail),
q,
5, // 5 workers
10*time.Second, // 10s shutdown timeout
)
// Start processor in background
go processor.Run(ctx)
time.Sleep(time.Millisecond) // Allow processor to start
// Enqueue jobs
processor.Enqueue(ctx, EmailJob{
To: "user@example.com",
Subject: "Welcome!",
Body: "Thanks for signing up",
})
processor.Enqueue(ctx, EmailJob{
To: "admin@example.com",
Subject: "New User",
Body: "A new user signed up",
})
time.Sleep(time.Second) // Wait for jobs to process
}
Struct Handler
type EmailService struct {
smtpHost string
smtpPort int
}
func (s *EmailService) Handle(ctx context.Context, job EmailJob) {
log.InfoContext(ctx, "sending via SMTP",
"host", s.smtpHost,
"to", job.To,
)
// SMTP sending logic
}
// Usage
emailService := &EmailService{
smtpHost: "smtp.example.com",
smtpPort: 587,
}
processor := queue.New(
emailService,
queue.NewChanQueue[EmailJob](100, 3*time.Second),
3,
5*time.Second,
)
Integration with Application
package main
import (
"context"
"time"
"github.com/platforma-dev/platforma/application"
"github.com/platforma-dev/platforma/queue"
"github.com/platforma-dev/platforma/log"
)
type Job struct {
Data int
}
func jobHandler(ctx context.Context, job Job) {
log.InfoContext(ctx, "processing job", "data", job.Data)
}
func main() {
ctx := context.Background()
app := application.New()
// Create processor
q := queue.NewChanQueue[Job](50, 2*time.Second)
processor := queue.New(
queue.HandlerFunc[Job](jobHandler),
q,
3,
5*time.Second,
)
// Register as service
app.RegisterService("job_processor", processor)
// Enqueue jobs during startup
app.OnStartFunc(func(ctx context.Context) error {
for i := 1; i <= 10; i++ {
if err := processor.Enqueue(ctx, Job{Data: i}); err != nil {
return err
}
}
log.InfoContext(ctx, "enqueued 10 jobs")
return nil
}, application.StartupTaskConfig{
Name: "enqueue_initial_jobs",
AbortOnError: false,
})
if err := app.Run(ctx); err != nil {
log.ErrorContext(ctx, "app error", "error", err)
}
}
Graceful Shutdown
The processor handles graceful shutdown automatically:
- Context Cancellation: Workers stop accepting new jobs
- Drain Period: Workers attempt to finish in-progress jobs within the shutdown timeout
- Cleanup: Queue provider is closed
ctx, cancel := context.WithCancel(context.Background())
// Start processor
go processor.Run(ctx)
// Later: trigger shutdown
cancel()
// Workers will:
// 1. Stop accepting new jobs immediately
// 2. Try to complete current jobs within shutdownTimeout
// 3. Exit gracefully
Context Keys
The processor automatically adds a WorkerIDKey to the context for each worker:
func handleJob(ctx context.Context, job Job) {
workerID := ctx.Value(log.WorkerIDKey).(string)
log.InfoContext(ctx, "processing", "worker", workerID)
}