Platforma’s queue system provides a robust way to process background jobs using typed handlers, worker pools, and graceful shutdown semantics.
Overview
The queue package provides three main abstractions:
- Processor: Manages a pool of workers that process jobs
- Handler: Defines how to process jobs of a specific type
- Provider: Abstracts the underlying queue implementation (Redis, RabbitMQ, etc.)
Handler Interface
Implement the Handler[T] interface to define job processing logic:
type Handler[T any] interface {
    Handle(ctx context.Context, job T)
}
The handler is type-safe: you define the job type, and mismatches between the handler, the provider, and the jobs you enqueue are caught at compile time.
Creating a Handler
Using a Struct
type EmailJob struct {
    To      string
    Subject string
    Body    string
}

// smtp.Client is assumed to be an application-defined mail client that
// exposes SendMail(to, subject, body); net/smtp's Client has no such method.
type EmailHandler struct {
    smtp *smtp.Client
}

func (h *EmailHandler) Handle(ctx context.Context, job EmailJob) {
    log.InfoContext(ctx, "sending email", "to", job.To)
    err := h.smtp.SendMail(job.To, job.Subject, job.Body)
    if err != nil {
        log.ErrorContext(ctx, "failed to send email", "error", err)
        return
    }
    log.InfoContext(ctx, "email sent successfully")
}
Using a Function
For simple handlers, use HandlerFunc[T]:
handler := queue.HandlerFunc[EmailJob](func(ctx context.Context, job EmailJob) {
    log.InfoContext(ctx, "processing email", "to", job.To)
    // Process the job...
})
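To illustrate how a plain function can satisfy Handler[T], the sketch below uses the function-type-with-a-method pattern familiar from http.HandlerFunc. Handler and HandlerFunc are redeclared locally so the example compiles on its own; whether the queue package implements HandlerFunc exactly this way is an assumption, and PingJob and collectTargets are hypothetical names used only here.

```go
package main

import (
    "context"
    "fmt"
)

// Handler mirrors the interface shown above, redeclared locally so the
// sketch is self-contained.
type Handler[T any] interface {
    Handle(ctx context.Context, job T)
}

// HandlerFunc is an assumed adapter: a function type with a Handle method.
type HandlerFunc[T any] func(ctx context.Context, job T)

// Handle calls f(ctx, job), which makes HandlerFunc[T] satisfy Handler[T].
func (f HandlerFunc[T]) Handle(ctx context.Context, job T) {
    f(ctx, job)
}

// PingJob is a hypothetical job type used only for this illustration.
type PingJob struct {
    Target string
}

// collectTargets wraps a closure in HandlerFunc, passes one job per target
// through the Handler interface, and returns the targets the closure saw.
func collectTargets(targets ...string) []string {
    var seen []string
    var h Handler[PingJob] = HandlerFunc[PingJob](func(ctx context.Context, job PingJob) {
        seen = append(seen, job.Target)
    })
    for _, t := range targets {
        h.Handle(context.Background(), PingJob{Target: t})
    }
    return seen
}

func main() {
    fmt.Println(collectTargets("a", "b")) // prints [a b]
}
```

The adapter is convenient for small, stateless handlers; a struct is usually the better fit once the handler needs dependencies such as clients or configuration.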
Provider Interface
The Provider[T] interface abstracts the queue implementation:
type Provider[T any] interface {
    Open(ctx context.Context) error
    Close(ctx context.Context) error
    EnqueueJob(ctx context.Context, job T) error
    GetJobChan(ctx context.Context) (chan T, error)
}
You need to implement a provider for your chosen queue backend (Redis, RabbitMQ, in-memory, etc.).
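As a starting point, a provider for tests or single-process use could be backed by a buffered channel. The sketch below is one possible shape, not a provider shipped with Platforma: MemoryProvider, NewMemoryProvider, and roundTrip are hypothetical names, and the Provider interface is redeclared locally so the example stands alone.

```go
package main

import (
    "context"
    "errors"
    "fmt"
    "sync"
)

// Provider mirrors the interface shown above, redeclared locally.
type Provider[T any] interface {
    Open(ctx context.Context) error
    Close(ctx context.Context) error
    EnqueueJob(ctx context.Context, job T) error
    GetJobChan(ctx context.Context) (chan T, error)
}

// MemoryProvider is a hypothetical in-memory provider backed by a buffered channel.
type MemoryProvider[T any] struct {
    mu     sync.Mutex
    jobs   chan T
    closed bool
}

// Compile-time check that MemoryProvider satisfies Provider.
var _ Provider[string] = (*MemoryProvider[string])(nil)

func NewMemoryProvider[T any](capacity int) *MemoryProvider[T] {
    return &MemoryProvider[T]{jobs: make(chan T, capacity)}
}

// Open is a no-op here; a real backend would connect in this method.
func (p *MemoryProvider[T]) Open(ctx context.Context) error { return nil }

// Close closes the channel once, so consumers ranging over it terminate.
func (p *MemoryProvider[T]) Close(ctx context.Context) error {
    p.mu.Lock()
    defer p.mu.Unlock()
    if !p.closed {
        p.closed = true
        close(p.jobs)
    }
    return nil
}

// EnqueueJob adds a job without blocking; it fails if the provider is
// closed, the context is done, or the buffer is full.
func (p *MemoryProvider[T]) EnqueueJob(ctx context.Context, job T) error {
    p.mu.Lock()
    defer p.mu.Unlock()
    if p.closed {
        return errors.New("memory provider: closed")
    }
    if err := ctx.Err(); err != nil {
        return err
    }
    select {
    case p.jobs <- job:
        return nil
    default:
        return errors.New("memory provider: queue full")
    }
}

func (p *MemoryProvider[T]) GetJobChan(ctx context.Context) (chan T, error) {
    return p.jobs, nil
}

// roundTrip enqueues the jobs, closes the provider, and reads them back.
func roundTrip(jobs []string) ([]string, error) {
    ctx := context.Background()
    var p Provider[string] = NewMemoryProvider[string](len(jobs))
    if err := p.Open(ctx); err != nil {
        return nil, err
    }
    for _, j := range jobs {
        if err := p.EnqueueJob(ctx, j); err != nil {
            return nil, err
        }
    }
    ch, err := p.GetJobChan(ctx)
    if err != nil {
        return nil, err
    }
    if err := p.Close(ctx); err != nil {
        return nil, err
    }
    var out []string
    for j := range ch {
        out = append(out, j)
    }
    return out, nil
}

func main() {
    fmt.Println(roundTrip([]string{"a", "b", "c"})) // prints [a b c] <nil>
}
```

Jobs held in memory are lost when the process exits, so a durable backend is the safer choice for anything that must survive a restart.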
Creating a Processor
Create a processor with your handler, provider, and configuration:
import (
    "time"

    "github.com/platforma-dev/platforma/queue"
)

handler := &EmailHandler{smtp: smtpClient}
// NewRedisProvider stands for a Provider[EmailJob] constructor that you implement yourself.
provider := NewRedisProvider[EmailJob](redisClient)
processor := queue.New(
    handler,
    provider,
    10,             // Number of workers
    30*time.Second, // Shutdown timeout
)
Enqueuing Jobs
Add jobs to the queue using the Enqueue method:
job := EmailJob{
    To:      "user@example.com",
    Subject: "Welcome!",
    Body:    "Thanks for signing up.",
}
err := processor.Enqueue(ctx, job)
if err != nil {
    log.ErrorContext(ctx, "failed to enqueue job", "error", err)
    return err
}
The Enqueue method is type-safe: the compiler ensures you only enqueue jobs matching the processor’s type.
Running the Processor
Start the processor to begin processing jobs:
err := processor.Run(ctx)
if err != nil {
    return fmt.Errorf("processor failed: %w", err)
}
The Run method:
- Opens the queue provider connection
- Starts the configured number of worker goroutines
- Blocks until the context is cancelled
- Performs graceful shutdown
- Closes the provider connection
Worker Pools
The processor manages a pool of concurrent workers. Each worker:
- Runs in its own goroutine
- Has a unique worker ID in the context (log.WorkerIDKey)
- Processes jobs from a shared channel
- Recovers from panics and logs them
- Participates in graceful shutdown
processor := queue.New(
    handler,
    provider,
    5, // 5 concurrent workers
    30*time.Second,
)
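The worker behaviour listed above (one goroutine per worker, a shared channel, panic recovery) can be sketched with the standard library alone. This is a simplified illustration, not the processor's actual implementation; safeHandle, runPool, and poolDemo are hypothetical names.

```go
package main

import (
    "context"
    "fmt"
    "sync"
    "sync/atomic"
)

// safeHandle runs handle and reports whether it panicked, so one bad job
// cannot take down the worker goroutine.
func safeHandle[T any](ctx context.Context, job T, handle func(context.Context, T)) (panicked bool) {
    defer func() {
        if r := recover(); r != nil {
            fmt.Printf("recovered from panic: %v\n", r)
            panicked = true
        }
    }()
    handle(ctx, job)
    return false
}

// runPool starts n workers that read from the shared jobs channel until it
// is closed, and returns how many handler panics were recovered.
func runPool[T any](ctx context.Context, n int, jobs <-chan T, handle func(context.Context, T)) int64 {
    var wg sync.WaitGroup
    var recovered atomic.Int64
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for job := range jobs {
                if safeHandle(ctx, job, handle) {
                    recovered.Add(1)
                }
            }
        }()
    }
    wg.Wait()
    return recovered.Load()
}

// poolDemo feeds six jobs to three workers; the handler panics on job 3.
func poolDemo() (processed, recovered int64) {
    jobs := make(chan int, 6)
    for i := 1; i <= 6; i++ {
        jobs <- i
    }
    close(jobs)

    var done atomic.Int64
    rec := runPool[int](context.Background(), 3, jobs, func(ctx context.Context, job int) {
        if job == 3 {
            panic("bad job")
        }
        done.Add(1)
    })
    return done.Load(), rec
}

func main() {
    processed, recovered := poolDemo()
    fmt.Println(processed, recovered) // last line printed: 5 1
}
```

Because the recover call lives in a wrapper around each job rather than around the whole loop, the worker keeps reading from the channel after a panic.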
Graceful Shutdown
When the context is cancelled, the processor performs graceful shutdown:
Stop accepting new jobs
Workers check ctx.Done() before accepting new jobs from the channel.
Drain remaining jobs
Workers continue processing jobs already in the channel until the shutdown timeout expires.
processor := queue.New(
    handler,
    provider,
    10,
    30*time.Second, // Workers have 30s to finish remaining jobs
)
Log shutdown progress
Workers log when they detect shutdown, when they finish draining, and when the timeout expires:
shutting down worker
shutdown timeout expired
worker finished
all workers shut down
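One way to picture the drain phase is a worker loop that, once the context is cancelled, keeps consuming buffered jobs until the channel is empty or a timer fires. The sketch below is a simplified model under that assumption and is not taken from the processor's source; worker, drain, and the two demo functions are hypothetical names.

```go
package main

import (
    "context"
    "fmt"
    "time"
)

// drain handles jobs already buffered in the channel until it is empty or
// the timeout elapses. It reports how many jobs ran and whether time ran out.
func drain[T any](jobs <-chan T, timeout time.Duration, handle func(T)) (handled int, timedOut bool) {
    deadline := time.NewTimer(timeout)
    defer deadline.Stop()
    for {
        // Give the deadline priority over remaining jobs.
        select {
        case <-deadline.C:
            return handled, true // "shutdown timeout expired"
        default:
        }
        select {
        case job, ok := <-jobs:
            if !ok {
                return handled, false
            }
            handle(job)
            handled++
        default:
            return handled, false // nothing left to drain: "worker finished"
        }
    }
}

// worker processes jobs until ctx is cancelled, then switches to draining.
func worker[T any](ctx context.Context, jobs <-chan T, timeout time.Duration, handle func(T)) (int, bool) {
    handled := 0
    for {
        select {
        case <-ctx.Done():
            n, expired := drain(jobs, timeout, handle) // "shutting down worker"
            return handled + n, expired
        case job, ok := <-jobs:
            if !ok {
                return handled, false
            }
            handle(job)
            handled++
        }
    }
}

// drainAllDemo cancels before the worker starts; all three buffered jobs still run.
func drainAllDemo() (int, bool) {
    jobs := make(chan int, 3)
    for i := 0; i < 3; i++ {
        jobs <- i
    }
    ctx, cancel := context.WithCancel(context.Background())
    cancel()
    return worker[int](ctx, jobs, time.Second, func(int) {})
}

// drainTimeoutDemo uses slow jobs and a short timeout, so draining is cut off.
func drainTimeoutDemo() (int, bool) {
    jobs := make(chan int, 10)
    for i := 0; i < 10; i++ {
        jobs <- i
    }
    slow := func(int) { time.Sleep(20 * time.Millisecond) }
    return drain[int](jobs, 30*time.Millisecond, slow)
}

func main() {
    fmt.Println(drainAllDemo())
    fmt.Println(drainTimeoutDemo())
}
```

In this model a job that is already running when the timer fires is allowed to finish; only jobs still waiting in the channel are left behind.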
Context-Aware Logging
Each worker has a unique worker ID injected into the context:
workerCtx := context.WithValue(ctx, log.WorkerIDKey, uuid.NewString())
This allows you to trace job processing across logs:
func (h *EmailHandler) Handle(ctx context.Context, job EmailJob) {
    log.InfoContext(ctx, "starting email send") // Includes workerId
    // Process job...
    log.InfoContext(ctx, "email sent") // Same workerId
}
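To show the general mechanism behind context-aware logging, the sketch below uses the standard library's log/slog as a stand-in for Platforma's log package. It assumes a logger that reads a context key and attaches it as a workerId attribute; workerIDKey, contextHandler, and linesTaggedWith are hypothetical names, and Platforma's logger may work differently.

```go
package main

import (
    "bytes"
    "context"
    "fmt"
    "log/slog"
    "strings"
)

// ctxKey and workerIDKey are hypothetical stand-ins for log.WorkerIDKey.
type ctxKey string

const workerIDKey ctxKey = "workerId"

// contextHandler wraps a slog.Handler and copies the worker ID from the
// context onto every record. WithAttrs and WithGroup are inherited from the
// embedded handler and would drop this wrapper; that is acceptable for a sketch.
type contextHandler struct {
    slog.Handler
}

func (h contextHandler) Handle(ctx context.Context, r slog.Record) error {
    if id, ok := ctx.Value(workerIDKey).(string); ok {
        r.AddAttrs(slog.String("workerId", id))
    }
    return h.Handler.Handle(ctx, r)
}

// linesTaggedWith logs two messages with the same context and returns how
// many times the worker ID attribute appears in the output.
func linesTaggedWith(id string) int {
    var buf bytes.Buffer
    logger := slog.New(contextHandler{Handler: slog.NewTextHandler(&buf, nil)})

    ctx := context.WithValue(context.Background(), workerIDKey, id)
    logger.InfoContext(ctx, "starting email send")
    logger.InfoContext(ctx, "email sent")

    return strings.Count(buf.String(), "workerId="+id)
}

func main() {
    fmt.Println(linesTaggedWith("w-1")) // prints 2
}
```

Filtering logs by that attribute then yields every line a given worker produced, which is what makes the ID useful for tracing a job.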
Complete Example
package main

import (
    "context"

    "github.com/platforma-dev/platforma/log"
)

type NotificationJob struct {
    UserID  string
    Message string
}

// Notifier is an application-defined type with a
// Send(ctx, userID, message) error method; its definition is not shown.
type NotificationHandler struct {
    notifier *Notifier
}

func (h *NotificationHandler) Handle(ctx context.Context, job NotificationJob) {
    log.InfoContext(ctx, "processing notification",
        "user_id", job.UserID,
    )
    err := h.notifier.Send(ctx, job.UserID, job.Message)
    if err != nil {
        log.ErrorContext(ctx, "notification failed", "error", err)
        return
    }
    log.InfoContext(ctx, "notification sent successfully")
}
Error Handling
Handlers should handle errors internally, because the Handle method doesn’t return an error. Log errors using the context-aware logger:
func (h *Handler) Handle(ctx context.Context, job MyJob) {
    err := h.process(job)
    if err != nil {
        log.ErrorContext(ctx, "job failed", "error", err, "job_id", job.ID)
        // Optionally: re-enqueue, send to DLQ, etc.
        return
    }
    log.InfoContext(ctx, "job completed")
}
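The re-enqueue and dead-letter options mentioned in the comment above could look roughly like the following sketch, which carries an attempt counter on the job and gives up after a fixed number of tries. Everything here is illustrative: RetryJob, retryHandler, maxAttempts, and the enqueue callback are hypothetical, and the in-memory slice stands in for a real queue.

```go
package main

import (
    "context"
    "errors"
    "fmt"
)

// RetryJob is a hypothetical job that carries its own attempt counter.
type RetryJob struct {
    ID      string
    Attempt int
}

const maxAttempts = 3

// retryHandler re-enqueues failed jobs until maxAttempts is reached, then
// records them in a dead-letter list. It is not safe for concurrent use;
// with several workers the dead-letter list would need a mutex.
type retryHandler struct {
    process    func(RetryJob) error
    enqueue    func(RetryJob) error
    deadLetter []RetryJob
}

func (h *retryHandler) Handle(ctx context.Context, job RetryJob) {
    if err := h.process(job); err == nil {
        return
    }
    job.Attempt++
    if job.Attempt >= maxAttempts {
        h.deadLetter = append(h.deadLetter, job)
        return
    }
    // If the job cannot be re-enqueued, dead-letter it rather than lose it.
    if err := h.enqueue(job); err != nil {
        h.deadLetter = append(h.deadLetter, job)
    }
}

// retryDemo runs one good job and one permanently failing job through the
// handler and reports how often the bad job was tried and how many jobs
// ended up dead-lettered.
func retryDemo() (attempts int, deadLettered int) {
    var pending []RetryJob
    h := &retryHandler{
        process: func(job RetryJob) error {
            if job.ID == "bad" {
                attempts++
                return errors.New("permanent failure")
            }
            return nil
        },
    }
    h.enqueue = func(job RetryJob) error {
        pending = append(pending, job)
        return nil
    }

    pending = append(pending, RetryJob{ID: "ok"}, RetryJob{ID: "bad"})
    ctx := context.Background()
    for len(pending) > 0 {
        job := pending[0]
        pending = pending[1:]
        h.Handle(ctx, job)
    }
    return attempts, len(h.deadLetter)
}

func main() {
    fmt.Println(retryDemo()) // prints 3 1
}
```

Keeping the counter on the job itself means the retry limit survives a trip through the queue, at the cost of requiring the job type to carry that field.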
Best Practices
Make handlers idempotent
Jobs may be processed more than once due to failures. Design handlers to be safe when re-run.
Use appropriate worker counts
Match the worker count to the workload: roughly one worker per CPU core for CPU-bound jobs, and 10 to 100 or more for I/O-bound jobs.
Set reasonable timeouts
Shutdown timeout should exceed typical job duration to avoid cutting off work.
Log liberally with context
Use InfoContext and ErrorContext to maintain correlation via worker IDs.
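As a small illustration of the idempotency advice, the sketch below records which job IDs have already been applied and skips repeats. It assumes each job carries a unique ID that can serve as an idempotency key; ChargeJob, chargeHandler, and totalAfterRedelivery are hypothetical names, and a production handler would normally keep this record in durable storage rather than in memory.

```go
package main

import (
    "context"
    "fmt"
    "sync"
)

// ChargeJob is a hypothetical job; ID acts as the idempotency key.
type ChargeJob struct {
    ID     string
    Amount int
}

// chargeHandler applies each job ID at most once, even if it is delivered
// several times. The mutex makes it safe to share between workers.
type chargeHandler struct {
    mu      sync.Mutex
    applied map[string]bool
    total   int
}

func (h *chargeHandler) Handle(ctx context.Context, job ChargeJob) {
    h.mu.Lock()
    defer h.mu.Unlock()
    if h.applied[job.ID] {
        return // duplicate delivery: the effect was already applied
    }
    h.total += job.Amount
    h.applied[job.ID] = true
}

// totalAfterRedelivery delivers job "a" twice and job "b" once, then returns
// the accumulated total.
func totalAfterRedelivery() int {
    h := &chargeHandler{applied: make(map[string]bool)}
    deliveries := []ChargeJob{
        {ID: "a", Amount: 10},
        {ID: "b", Amount: 5},
        {ID: "a", Amount: 10}, // redelivered after a simulated failure
    }
    for _, job := range deliveries {
        h.Handle(context.Background(), job)
    }
    return h.total
}

func main() {
    fmt.Println(totalAfterRedelivery()) // prints 15
}
```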