Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/platforma-dev/platforma/llms.txt

Use this file to discover all available pages before exploring further.

The queue package provides a flexible job processing system with typed job handlers, worker pools, and graceful shutdown.

Processor Type

The Processor[T] type manages a pool of workers to process jobs from a queue.
type Processor[T any] struct {
    handler         Handler[T]
    queue           Provider[T]
    wg              sync.WaitGroup
    workersAmount   int
    shutdownTimeout time.Duration
}

Constructor

New[T](handler, queue, workersAmount, shutdownTimeout)
*Processor[T]
Creates a new Processor with the specified configuration.Parameters:
  • handler (Handler[T]): The handler to process jobs
  • queue (Provider[T]): The queue provider implementation
  • workersAmount (int): Number of concurrent workers
  • shutdownTimeout (time.Duration): Time to wait for in-progress jobs during shutdown
processor := queue.New(
    queue.HandlerFunc[Job](handleJob),
    queue.NewChanQueue[Job](100, 3*time.Second),
    5,                  // 5 workers
    10*time.Second,     // 10s shutdown timeout
)

Processor Methods

Enqueue

Enqueue(ctx context.Context, job T)
error
Adds a job to the queue for processing.
err := processor.Enqueue(ctx, Job{UserID: 123, Action: "send_email"})
if err != nil {
    log.ErrorContext(ctx, "failed to enqueue", "error", err)
}

Run

Run(ctx context.Context)
error
Starts the queue processor and blocks until all workers complete.Lifecycle:
  1. Opens the queue provider
  2. Starts the configured number of workers
  3. Workers process jobs until context is canceled
  4. Attempts to drain remaining jobs within shutdown timeout
  5. Closes the queue provider
if err := processor.Run(ctx); err != nil {
    log.ErrorContext(ctx, "processor error", "error", err)
}

Handler Interface

The Handler interface defines how jobs are processed.
type Handler[T any] interface {
    Handle(ctx context.Context, job T)
}
Handle
func(ctx context.Context, job T)
Processes a single job. This method is called by workers for each job in the queue.

HandlerFunc

A function type that implements the Handler interface.
type HandlerFunc[T any] func(ctx context.Context, job T)

func (f HandlerFunc[T]) Handle(ctx context.Context, job T) {
    f(ctx, job)
}

Provider Interface

The Provider interface defines queue implementations.
type Provider[T any] interface {
    Open(ctx context.Context) error
    Close(ctx context.Context) error
    EnqueueJob(ctx context.Context, job T) error
    GetJobChan(ctx context.Context) (chan T, error)
}
Open
func(ctx context.Context) error
Initializes the queue provider (e.g., connects to Redis, opens channels).
Close
func(ctx context.Context) error
Closes the queue provider and cleans up resources.
EnqueueJob
func(ctx context.Context, job T) error
Adds a job to the queue.
GetJobChan
func(ctx context.Context) (chan T, error)
Returns a channel that workers can receive jobs from.

Built-in Queue Provider

ChanQueue

Platforma provides a channel-based queue implementation for simple use cases:
func NewChanQueue[T any](capacity int, shutdownTimeout time.Duration) Provider[T]
Example:
queue := queue.NewChanQueue[Job](100, 3*time.Second)

Example Usage

Basic Job Processing

package main

import (
    "context"
    "time"
    "github.com/platforma-dev/platforma/log"
    "github.com/platforma-dev/platforma/queue"
)

type EmailJob struct {
    To      string
    Subject string
    Body    string
}

func handleEmail(ctx context.Context, job EmailJob) {
    log.InfoContext(ctx, "sending email",
        "to", job.To,
        "subject", job.Subject,
    )
    
    // Send email logic here
    time.Sleep(100 * time.Millisecond) // Simulate work
    
    log.InfoContext(ctx, "email sent", "to", job.To)
}

func main() {
    ctx := context.Background()
    
    // Create queue and processor
    q := queue.NewChanQueue[EmailJob](100, 3*time.Second)
    processor := queue.New(
        queue.HandlerFunc[EmailJob](handleEmail),
        q,
        5,              // 5 workers
        10*time.Second, // 10s shutdown timeout
    )
    
    // Start processor in background
    go processor.Run(ctx)
    time.Sleep(time.Millisecond) // Allow processor to start
    
    // Enqueue jobs
    processor.Enqueue(ctx, EmailJob{
        To:      "user@example.com",
        Subject: "Welcome!",
        Body:    "Thanks for signing up",
    })
    
    processor.Enqueue(ctx, EmailJob{
        To:      "admin@example.com",
        Subject: "New User",
        Body:    "A new user signed up",
    })
    
    time.Sleep(time.Second) // Wait for jobs to process
}

Struct Handler

type EmailService struct {
    smtpHost string
    smtpPort int
}

func (s *EmailService) Handle(ctx context.Context, job EmailJob) {
    log.InfoContext(ctx, "sending via SMTP",
        "host", s.smtpHost,
        "to", job.To,
    )
    
    // SMTP sending logic
}

// Usage
emailService := &EmailService{
    smtpHost: "smtp.example.com",
    smtpPort: 587,
}

processor := queue.New(
    emailService,
    queue.NewChanQueue[EmailJob](100, 3*time.Second),
    3,
    5*time.Second,
)

Integration with Application

package main

import (
    "context"
    "time"
    "github.com/platforma-dev/platforma/application"
    "github.com/platforma-dev/platforma/queue"
    "github.com/platforma-dev/platforma/log"
)

type Job struct {
    Data int
}

func jobHandler(ctx context.Context, job Job) {
    log.InfoContext(ctx, "processing job", "data", job.Data)
}

func main() {
    ctx := context.Background()
    app := application.New()
    
    // Create processor
    q := queue.NewChanQueue[Job](50, 2*time.Second)
    processor := queue.New(
        queue.HandlerFunc[Job](jobHandler),
        q,
        3,
        5*time.Second,
    )
    
    // Register as service
    app.RegisterService("job_processor", processor)
    
    // Enqueue jobs during startup
    app.OnStartFunc(func(ctx context.Context) error {
        for i := 1; i <= 10; i++ {
            if err := processor.Enqueue(ctx, Job{Data: i}); err != nil {
                return err
            }
        }
        log.InfoContext(ctx, "enqueued 10 jobs")
        return nil
    }, application.StartupTaskConfig{
        Name:         "enqueue_initial_jobs",
        AbortOnError: false,
    })
    
    if err := app.Run(ctx); err != nil {
        log.ErrorContext(ctx, "app error", "error", err)
    }
}

Graceful Shutdown

The processor handles graceful shutdown automatically:
  1. Context Cancellation: Workers stop accepting new jobs
  2. Drain Period: Workers attempt to finish in-progress jobs within the shutdown timeout
  3. Cleanup: Queue provider is closed
ctx, cancel := context.WithCancel(context.Background())

// Start processor
go processor.Run(ctx)

// Later: trigger shutdown
cancel()

// Workers will:
// 1. Stop accepting new jobs immediately
// 2. Try to complete current jobs within shutdownTimeout
// 3. Exit gracefully

Context Keys

The processor automatically adds a WorkerIDKey to the context for each worker:
func handleJob(ctx context.Context, job Job) {
    workerID := ctx.Value(log.WorkerIDKey).(string)
    log.InfoContext(ctx, "processing", "worker", workerID)
}

Build docs developers (and LLMs) love