Temporal is a durable execution platform: it persists the state of your running code so that workflows survive crashes, retries, and worker restarts without losing progress. This workshop uses the temporalio Python SDK. This page walks through every Temporal primitive you will encounter in Exercises 2, 3, and 4, with real code from the solution notebooks.

The Four Core Primitives

Workflow

Orchestration layer. Declares what should happen and in what order. Must be deterministic.

Activity

Unit of work. Where side-effects like API calls and database writes live. Can fail and be retried automatically.

Worker

A long-running process that polls a task queue and executes workflows and activities.

Client

Connects to the Temporal server. Used to start workflow executions and query their state.

Workflows

A workflow is decorated with @workflow.defn. The method that drives execution is decorated with @workflow.run. Workflows must be deterministic—they cannot perform I/O, generate random numbers, or depend on the current wall-clock time directly. All side-effects belong in activities.
from datetime import timedelta
from temporalio import workflow

@workflow.defn
class HelloWorkflowTemporal:
    """Workflow that orchestrates the weather API activity call."""

    @workflow.run
    async def run(self, state: str) -> str:
        workflow.logger.info(f"Workflow started for state: {state}")

        # Delegate the actual API call to a retryable activity
        result = await workflow.execute_activity(
            get_weather_for_state,
            args=[state],
            start_to_close_timeout=timedelta(seconds=30),
        )

        workflow.logger.info("Workflow finished")
        return f"Workflow result: {result}"
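
A toy replay loop can illustrate why determinism matters. The sketch below is not Temporal's implementation; ToyReplayer, fetch, and toy_workflow are hypothetical names invented for illustration. It assumes only that completed activity results are recorded in a history and reused when the workflow code runs again.

```python
from typing import Callable


class ToyReplayer:
    """Replays recorded activity results before executing anything new."""

    def __init__(self, history: list[str]):
        self.history = history   # results recorded by earlier runs
        self.position = 0        # next history slot to consume
        self.executed = 0        # activities actually run in this attempt

    def execute_activity(self, fn: Callable[[str], str], arg: str) -> str:
        if self.position < len(self.history):
            result = self.history[self.position]   # replayed, not re-run
        else:
            result = fn(arg)                       # first time: run and record
            self.history.append(result)
            self.executed += 1
        self.position += 1
        return result


def fetch(state: str) -> str:
    """Hypothetical stand-in for an activity with side effects."""
    return f"alerts for {state}"


def toy_workflow(ctx: ToyReplayer) -> str:
    first = ctx.execute_activity(fetch, "CA")
    second = ctx.execute_activity(fetch, "NY")
    return f"{first}; {second}"


history: list[str] = []
first_run = ToyReplayer(history)
first_run.execute_activity(fetch, "CA")   # simulated crash after one activity

resumed = ToyReplayer(history)            # new worker, same recorded history
result = toy_workflow(resumed)
```

Because toy_workflow issues the same activity calls in the same order on every run, the recorded result for "CA" lines up with the first call, and only the "NY" call actually executes after the simulated crash. Code that branched on random numbers or the wall clock could request activities in a different order and no longer match the history.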

workflow.execute_activity()

This is how a workflow delegates work to an activity. The key parameter is start_to_close_timeout, which sets the maximum time allowed for a single activity attempt:
result = await workflow.execute_activity(
    get_weather_for_state,   # The @activity.defn function to call
    args=["CA"],             # Positional arguments passed to the activity
    start_to_close_timeout=timedelta(seconds=30),
)
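Every workflow.execute_activity() call must set either start_to_close_timeout or schedule_to_close_timeout; the examples on this page use start_to_close_timeout. Temporal uses it to detect a stuck activity attempt and trigger a retry. A common workshop pitfall is setting it too low for GPT-4 calls, which can take many seconds to return; treat 30 seconds as a safe minimum.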
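
To get a feel for how retries interact with the timeout, the sketch below computes the wait before each retry under exponential backoff. It assumes Temporal's documented default retry policy (1-second initial interval, backoff coefficient 2.0, interval capped at 100 times the initial interval). retry_delays is a hypothetical helper, not part of the temporalio SDK.

```python
from datetime import timedelta


def retry_delays(
    attempts: int,
    initial: timedelta = timedelta(seconds=1),
    coefficient: float = 2.0,
    maximum: timedelta = timedelta(seconds=100),
) -> list[timedelta]:
    """Return the wait before each retry (attempts 2..N) under exponential backoff."""
    delays = []
    for retry in range(attempts - 1):
        delay = initial * (coefficient ** retry)   # 1s, 2s, 4s, ...
        delays.append(min(delay, maximum))         # never exceed the cap
    return delays


# Five attempts means four retries, so four waits.
delays = retry_delays(5)
seconds = [d.total_seconds() for d in delays]
```

With a 30-second start_to_close_timeout, each attempt in this schedule can itself run for up to 30 seconds before it is timed out and the next wait begins. In the SDK, the corresponding settings are the initial_interval, backoff_coefficient, maximum_interval, and maximum_attempts fields of temporalio.common.RetryPolicy, passed through the retry_policy argument of workflow.execute_activity().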

workflow.sleep() — Durable Timers

workflow.sleep() pauses the workflow for a specified duration. Unlike asyncio.sleep() in an ordinary Python program, this timer is durable: the Temporal server tracks it, so if the worker process crashes while waiting, the workflow still resumes correctly once a worker comes back online.
from datetime import timedelta
from temporalio import workflow

# This 10-second pause survives worker restarts — used in Exercise 4
# to demonstrate that Temporal resumes workflows from where they left off
await workflow.sleep(timedelta(seconds=10))
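
The idea behind a durable timer can be sketched without Temporal at all: if the start time and duration are stored outside the worker process, any process that comes up later can work out how much waiting is left. The helper below is a conceptual illustration under that assumption, not a description of how the Temporal server implements timers; remaining_after_restart is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone


def remaining_after_restart(
    started_at: datetime, duration: timedelta, now: datetime
) -> timedelta:
    """Time still left on a timer whose start time was persisted elsewhere."""
    fire_at = started_at + duration
    return max(fire_at - now, timedelta(0))   # never negative


started = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
ten_seconds = timedelta(seconds=10)

# A worker comes back 4 seconds into the 10-second timer.
left = remaining_after_restart(started, ten_seconds, started + timedelta(seconds=4))

# A worker comes back a full minute later: the timer is already due.
overdue = remaining_after_restart(started, ten_seconds, started + timedelta(seconds=60))
```

An in-process sleep loses this information when the process dies, which is why the pause has to be tracked by something that outlives the worker.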

Activities

An activity is any Python function decorated with @activity.defn. Activities are where your code interacts with the outside world—HTTP requests, database writes, file I/O. They can fail, and Temporal will automatically retry them according to the configured retry policy.
from temporalio import activity
import httpx

@activity.defn
async def get_weather_for_state(state: str) -> str:
    """Activity that fetches real weather alerts from National Weather Service API."""
    activity.logger.info(f"Fetching weather alerts for {state}")

    try:
        url = f"https://api.weather.gov/alerts/active/area/{state.upper()}"
        headers = {"User-Agent": "Temporal-Workshop (educational)"}

        async with httpx.AsyncClient() as client:
            response = await client.get(url, headers=headers, timeout=10.0)
            response.raise_for_status()

            data = response.json()
            features = data.get("features", [])

            if not features:
                return f"No active weather alerts for {state.upper()}."

            alerts = []
            for feature in features[:3]:
                properties = feature.get("properties", {})
                event = properties.get("event", "Unknown")
                severity = properties.get("severity", "Unknown")
                alerts.append(f"- {event} ({severity})")

            return f"Active weather alerts for {state.upper()}:\n" + "\n".join(alerts)

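    except httpx.HTTPError as e:
        # Returning the message instead of re-raising means the activity
        # completes successfully, so Temporal does not retry this failure.
        error_msg = f"Failed to fetch weather alerts: {str(e)}"
        activity.logger.error(f"[ERROR] {error_msg}")
        return error_msg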
Use activity.logger inside activities rather than print(). The Temporal Worker captures these logs and associates them with the correct workflow execution in the Temporal UI.
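
One way to make an activity like this easier to test is to separate the response parsing from the HTTP call. The sketch below lifts the formatting logic from get_weather_for_state into a pure function; summarize_alerts and the sample payload are hypothetical, hand-written for illustration, and are not real National Weather Service data.

```python
from typing import Any


def summarize_alerts(data: dict[str, Any], state: str, limit: int = 3) -> str:
    """Format an alerts payload the same way the activity above does."""
    features = data.get("features", [])
    if not features:
        return f"No active weather alerts for {state.upper()}."

    lines = []
    for feature in features[:limit]:
        properties = feature.get("properties", {})
        event = properties.get("event", "Unknown")
        severity = properties.get("severity", "Unknown")
        lines.append(f"- {event} ({severity})")

    return f"Active weather alerts for {state.upper()}:\n" + "\n".join(lines)


# Hand-written sample shaped like the fields the activity reads.
sample = {
    "features": [
        {"properties": {"event": "Heat Advisory", "severity": "Moderate"}},
        {"properties": {}},   # missing fields fall back to "Unknown"
    ]
}
summary = summarize_alerts(sample, "ca")
empty = summarize_alerts({}, "ny")
```

The activity body would then reduce to the HTTP request plus a call to this helper, and the helper can be checked without network access or a running worker.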

Named Activities

You can give an activity an explicit name, which may differ from its function name. Exercise 3 passes the name explicitly:
@activity.defn(name="get_weather")
async def get_weather(state: str) -> dict:
    """Fetch active NWS alerts for a 2-letter US state code (e.g., 'CA')."""
    ...
The name parameter sets the activity type name that is registered with Temporal and that appears in the Temporal UI execution history.

Workers

A Worker is a process that:
  1. Connects to the Temporal server
  2. Declares which workflows and activities it can execute
  3. Polls a named task queue for new work
  4. Executes tasks and reports results back to the server
from temporalio.client import Client
from temporalio.worker import Worker, UnsandboxedWorkflowRunner

async def run_worker():
    """Start a Temporal worker that listens for workflow and activity tasks."""
    client = await Client.connect("localhost:7233")

    task_queue = "hello-temporal-task-queue"
    worker = Worker(
        client,
        task_queue=task_queue,
        workflows=[HelloWorkflowTemporal],
        activities=[get_weather_for_state],
        workflow_runner=UnsandboxedWorkflowRunner(),
    )

    print(f"[OK] Worker started on task queue: {task_queue}")
    print("   Listening for workflow and activity tasks...")
    await worker.run()

Task Queues

A task queue is identified by a plain string name. The worker and the workflow starter must use the same string; this is how Temporal routes work to the right worker. Each exercise uses its own queue name to avoid collisions:

Exercise      Task Queue Name
Exercise 2    "hello-temporal-task-queue"
Exercise 3    "agents-sdk-queue"
Exercise 4    "routing-workflow-queue"
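
The exact-match requirement can be sketched with a toy model of name-keyed queues. This is an illustration only; queues, start_workflow, and poll here are hypothetical stand-ins written for this example, not the Temporal client or server API.

```python
from collections import defaultdict, deque
from typing import Optional

# Toy stand-in for the server's task queues: one FIFO per queue name.
queues = defaultdict(deque)


def start_workflow(task_queue: str, workflow_id: str) -> None:
    """Put a task on the named queue (what a starter does, in miniature)."""
    queues[task_queue].append(workflow_id)


def poll(task_queue: str) -> Optional[str]:
    """Take the next task from the named queue, or None if it is empty."""
    pending = queues[task_queue]
    return pending.popleft() if pending else None


start_workflow("hello-temporal-task-queue", "weather-ca")

missed = poll("hello-temporal-taskqueue")    # typo: one hyphen missing
picked = poll("hello-temporal-task-queue")   # exact match finds the task
```

A mismatch does not raise an error in this model; the task simply waits on a queue that nobody polls. Defining the queue name once, as run_worker() does with its task_queue variable, and reusing it in the starter avoids that class of typo.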

UnsandboxedWorkflowRunner

Temporal normally runs workflows inside a sandbox that restricts imports and known non-deterministic calls to help catch determinism bugs. Inside a Jupyter notebook the sandbox causes import errors, so the workshop uses UnsandboxedWorkflowRunner to bypass it:
from temporalio.worker import UnsandboxedWorkflowRunner, Worker

worker = Worker(
    client,
    task_queue=task_queue,
    workflows=[HelloWorkflowTemporal],
    activities=[get_weather_for_state],
    workflow_runner=UnsandboxedWorkflowRunner(),  # Required in Jupyter notebooks
)
UnsandboxedWorkflowRunner is appropriate for development and Jupyter-based workshops. In production deployments, use the default sandboxed runner and structure your imports carefully.

The Temporal Client

Client.connect() returns a client object used to start workflows, query their state, and send signals. In Exercise 2, the starter code uses it like this:
from temporalio.client import Client

async def run_solution():
    client = await Client.connect("localhost:7233")

    handle = await client.start_workflow(
        HelloWorkflowTemporal.run,           # Workflow method to execute
        "CA",                                # Argument passed to workflow.run()
        id="weather-mon-jun-09-120000est",   # Unique ID for this execution
        task_queue="hello-temporal-task-queue",
    )

    print(f"[OK] Workflow started: {handle.id}")
    print(f"View in Temporal UI: http://localhost:8233/namespaces/default/workflows/{handle.id}")

    result = await handle.result()
    print(f"Workflow completed: {result}")
client.start_workflow() returns a handle immediately. Call await handle.result() to block until the workflow finishes and get the return value.
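
Workflow IDs like the one above are ordinary strings that you choose. A minimal sketch of building a readable, timestamp-based ID follows. make_workflow_id is a hypothetical helper, and the year 2025 and the fixed EST offset are assumptions picked so that the output matches the ID shown in the example.

```python
from datetime import datetime, timedelta, timezone

# Spelled out explicitly so the result does not depend on the system locale.
DAYS = ("mon", "tue", "wed", "thu", "fri", "sat", "sun")
MONTHS = ("jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec")

# Assumption: a fixed UTC-5 offset labelled "EST" (no daylight-saving handling).
EST = timezone(timedelta(hours=-5), "EST")


def make_workflow_id(prefix: str, when: datetime) -> str:
    """Build an ID such as 'weather-mon-jun-09-120000est' from a timestamp."""
    day = DAYS[when.weekday()]            # weekday() returns 0 for Monday
    month = MONTHS[when.month - 1]
    zone = (when.tzname() or "utc").lower()
    return f"{prefix}-{day}-{month}-{when:%d-%H%M%S}{zone}"


workflow_id = make_workflow_id(
    "weather", datetime(2025, 6, 9, 12, 0, 0, tzinfo=EST)
)
```

Including a timestamp keeps IDs distinct across runs, which matters because Temporal identifies an execution by its ID and, by default, rejects a start request while a workflow with the same ID is still running.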

Architecture: How the Pieces Connect

  ┌──────────────────┐     start_workflow()     ┌──────────────────────┐
  │  Starter Script  │ ───────────────────────► │  Temporal Server     │
  │  (Client)        │                          │  (State Store)       │
  └──────────────────┘                          └──────────────────────┘
                                                         │
                                                         │  task queue polling
                                                         │
                                                ┌──────────────────┐
                                                │     Worker       │
                                                │                  │
                                                │  ┌────────────┐  │
                                                │  │  Workflow  │  │
                                                │  │  (defn)    │  │
                                                │  └─────┬──────┘  │
                                                │        │         │
                                                │  ┌─────▼──────┐  │
                                                │  │  Activity  │  │
                                                │  │  (defn)    │  │
                                                │  └────────────┘  │
                                                └──────────────────┘
The Temporal server never executes your code directly — it only stores state and schedules tasks. Your Worker process does all the actual execution.

The Temporal Web UI

The Temporal dev server exposes a Web UI on port 8233. In GitHub Codespaces, open the Ports tab in VS Code, find port 8233, and click the Globe icon to open it in your browser. The UI shows:
  • Workflow execution list — every started workflow with its status (Running / Completed / Failed)
  • Execution history — a detailed event log for each workflow: when activities were scheduled, started, completed, or retried
  • Activity retry events — each failed and retried attempt with the error message and backoff timing
  • Input/output payloads — the arguments and return values for every workflow and activity
In Exercise 3, trigger the simulated make_api_call_bug failure and watch the activity retry events appear in the UI in real time. Then “fix” the bug and observe the workflow resume from the failed activity without re-running any already-completed steps.

Running the Worker in Jupyter

Exercises 2 and 3 run the worker as a background async task within the notebook so you can start workflows from subsequent cells:
import asyncio
import nest_asyncio

# Allow nested event loops (Jupyter already has one running)
nest_asyncio.apply()

# Run the worker as a background task — does not block the notebook
worker_task = asyncio.create_task(run_worker())
print("Worker running in background")
Exercise 4 uses a separate worker.py process instead—a production-style pattern where the worker and starter are independent OS processes communicating only through the Temporal server.
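
The background-task pattern is plain asyncio and can be tried without a Temporal server. In the sketch below, fake_worker is a hypothetical stand-in for run_worker() that just loops until cancelled. It shows starting the task, doing other work while it runs, and shutting it down cleanly.

```python
import asyncio


async def fake_worker(polls: list[int]) -> None:
    """Hypothetical stand-in for run_worker(): loops until cancelled."""
    count = 0
    while True:
        polls.append(count)          # pretend to poll the task queue
        count += 1
        await asyncio.sleep(0.01)


async def main() -> list[int]:
    polls: list[int] = []

    # Start the worker in the background; this call returns immediately.
    worker_task = asyncio.create_task(fake_worker(polls))

    # Later notebook cells would start workflows here.
    await asyncio.sleep(0.05)

    # Stop the background worker and wait for it to finish unwinding.
    worker_task.cancel()
    try:
        await worker_task
    except asyncio.CancelledError:
        pass
    return polls


polls = asyncio.run(main())
```

In a notebook cell you would write `polls = await main()` instead of calling asyncio.run(), since Jupyter already has an event loop running. Keeping a reference to worker_task, as the workshop code does, is what makes the later cancel() possible.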

Exercise 2: Temporal Hello World

Write your first workflow and activity, observe retries in the Temporal UI, and start workflows from a client.

Durable AI Agents

See how wrapping LLM calls in Temporal activities gives your agents automatic crash recovery and an audit trail.
