

Platforma’s queue system processes background jobs using typed handlers, worker pools, and graceful shutdown semantics.

Overview

The queue package provides three main abstractions:
  • Processor: Manages a pool of workers that process jobs
  • Handler: Defines how to process jobs of a specific type
  • Provider: Abstracts the underlying queue implementation (Redis, RabbitMQ, etc.)

Handler Interface

Implement the Handler[T] interface to define job processing logic:
type Handler[T any] interface {
	Handle(ctx context.Context, job T)
}
The handler is type-safe: you define the job type, and a handler whose job type does not match the processor is rejected at compile time.

Creating a Handler

Using a Struct

type EmailJob struct {
	To      string
	Subject string
	Body    string
}

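// mailSender is an application-defined mail client. The standard library's
// net/smtp Client has no SendMail method of this shape, so the handler
// depends on a small interface instead.
type mailSender interface {
	SendMail(to, subject, body string) error
}

type EmailHandler struct {
	smtp mailSender
}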

func (h *EmailHandler) Handle(ctx context.Context, job EmailJob) {
	log.InfoContext(ctx, "sending email", "to", job.To)

	err := h.smtp.SendMail(job.To, job.Subject, job.Body)
	if err != nil {
		log.ErrorContext(ctx, "failed to send email", "error", err)
		return
	}

	log.InfoContext(ctx, "email sent successfully")
}

Using a Function

For simple handlers, use HandlerFunc[T]:
handler := queue.HandlerFunc[EmailJob](func(ctx context.Context, job EmailJob) {
	log.InfoContext(ctx, "processing email", "to", job.To)
	// Process the job...
})
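
The function form works because a function type can itself carry a Handle method, the same pattern net/http uses for http.HandlerFunc. The sketch below re-creates that adapter locally to show the mechanism; the local Handler and HandlerFunc definitions are assumptions for illustration, not the queue package's actual source.

```go
package main

import (
	"context"
	"fmt"
)

// Handler mirrors the interface shown earlier in this document.
type Handler[T any] interface {
	Handle(ctx context.Context, job T)
}

// HandlerFunc is a local re-creation of the adapter (an assumption about how
// queue.HandlerFunc works): a function type whose Handle method calls itself.
type HandlerFunc[T any] func(ctx context.Context, job T)

// Handle makes HandlerFunc[T] satisfy Handler[T].
func (f HandlerFunc[T]) Handle(ctx context.Context, job T) { f(ctx, job) }

type EmailJob struct{ To string }

// demo wraps a closure in HandlerFunc and calls it through the interface.
func demo() []string {
	var seen []string
	var h Handler[EmailJob] = HandlerFunc[EmailJob](func(_ context.Context, job EmailJob) {
		seen = append(seen, job.To)
	})
	h.Handle(context.Background(), EmailJob{To: "user@example.com"})
	return seen
}

func main() {
	fmt.Println(demo())
}
```

A closure can capture whatever state it needs (here the seen slice), which is why the function form is enough for handlers without dependencies of their own.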

Provider Interface

The Provider[T] interface abstracts the queue implementation:
type Provider[T any] interface {
	Open(ctx context.Context) error
	Close(ctx context.Context) error
	EnqueueJob(ctx context.Context, job T) error
	GetJobChan(ctx context.Context) (chan T, error)
}
You need to implement a provider for your chosen queue backend (Redis, RabbitMQ, in-memory, etc.).
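
For tests or local development, an in-process provider is often enough. The following is a minimal sketch of one backed by a buffered channel. MemoryProvider, NewMemoryProvider, and the two error values are hypothetical names, and the Provider interface is copied locally so the example compiles on its own. Whether the real processor expects the channel to be closed on Close is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Provider mirrors the interface shown above.
type Provider[T any] interface {
	Open(ctx context.Context) error
	Close(ctx context.Context) error
	EnqueueJob(ctx context.Context, job T) error
	GetJobChan(ctx context.Context) (chan T, error)
}

var (
	errNotOpen   = errors.New("provider is not open")
	errQueueFull = errors.New("queue is full")
)

// MemoryProvider is a hypothetical in-process provider backed by a buffered channel.
type MemoryProvider[T any] struct {
	mu   sync.Mutex
	size int
	jobs chan T // nil while the provider is closed
}

func NewMemoryProvider[T any](size int) *MemoryProvider[T] {
	return &MemoryProvider[T]{size: size}
}

// Open allocates the channel; calling it twice is harmless.
func (p *MemoryProvider[T]) Open(context.Context) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.jobs == nil {
		p.jobs = make(chan T, p.size)
	}
	return nil
}

// Close closes the channel so receivers see the end of the stream.
func (p *MemoryProvider[T]) Close(context.Context) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.jobs != nil {
		close(p.jobs)
		p.jobs = nil
	}
	return nil
}

// EnqueueJob does a non-blocking send and reports a full queue as an error.
func (p *MemoryProvider[T]) EnqueueJob(_ context.Context, job T) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.jobs == nil {
		return errNotOpen
	}
	select {
	case p.jobs <- job:
		return nil
	default:
		return errQueueFull
	}
}

// GetJobChan returns the channel workers should receive from.
func (p *MemoryProvider[T]) GetJobChan(context.Context) (chan T, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.jobs == nil {
		return nil, errNotOpen
	}
	return p.jobs, nil
}

// demo opens the provider, enqueues one job, and reads it back.
func demo() (string, error) {
	ctx := context.Background()
	var p Provider[string] = NewMemoryProvider[string](8)
	if err := p.Open(ctx); err != nil {
		return "", err
	}
	defer p.Close(ctx)
	if err := p.EnqueueJob(ctx, "hello"); err != nil {
		return "", err
	}
	jobs, err := p.GetJobChan(ctx)
	if err != nil {
		return "", err
	}
	return <-jobs, nil
}

func main() {
	fmt.Println(demo())
}
```

Sending under the mutex with a non-blocking select keeps EnqueueJob from racing with Close, at the cost of rejecting jobs when the buffer is full instead of waiting.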

Creating a Processor

Create a processor with your handler, provider, and configuration:
import (
	"time"
	"github.com/platforma-dev/platforma/queue"
)

handler := &EmailHandler{smtp: smtpClient}
provider := NewRedisProvider[EmailJob](redisClient)

processor := queue.New(
	handler,
	provider,
	10,                  // Number of workers
	30*time.Second,      // Shutdown timeout
)

Enqueuing Jobs

Add jobs to the queue using the Enqueue method:
job := EmailJob{
	To:      "user@example.com",
	Subject: "Welcome!",
	Body:    "Thanks for signing up.",
}

err := processor.Enqueue(ctx, job)
if err != nil {
	log.ErrorContext(ctx, "failed to enqueue job", "error", err)
	return err
}
The Enqueue method is type-safe: the compiler ensures you only enqueue jobs matching the processor’s type.

Running the Processor

Start the processor to begin processing jobs:
err := processor.Run(ctx)
if err != nil {
	return fmt.Errorf("processor failed: %w", err)
}
The Run method:
  1. Opens the queue provider connection
  2. Starts the configured number of worker goroutines
  3. Blocks until the context is cancelled
  4. Performs graceful shutdown
  5. Closes the provider connection
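
Since Run blocks until its context is cancelled, the caller decides when shutdown starts. One common approach in Go services is to derive the context from OS signals with signal.NotifyContext. The sketch below assumes only what is described above (Run takes a context and returns an error); the runner interface and fakeProcessor are hypothetical stand-ins so the example runs without a real queue.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// runner is a local stand-in for the processor; only Run is needed here.
type runner interface {
	Run(ctx context.Context) error
}

// runUntilSignal cancels the context passed to Run when the process receives
// SIGINT or SIGTERM, or when parent is cancelled, whichever happens first.
func runUntilSignal(parent context.Context, r runner) error {
	ctx, stop := signal.NotifyContext(parent, os.Interrupt, syscall.SIGTERM)
	defer stop()

	if err := r.Run(ctx); err != nil {
		return fmt.Errorf("processor failed: %w", err)
	}
	return nil
}

// fakeProcessor blocks until its context is cancelled, as Run is described to.
type fakeProcessor struct{}

func (fakeProcessor) Run(ctx context.Context) error {
	<-ctx.Done()
	return nil
}

// demo uses a short timeout in place of a real signal so it ends on its own.
func demo() error {
	parent, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	return runUntilSignal(parent, fakeProcessor{})
}

func main() {
	fmt.Println(demo())
}
```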

Worker Pools

The processor manages a pool of concurrent workers. Each worker:
  • Runs in its own goroutine
  • Has a unique worker ID in the context (log.WorkerIDKey)
  • Processes jobs from a shared channel
  • Recovers from panics and logs them
  • Participates in graceful shutdown
processor := queue.New(
	handler,
	provider,
	5,  // 5 concurrent workers
	30*time.Second,
)
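
To make the worker behavior concrete, here is a simplified sketch of a pool in which several goroutines read from one shared channel and each job runs behind a recover. It is not the processor's actual implementation; runPool and safeHandle are hypothetical names, and jobs are plain ints to keep the example short.

```go
package main

import (
	"fmt"
	"sync"
)

// safeHandle runs handle and converts a panic into a returned error,
// so one bad job cannot kill the worker goroutine.
func safeHandle(handle func(int), job int) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered from panic: %v", r)
		}
	}()
	handle(job)
	return nil
}

// runPool starts the given number of goroutines reading from one shared
// channel. It returns how many jobs completed and how many panicked.
func runPool(workers int, jobs []int, handle func(int)) (done, panicked int) {
	ch := make(chan int)
	var mu sync.Mutex
	var wg sync.WaitGroup

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range ch {
				err := safeHandle(handle, job)
				mu.Lock()
				if err != nil {
					panicked++
				} else {
					done++
				}
				mu.Unlock()
			}
		}()
	}

	for _, j := range jobs {
		ch <- j
	}
	close(ch)
	wg.Wait()
	return done, panicked
}

// demo runs five jobs on three workers; job 3 panics.
func demo() (int, int) {
	return runPool(3, []int{1, 2, 3, 4, 5}, func(job int) {
		if job == 3 {
			panic("bad job")
		}
	})
}

func main() {
	done, panicked := demo()
	fmt.Println(done, panicked) // 4 1
}
```

The panicking job is counted and the remaining jobs still complete, which is the property the recover step is meant to provide.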

Graceful Shutdown

When the context is cancelled, the processor performs graceful shutdown:
1. Stop accepting new jobs

Workers check ctx.Done() before accepting new jobs from the channel.

2. Drain remaining jobs

Workers continue processing jobs already in the channel until the shutdown timeout expires.
processor := queue.New(
	handler,
	provider,
	10,
	30*time.Second,  // Workers have 30s to finish remaining jobs
)

3. Log shutdown progress

Workers log when they detect shutdown, when they finish draining, and when the timeout expires:
shutting down worker
shutdown timeout expired
worker finished
all workers shut down
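
The shutdown steps above can be sketched as a worker loop that checks for cancellation before each job and then drains whatever is still buffered, bounded by a timeout. This is an illustration of the described behavior, not the processor's source; worker, drain, and demo are hypothetical, and the returned strings reuse the log messages listed above.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// drain handles jobs already buffered in the channel until it is empty
// or the shutdown timeout expires, whichever comes first.
func drain(jobs <-chan int, timeout time.Duration, handle func(int)) string {
	deadline := time.After(timeout)
	for {
		select {
		case <-deadline:
			return "shutdown timeout expired"
		default:
		}
		select {
		case job, ok := <-jobs:
			if !ok {
				return "worker finished"
			}
			handle(job)
		default:
			return "worker finished" // nothing left to drain
		}
	}
}

// worker checks ctx.Done() before taking each job and switches to draining
// once the context is cancelled.
func worker(ctx context.Context, jobs <-chan int, timeout time.Duration, handle func(int)) string {
	for {
		select {
		case <-ctx.Done():
			return drain(jobs, timeout, handle)
		default:
		}
		select {
		case <-ctx.Done():
			return drain(jobs, timeout, handle)
		case job, ok := <-jobs:
			if !ok {
				return "worker finished"
			}
			handle(job)
		}
	}
}

// demo cancels the context up front, leaving three buffered jobs to drain.
// Both arguments are in milliseconds.
func demo(timeoutMs, jobMs int) (int, string) {
	jobs := make(chan int, 3)
	for i := 1; i <= 3; i++ {
		jobs <- i
	}
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	processed := 0
	status := worker(ctx, jobs, time.Duration(timeoutMs)*time.Millisecond, func(int) {
		time.Sleep(time.Duration(jobMs) * time.Millisecond)
		processed++
	})
	return processed, status
}

func main() {
	fmt.Println(demo(1000, 0))  // fast jobs: all three drain in time
	fmt.Println(demo(100, 150)) // slow jobs: the timeout cuts draining short
}
```

Note that the timeout in this sketch is only checked between jobs, so a job that is already running is allowed to finish.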

Context-Aware Logging

Each worker has a unique worker ID injected into the context:
workerCtx := context.WithValue(ctx, log.WorkerIDKey, uuid.NewString())
This allows you to trace job processing across logs:
func (h *EmailHandler) Handle(ctx context.Context, job EmailJob) {
	log.InfoContext(ctx, "starting email send")  // Includes workerId
	// Process job...
	log.InfoContext(ctx, "email sent")  // Same workerId
}
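
How the log package turns the context value into a log attribute is not shown here. As a rough illustration of the idea, the sketch below does the same thing with the standard library's log/slog: a wrapping handler reads the worker ID from the context and adds it to every record. The workerIDKey constant and workerIDHandler type are hypothetical stand-ins, not Platforma APIs.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"log/slog"
	"strings"
)

type ctxKey string

// workerIDKey stands in for log.WorkerIDKey, whose real type is not shown here.
const workerIDKey ctxKey = "workerId"

// workerIDHandler wraps a slog.Handler and copies the worker ID from the
// context onto every record it handles.
type workerIDHandler struct{ slog.Handler }

func (h workerIDHandler) Handle(ctx context.Context, r slog.Record) error {
	if id, ok := ctx.Value(workerIDKey).(string); ok {
		r.AddAttrs(slog.String("workerId", id))
	}
	return h.Handler.Handle(ctx, r)
}

// WithAttrs and WithGroup re-wrap the derived handler so the worker ID
// is still added after logger.With or logger.WithGroup.
func (h workerIDHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
	return workerIDHandler{Handler: h.Handler.WithAttrs(attrs)}
}

func (h workerIDHandler) WithGroup(name string) slog.Handler {
	return workerIDHandler{Handler: h.Handler.WithGroup(name)}
}

// demo logs twice with the same worker context and returns the raw output
// plus the number of log lines that carry the worker ID.
func demo() (string, int) {
	var buf bytes.Buffer
	logger := slog.New(workerIDHandler{Handler: slog.NewTextHandler(&buf, nil)})

	ctx := context.WithValue(context.Background(), workerIDKey, "worker-1")
	logger.InfoContext(ctx, "starting email send")
	logger.InfoContext(ctx, "email sent")

	out := buf.String()
	return out, strings.Count(out, "workerId=worker-1")
}

func main() {
	out, tagged := demo()
	fmt.Print(out)
	fmt.Println("tagged lines:", tagged)
}
```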

Complete Example

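package main

import (
	"context"
	"os"
	"os/signal"
	"time"

	"github.com/platforma-dev/platforma/log"
	"github.com/platforma-dev/platforma/queue"
)

type NotificationJob struct {
	UserID  string
	Message string
}

type NotificationHandler struct {
	notifier *Notifier
}

func (h *NotificationHandler) Handle(ctx context.Context, job NotificationJob) {
	log.InfoContext(ctx, "processing notification",
		"user_id", job.UserID,
	)

	err := h.notifier.Send(ctx, job.UserID, job.Message)
	if err != nil {
		log.ErrorContext(ctx, "notification failed", "error", err)
		return
	}

	log.InfoContext(ctx, "notification sent successfully")
}

func main() {
	// Cancel the context on Ctrl+C so the processor can shut down gracefully.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	// Notifier, notifier, NewRedisProvider, and redisClient are
	// application-defined placeholders; substitute your own implementations.
	handler := &NotificationHandler{notifier: notifier}
	provider := NewRedisProvider[NotificationJob](redisClient)

	// 10 workers, 30-second shutdown timeout.
	processor := queue.New(handler, provider, 10, 30*time.Second)

	if err := processor.Run(ctx); err != nil {
		log.ErrorContext(ctx, "processor failed", "error", err)
	}
}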

Error Handling

Handlers must handle errors internally, because the Handle method doesn’t return an error. Log errors using the context-aware logger:
func (h *Handler) Handle(ctx context.Context, job MyJob) {
	err := h.process(job)
	if err != nil {
		log.ErrorContext(ctx, "job failed", "error", err, "job_id", job.ID)
		// Optionally: re-enqueue, send to DLQ, etc.
		return
	}
	log.InfoContext(ctx, "job completed")
}
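
The "re-enqueue, send to DLQ" comment can be made concrete with an attempt counter carried on the job itself. The sketch below is one possible policy under stated assumptions: Job, retryHandler, and its callback fields are all hypothetical, and a slice stands in for the queue so the example runs on its own. In a real application the requeue callback would typically wrap processor.Enqueue.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Job carries its own attempt counter so the count survives a re-enqueue.
type Job struct {
	ID       string
	Attempts int
}

// retryHandler re-enqueues failed jobs until maxAttempts is reached,
// then hands them to a dead-letter callback. All fields are hypothetical.
type retryHandler struct {
	maxAttempts int
	process     func(Job) error
	requeue     func(Job) error // e.g. a closure around processor.Enqueue
	deadLetter  func(Job)
}

func (h *retryHandler) Handle(_ context.Context, job Job) {
	if err := h.process(job); err == nil {
		return
	}
	job.Attempts++
	if job.Attempts >= h.maxAttempts {
		h.deadLetter(job)
		return
	}
	if err := h.requeue(job); err != nil {
		h.deadLetter(job) // could not retry, so do not lose the job silently
	}
}

// demo runs a job that always fails through a slice-backed queue and
// returns how often it was attempted and how many jobs were dead-lettered.
func demo() (attempts, deadLettered int) {
	queue := []Job{{ID: "job-1"}}
	h := &retryHandler{
		maxAttempts: 3,
		process:     func(Job) error { attempts++; return errors.New("boom") },
		requeue:     func(j Job) error { queue = append(queue, j); return nil },
		deadLetter:  func(Job) { deadLettered++ },
	}
	for len(queue) > 0 {
		job := queue[0]
		queue = queue[1:]
		h.Handle(context.Background(), job)
	}
	return attempts, deadLettered
}

func main() {
	fmt.Println(demo()) // 3 1
}
```

Keeping the counter on the job avoids any shared retry state between workers, since whichever worker picks the job up next sees the updated count.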

Best Practices

Make handlers idempotent

Jobs may be processed more than once due to failures. Design handlers to be safe when re-run.
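
One simple way to get idempotency is to key each job by a stable ID and skip IDs that were already handled. The following is a minimal sketch under that assumption; PaymentJob, idempotentHandler, and the in-memory map are hypothetical, and a production system would need a durable store shared by all workers.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type PaymentJob struct {
	ID     string // unique per logical job, reused on redelivery
	Amount int
}

// idempotentHandler remembers which job IDs it has handled. The in-memory
// map is for illustration only; it does not survive a restart.
type idempotentHandler struct {
	mu     sync.Mutex
	seen   map[string]bool
	charge func(PaymentJob) error
}

// claim reports whether this is the first delivery of id and records it.
func (h *idempotentHandler) claim(id string) bool {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.seen[id] {
		return false
	}
	h.seen[id] = true
	return true
}

// release forgets id so a failed job can be retried later.
func (h *idempotentHandler) release(id string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	delete(h.seen, id)
}

func (h *idempotentHandler) Handle(_ context.Context, job PaymentJob) {
	if !h.claim(job.ID) {
		return // duplicate delivery: already done or in progress
	}
	if err := h.charge(job); err != nil {
		h.release(job.ID)
	}
}

// demo delivers job "a" twice and job "b" once, and returns the total charged.
func demo() int {
	total := 0
	h := &idempotentHandler{
		seen:   make(map[string]bool),
		charge: func(j PaymentJob) error { total += j.Amount; return nil },
	}
	deliveries := []PaymentJob{{ID: "a", Amount: 10}, {ID: "b", Amount: 5}, {ID: "a", Amount: 10}}
	for _, job := range deliveries {
		h.Handle(context.Background(), job)
	}
	return total
}

func main() {
	fmt.Println(demo()) // 15
}
```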

Use appropriate worker counts

Match the worker count to job characteristics: for CPU-bound jobs, use roughly one worker per core; for I/O-bound jobs, 10 to 100 or more workers is typical.

Set reasonable timeouts

The shutdown timeout should exceed the typical job duration so that in-flight work is not cut off.

Log liberally with context

Use InfoContext and ErrorContext to maintain correlation via worker IDs.
