
Overview

Framefox provides a background task system for running long operations asynchronously. Tasks are queued, distributed to workers, and executed outside the request/response cycle.

Task System Features

  • Multiple Brokers: Database and RabbitMQ transport support
  • Priority Queues: Execute important tasks first
  • Scheduled Tasks: Delay or schedule tasks for specific times
  • Retry Logic: Automatic retry on failure
  • Worker Management: Concurrent task processing
  • Event Integration: Task lifecycle events

Quick Start

Defining Tasks

Use the @AsyncTask decorator to define background tasks:
from framefox.core.task.decorator.async_task import AsyncTask

@AsyncTask(queue="default", priority=0, max_retries=3)
async def send_email(to: str, subject: str, body: str):
    """Send an email asynchronously"""
    # Email sending logic (email_service stands for your application's mail client)
    print(f"Sending email to {to}: {subject}")
    await email_service.send(to, subject, body)
    return {"status": "sent", "recipient": to}

@AsyncTask(queue="high_priority", priority=10)
def process_payment(order_id: str, amount: float):
    """Process payment synchronously"""
    print(f"Processing payment for order {order_id}: ${amount}")
    # Payment processing logic
    return {"status": "completed", "order_id": order_id}

Queuing Tasks

Queue tasks for background execution:
# Queue with .delay() method
task_id = send_email.delay(
    to="user@example.com",
    subject="Welcome!",
    body="Thanks for signing up!"
)

print(f"Task queued: {task_id}")

# Or use TaskManager directly
from framefox.core.task.task_manager import TaskManager
from framefox.core.di.service_container import ServiceContainer

container = ServiceContainer()
task_manager = container.get(TaskManager)

task_id = task_manager.queue_task(
    task=send_email,
    to="user@example.com",
    subject="Welcome!",
    body="Thanks for signing up!"
)

TaskManager

The TaskManager provides full control over task queuing:
from framefox.core.task.task_manager import TaskManager
from framefox.core.di.service_container import ServiceContainer
from datetime import datetime, timedelta

container = ServiceContainer()
task_manager = container.get(TaskManager)

# Queue a task
task_id = task_manager.queue_task(
    task="myapp.tasks.send_email",
    queue="emails",
    priority=5,
    max_retries=3,
    to="user@example.com",
    subject="Hello"
)

# Queue with delay (in seconds)
task_id = task_manager.queue_task(
    task="myapp.tasks.send_reminder",
    delay=3600,  # 1 hour
    user_id=123
)

# Queue with timedelta delay
task_id = task_manager.queue_task(
    task="myapp.tasks.generate_report",
    delay=timedelta(hours=24),
    report_type="monthly"
)

# Schedule for specific time
task_id = task_manager.queue_task(
    task="myapp.tasks.send_newsletter",
    schedule_at=datetime(2024, 1, 1, 9, 0, 0),
    campaign_id="new_year_2024"
)

Task Decorator Options

@AsyncTask(
    queue="default",        # Queue name
    priority=0,             # Higher = more priority
    max_retries=3,         # Retry attempts on failure
    delay=None,            # Delay in seconds or timedelta
    schedule_at=None       # Schedule for specific datetime
)
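
The delay and schedule_at options both describe when a task becomes eligible to run. The sketch below shows one way the two could be reduced to a single timestamp. It is not Framefox's implementation: resolve_run_at is a hypothetical helper, and rejecting a call that passes both options is an assumption made here for clarity.

```python
from datetime import datetime, timedelta
from typing import Optional, Union


def resolve_run_at(
    now: datetime,
    delay: Optional[Union[int, float, timedelta]] = None,
    schedule_at: Optional[datetime] = None,
) -> datetime:
    """Hypothetical helper: return the earliest time a task may run."""
    if delay is not None and schedule_at is not None:
        # Assumption: treat the combination as ambiguous rather than guess.
        raise ValueError("pass either delay or schedule_at, not both")
    if schedule_at is not None:
        return schedule_at
    if delay is None:
        return now  # No delay: eligible immediately
    if not isinstance(delay, timedelta):
        delay = timedelta(seconds=delay)  # Plain numbers are seconds
    return now + delay


now = datetime(2024, 1, 1, 8, 0, 0)
in_one_hour = resolve_run_at(now, delay=3600)
in_one_day = resolve_run_at(now, delay=timedelta(hours=24))
print(in_one_hour, in_one_day)
```

Passing now explicitly keeps the helper deterministic, which makes it easy to unit test.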

Queues and Priorities

Multiple Queues

Organize tasks into different queues:
# High priority queue
@AsyncTask(queue="urgent", priority=10)
async def process_critical_alert(alert_id: str):
    # Handle critical alerts immediately
    pass

# Default queue
@AsyncTask(queue="default", priority=5)
async def process_order(order_id: str):
    # Standard order processing
    pass

# Low priority queue
@AsyncTask(queue="maintenance", priority=1)
async def cleanup_old_data(days: int):
    # Periodic cleanup tasks
    pass

Priority Handling

Tasks with higher priority execute first within the same queue:
# Priority 10 - executes first
@AsyncTask(priority=10)
async def urgent_task():
    pass

# Priority 5 - executes second
@AsyncTask(priority=5)
async def normal_task():
    pass

# Priority 1 - executes last
@AsyncTask(priority=1)
async def low_priority_task():
    pass
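
To make the ordering rule concrete, here is a small standard-library sketch of a queue in which a higher priority number is served first. PriorityTaskQueue is a hypothetical class written for illustration, not part of Framefox, and the first-in-first-out tie-break between equal priorities is an assumption.

```python
import heapq
from itertools import count


class PriorityTaskQueue:
    """Illustrative in-memory queue: higher priority number pops first."""

    def __init__(self):
        self._heap = []
        self._sequence = count()  # Insertion counter used as a tie-breaker

    def push(self, name: str, priority: int = 0) -> None:
        # heapq is a min-heap, so the priority is negated to pop the largest first.
        heapq.heappush(self._heap, (-priority, next(self._sequence), name))

    def pop(self) -> str:
        _, _, name = heapq.heappop(self._heap)
        return name

    def __len__(self) -> int:
        return len(self._heap)


queue = PriorityTaskQueue()
queue.push("low_priority_task", priority=1)
queue.push("urgent_task", priority=10)
queue.push("normal_task", priority=5)
queue.push("second_urgent_task", priority=10)

order = [queue.pop() for _ in range(len(queue))]
print(order)
# ['urgent_task', 'second_urgent_task', 'normal_task', 'low_priority_task']
```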

Worker Manager

The WorkerManager processes tasks from queues:
from framefox.core.task.worker_manager import WorkerManager
from framefox.core.di.service_container import ServiceContainer
from framefox.core.task.broker.broker_interface import BrokerInterface

container = ServiceContainer()
broker = container.get(BrokerInterface)
worker = WorkerManager(broker)

# Configure worker
worker.set_queues(["default", "urgent", "emails"])
worker.set_polling_interval(5)  # Check every 5 seconds
worker.set_concurrent_tasks(10)  # Process 10 tasks concurrently

# Configure cleanup
worker.set_cleanup_config(
    interval_hours=1,  # Clean every hour
    retain_days=7      # Keep failed tasks for 7 days
)

# Start processing
import asyncio
asyncio.run(worker.start())
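
The concurrency setting caps how many tasks run at the same time. The sketch below illustrates that idea with asyncio.Semaphore from the standard library. It is not how WorkerManager is implemented; run_with_limit and sample_job are hypothetical names used only here.

```python
import asyncio


async def run_with_limit(jobs, limit: int):
    """Run coroutine functions with at most `limit` executing at once."""
    semaphore = asyncio.Semaphore(limit)
    running = 0
    peak = 0  # Highest number of jobs observed running together

    async def run_one(job):
        nonlocal running, peak
        async with semaphore:  # Waits here while `limit` jobs are in flight
            running += 1
            peak = max(peak, running)
            try:
                return await job()
            finally:
                running -= 1

    results = await asyncio.gather(*(run_one(job) for job in jobs))
    return results, peak


async def sample_job():
    await asyncio.sleep(0.01)  # Stand-in for real I/O-bound work
    return "done"


results, peak = asyncio.run(run_with_limit([sample_job] * 6, limit=2))
print(results, peak)
```

Six jobs are submitted, but the recorded peak never exceeds the limit of two.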

Running Workers

CLI Command

Run workers using the Framefox CLI:
# Start worker for default queue
framefox worker start

# Start worker for specific queues
framefox worker start --queues default,urgent,emails

# Configure concurrency
framefox worker start --concurrent 20

# Set polling interval
framefox worker start --interval 10

Programmatic Worker

Create a worker script:
# worker.py
import asyncio
from framefox.core.task.worker_manager import WorkerManager
from framefox.core.di.service_container import ServiceContainer
from framefox.core.task.broker.broker_interface import BrokerInterface

async def main():
    container = ServiceContainer()
    broker = container.get(BrokerInterface)
    worker = WorkerManager(broker)
    
    # Configure
    worker.set_queues(["default", "urgent"])
    worker.set_concurrent_tasks(5)
    worker.set_polling_interval(5)
    
    # Start
    await worker.start()

if __name__ == "__main__":
    asyncio.run(main())

Run the worker:
python worker.py

Task Status and Retry

Task Status

Tasks have the following statuses:
from framefox.core.task.entity.task import TaskStatus

# Available statuses:
TaskStatus.PENDING    # Waiting to be processed
TaskStatus.RUNNING    # Currently executing
TaskStatus.COMPLETED  # Successfully finished
TaskStatus.FAILED     # Failed after retries
TaskStatus.RETRYING   # Failed, will retry

Automatic Retry

Tasks automatically retry on failure:
@AsyncTask(max_retries=5)
async def flaky_task():
    """This task will retry up to 5 times on failure"""
    # If this raises an exception, it will be retried
    result = await unreliable_api_call()
    return result
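
The following sketch traces how a task might move through the statuses listed above when it fails twice and then succeeds. It is a simplified model, not Framefox's worker loop: SketchStatus and run_with_retries are hypothetical, and it assumes max_retries counts retries after the first attempt (so up to max_retries + 1 attempts in total).

```python
import asyncio
from enum import Enum


class SketchStatus(Enum):
    """Local stand-in mirroring the status names in this section."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRYING = "retrying"


async def run_with_retries(func, max_retries: int):
    """Run func, retrying on any exception; return (result, status history)."""
    history = [SketchStatus.PENDING]
    for attempt in range(max_retries + 1):
        history.append(SketchStatus.RUNNING)
        try:
            result = await func()
        except Exception:
            if attempt < max_retries:
                history.append(SketchStatus.RETRYING)  # Another attempt remains
            else:
                history.append(SketchStatus.FAILED)  # Retries exhausted
        else:
            history.append(SketchStatus.COMPLETED)
            return result, history
    return None, history


calls = {"count": 0}


async def flaky():
    calls["count"] += 1
    if calls["count"] < 3:  # Fail on the first two calls only
        raise ConnectionError("temporary failure")
    return "ok"


result, history = asyncio.run(run_with_retries(flaky, max_retries=3))
print(result, [status.name for status in history])
```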

Manual Retry Logic

import asyncio

@AsyncTask(max_retries=3)
async def task_with_custom_retry():
    max_attempts = 3
    
    for attempt in range(max_attempts):
        try:
            result = await external_service()
            return result
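        except TemporaryError:  # TemporaryError: your own exception type for transient failures
            if attempt == max_attempts - 1:
                raise  # Out of in-task attempts; the task system's max_retries takes over
            await asyncio.sleep(2 ** attempt)  # Exponential backoff: 1s, then 2s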
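
The 2 ** attempt expression above grows without bound as attempts increase. A common refinement, sketched here with the standard library only, is to cap the delay and optionally add random jitter so that many failing tasks do not retry in lockstep. backoff_delay and its parameters are hypothetical names, not a Framefox API.

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0,
                  jitter: bool = False, rng=random) -> float:
    """Return the wait in seconds before retry number `attempt` (0-based)."""
    delay = min(cap, base * (2 ** attempt))  # Exponential growth, capped
    if jitter:
        return rng.uniform(0, delay)  # "Full jitter": anywhere up to the delay
    return delay


delays = [backoff_delay(attempt) for attempt in range(8)]
print(delays)
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

Inside a task, the result would replace the literal in the sleep call, for example await asyncio.sleep(backoff_delay(attempt, jitter=True)).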

Scheduled and Delayed Tasks

Delay Execution

from datetime import timedelta

# Delay by seconds
send_reminder.delay(delay=3600, user_id=123)

# Delay by timedelta
task_manager.queue_task(
    task=send_reminder,
    delay=timedelta(hours=1, minutes=30),
    user_id=123
)

Schedule for Specific Time

from datetime import datetime

# Schedule for specific datetime
scheduled_time = datetime(2024, 12, 25, 9, 0, 0)

task_manager.queue_task(
    task="myapp.tasks.send_holiday_greeting",
    schedule_at=scheduled_time,
    campaign="holiday_2024"
)

Recurring Tasks

Implement recurring tasks manually:
@AsyncTask()
async def daily_report():
    """Generate daily report and reschedule"""
    # Generate report
    await generate_report()
    
    # Reschedule for tomorrow
    from framefox.core.task.task_manager import TaskManager
    from framefox.core.di.service_container import ServiceContainer
    from datetime import datetime, timedelta
    
    container = ServiceContainer()
    task_manager = container.get(TaskManager)
    
    tomorrow = datetime.now() + timedelta(days=1)
    tomorrow = tomorrow.replace(hour=9, minute=0, second=0, microsecond=0)
    
    task_manager.queue_task(
        task=daily_report,
        schedule_at=tomorrow
    )
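
The rescheduling time can also be computed by a small pure function, which is easier to test than inline date arithmetic. The sketch below uses a hypothetical helper, next_daily_run, and assumes naive (timezone-unaware) datetimes like the example above. Unlike the example, it returns today's slot if that slot has not passed yet.

```python
from datetime import datetime, time, timedelta


def next_daily_run(now: datetime, at: time = time(9, 0)) -> datetime:
    """Return the next occurrence of `at` strictly after `now`."""
    candidate = datetime.combine(now.date(), at)  # Today's slot, microseconds zeroed
    if candidate <= now:
        candidate += timedelta(days=1)  # Today's slot has passed; use tomorrow's
    return candidate


before_slot = next_daily_run(datetime(2024, 1, 1, 8, 30))
after_slot = next_daily_run(datetime(2024, 1, 1, 9, 0, 0, 5))
print(before_slot, after_slot)
```

The result could then be passed as schedule_at when the task requeues itself.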

Event Integration

Listen to task lifecycle events:
from framefox.core.events.event_dispatcher import dispatcher

class TaskMonitor:
    def on_task_start(self, payload):
        task = payload.get("task")
        print(f"Task {task.id} started: {task.name}")
    
    def on_task_complete(self, payload):
        task = payload.get("task")
        result = payload.get("result")
        print(f"Task {task.id} completed: {result}")
    
    def on_task_error(self, payload):
        task = payload.get("task")
        error = payload.get("error")
        print(f"Task {task.id} failed: {error}")
    
    def register_listeners(self, dispatcher):
        dispatcher.add_listener(
            "worker.task.before_execution",
            self.on_task_start
        )
        dispatcher.add_listener(
            "worker.task.after_execution",
            self.on_task_complete
        )
        dispatcher.add_listener(
            "worker.task.execution_error",
            self.on_task_error
        )

# Register monitor
monitor = TaskMonitor()
monitor.register_listeners(dispatcher)
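
Listeners like these can be exercised without a running worker by handing them a stand-in dispatcher. In the sketch below, FakeDispatcher, its dispatch method, and FailureCounter are all hypothetical test helpers. Only the add_listener call, the event names, and the payload keys come from the example above.

```python
from collections import defaultdict
from types import SimpleNamespace


class FakeDispatcher:
    """Minimal in-memory stand-in for an event dispatcher (test helper)."""

    def __init__(self):
        self._listeners = defaultdict(list)

    def add_listener(self, event_name, listener):
        self._listeners[event_name].append(listener)

    def dispatch(self, event_name, payload):
        # Assumption: listeners receive the payload dict as their only argument.
        for listener in self._listeners[event_name]:
            listener(payload)


class FailureCounter:
    """Counts execution errors per task name."""

    def __init__(self):
        self.failures = {}

    def on_task_error(self, payload):
        task = payload.get("task")
        self.failures[task.name] = self.failures.get(task.name, 0) + 1

    def register_listeners(self, dispatcher):
        dispatcher.add_listener("worker.task.execution_error", self.on_task_error)


fake = FakeDispatcher()
counter = FailureCounter()
counter.register_listeners(fake)

task = SimpleNamespace(id="t-1", name="send_email")  # Stand-in task object
fake.dispatch("worker.task.execution_error", {"task": task, "error": RuntimeError("boom")})
fake.dispatch("worker.task.after_execution", {"task": task, "result": "ok"})
print(counter.failures)
# {'send_email': 1}
```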

Broker Configuration

Database Broker

Default broker using your database:
# config/services.yaml
task:
  broker: database
  defaults:
    queue: default
    priority: 0
    max_retries: 3

RabbitMQ Broker

For high-throughput scenarios:
# config/services.yaml
task:
  broker: rabbitmq
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    vhost: /

Best Practices

  1. Idempotency: Make tasks idempotent (safe to retry)
  2. Task Size: Keep tasks focused and relatively small
  3. Error Handling: Handle expected errors gracefully
  4. Logging: Log task execution for debugging
  5. Monitoring: Track task success/failure rates
  6. Queue Organization: Use multiple queues for different priorities
  7. Resource Management: Clean up resources in tasks
  8. Testing: Test tasks independently from the queue system
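
The idempotency advice in item 1 can be sketched as follows: record a key for each completed unit of work and skip the side effect when the key is already present. WelcomeEmailSender is a hypothetical class, and the in-memory set stands in for durable storage such as a database table with a unique constraint.

```python
class WelcomeEmailSender:
    """Illustrative idempotent operation: each user gets one welcome email."""

    def __init__(self):
        self.sent_keys = set()  # In production this would be durable storage
        self.outbox = []        # Records the side effects actually performed

    def send_once(self, user_id: int, email: str) -> dict:
        key = f"welcome:{user_id}"
        if key in self.sent_keys:
            # A retry or duplicate delivery lands here and does nothing.
            return {"status": "skipped", "email": email}
        self.outbox.append(email)
        self.sent_keys.add(key)
        return {"status": "sent", "email": email}


sender = WelcomeEmailSender()
first = sender.send_once(42, "user@example.com")
second = sender.send_once(42, "user@example.com")  # Simulated retry
print(first["status"], second["status"], len(sender.outbox))
# sent skipped 1
```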

Complete Example

# tasks.py
from framefox.core.task.decorator.async_task import AsyncTask
import logging

logger = logging.getLogger(__name__)

@AsyncTask(queue="emails", max_retries=3)
async def send_welcome_email(user_id: int, email: str):
    """Send welcome email to new user"""
    try:
        logger.info(f"Sending welcome email to {email}")
        
        # Send email (email_service stands for your application's mail client)
        await email_service.send(
            to=email,
            subject="Welcome to Our App!",
            template="welcome",
            context={"user_id": user_id}
        )
        
        logger.info(f"Welcome email sent to {email}")
        return {"status": "sent", "email": email}
        
    except Exception as e:
        logger.error(f"Failed to send email: {e}")
        raise

# routes.py
from fastapi import FastAPI
from .tasks import send_welcome_email

app = FastAPI()

@app.post("/register")
async def register(email: str, username: str):
    # Create user (create_user stands for your own persistence logic)
    user = create_user(email, username)
    
    # Queue welcome email
    task_id = send_welcome_email.delay(
        user_id=user.id,
        email=user.email
    )
    
    return {
        "user_id": user.id,
        "email_task_id": task_id
    }
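
One way to test a task like this without the queue system is to keep its core logic in a plain coroutine that receives its dependencies as arguments. The sketch below assumes that structure. FakeEmailService and send_welcome_email_logic are hypothetical names, and the decorated task would simply call the logic function with the real mail client.

```python
import asyncio


class FakeEmailService:
    """Test double that records messages instead of sending them."""

    def __init__(self):
        self.sent = []

    async def send(self, to, subject, template, context):
        self.sent.append(
            {"to": to, "subject": subject, "template": template, "context": context}
        )


async def send_welcome_email_logic(email_service, user_id: int, email: str) -> dict:
    """Core of the task, free of any queue or decorator concerns."""
    await email_service.send(
        to=email,
        subject="Welcome to Our App!",
        template="welcome",
        context={"user_id": user_id},
    )
    return {"status": "sent", "email": email}


fake_service = FakeEmailService()
outcome = asyncio.run(send_welcome_email_logic(fake_service, 42, "user@example.com"))
print(outcome, len(fake_service.sent))
```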
