Skip to main content

Overview

Webhooks allow you to receive HTTP notifications when your Skyvern tasks and workflows complete. Instead of polling for results, Skyvern will automatically POST the run results to your specified endpoint.

Key Benefits

  • Asynchronous Processing: No need to wait for long-running tasks to complete
  • Real-time Notifications: Get immediate updates when runs finish
  • Integration: Easily integrate with your existing systems and workflows
  • Automatic Retries: Skyvern retries failed webhook deliveries
  • Signed Payloads: Verify webhook authenticity with HMAC signatures

Setting Up Webhooks

For Task Runs

from skyvern import Skyvern

# Create the client with your Skyvern API key.
skyvern = Skyvern(api_key="your-api-key")

# run_task is awaited, so this snippet has to run inside an async function
# (or another context that permits `await`).
task = await skyvern.run_task(
    prompt="Extract product information",
    url="https://example.com/products",
    webhook_url="https://your-app.com/webhooks/skyvern"  # Your endpoint
)

# The call returns before the run finishes; the result arrives via the webhook.
print(f"Task started: {task.run_id}")
print("Webhook will be sent when complete")

For Workflow Runs

# `skyvern` is a Skyvern client instance; like run_task, run_workflow is
# awaited and therefore needs an async context.
workflow_run = await skyvern.run_workflow(
    workflow_id="wpid_123",
    parameters={"query": "laptops"},
    webhook_url="https://your-app.com/webhooks/skyvern"  # Your endpoint
)

print(f"Workflow started: {workflow_run.run_id}")

Via API

curl -X POST https://api.skyvern.com/api/v1/tasks \
  -H "x-api-key: your-api-key" \
  -H "Content-Type: application/json" \
  -d '{
    "prompt": "Download all invoices from last month",
    "url": "https://vendor.example.com/invoices",
    "webhook_url": "https://your-app.com/webhooks/skyvern"
  }'

Webhook Payload

Skyvern sends the complete run response to your webhook endpoint:

Task Webhook Payload

{
  "run_id": "tsk_123456789",
  "run_type": "task_v2",
  "status": "completed",
  "output": {
    "product_name": "Laptop Pro 15",
    "price": 1299.99,
    "in_stock": true
  },
  "downloaded_files": [
    {
      "file_name": "product_spec.pdf",
      "file_size": 1024000,
      "s3_uri": "s3://bucket/files/product_spec.pdf",
      "presigned_url": "https://s3.amazonaws.com/..."
    }
  ],
  "recording_url": "https://recordings.skyvern.com/tsk_123.mp4",
  "screenshot_urls": [
    "https://screenshots.skyvern.com/tsk_123_final.png"
  ],
  "failure_reason": null,
  "created_at": "2024-03-01T10:00:00Z",
  "modified_at": "2024-03-01T10:05:23Z",
  "queued_at": "2024-03-01T10:00:01Z",
  "started_at": "2024-03-01T10:00:15Z",
  "finished_at": "2024-03-01T10:05:23Z",
  "app_url": "https://app.skyvern.com/tasks/tsk_123456789",
  "browser_session_id": "pbs_987654321",
  "errors": [],
  "step_count": 12,
  "run_request": {
    "prompt": "Extract product information",
    "url": "https://example.com/products",
    "webhook_url": "https://your-app.com/webhooks/skyvern",
    "data_extraction_schema": {...},
    "proxy_location": "RESIDENTIAL"
  }
}

Workflow Webhook Payload

{
  "run_id": "wr_123456789",
  "run_type": "workflow_run",
  "status": "completed",
  "output": {
    "invoices_downloaded": 15,
    "total_amount": 45678.90,
    "vendors": ["Vendor A", "Vendor B", "Vendor C"]
  },
  "downloaded_files": [...],
  "recording_url": "https://recordings.skyvern.com/wr_123.mp4",
  "screenshot_urls": [...],
  "failure_reason": null,
  "created_at": "2024-03-01T10:00:00Z",
  "finished_at": "2024-03-01T10:45:00Z",
  "app_url": "https://app.skyvern.com/workflows/wpid_123/wr_123456789",
  "browser_session_id": "pbs_987654321",
  "errors": [],
  "run_request": {
    "workflow_id": "wpid_123",
    "parameters": {...},
    "webhook_url": "https://your-app.com/webhooks/skyvern"
  }
}

Webhook Security

Skyvern signs all webhook payloads using HMAC-SHA256:

Signature Headers

POST /webhooks/skyvern HTTP/1.1
Host: your-app.com
Content-Type: application/json
X-Skyvern-Signature: sha256=abc123...
X-Skyvern-Timestamp: 1709294400
User-Agent: Skyvern-Webhook/1.0

{...payload...}

Verifying Signatures

import hmac
import hashlib
import time
from fastapi import Request, HTTPException

async def verify_webhook_signature(
    request: Request,
    api_key: str,
    max_age_seconds: int = 300  # 5 minutes
) -> dict:
    """Verify a Skyvern webhook request and return its parsed JSON payload.

    Args:
        request: Incoming FastAPI request carrying the webhook.
        api_key: Key used as the HMAC secret for the expected signature.
        max_age_seconds: Largest allowed distance, in seconds, between the
            ``X-Skyvern-Timestamp`` header and the local clock.

    Returns:
        The request body parsed from JSON.

    Raises:
        HTTPException: 401 when a signature header is missing, the timestamp
            is not an integer or lies outside the allowed window, or the
            signature does not match; 400 when the body is not valid UTF-8.
    """
    import json

    # Get signature and timestamp from headers
    signature = request.headers.get("X-Skyvern-Signature")
    timestamp = request.headers.get("X-Skyvern-Timestamp")

    if not signature or not timestamp:
        raise HTTPException(status_code=401, detail="Missing signature headers")

    # Keep the try body to the one call that can raise ValueError.
    try:
        timestamp_int = int(timestamp)
    except ValueError:
        raise HTTPException(status_code=401, detail="Invalid timestamp") from None

    # Check timestamp to prevent replay attacks. abs() means timestamps too
    # far in the future are rejected as well as stale ones.
    if abs(time.time() - timestamp_int) > max_age_seconds:
        raise HTTPException(status_code=401, detail="Timestamp too old")

    # Get request body
    body = await request.body()

    # The signature helper takes text; a body that is not UTF-8 cannot be
    # signed, so reject it instead of letting the decode error become a 500.
    try:
        payload_text = body.decode()
    except UnicodeDecodeError:
        raise HTTPException(
            status_code=400, detail="Request body is not valid UTF-8"
        ) from None

    # Generate expected signature
    expected_signature = generate_skyvern_webhook_signature(
        payload=payload_text,
        api_key=api_key
    )

    # Constant-time comparison on bytes: compare_digest raises TypeError for
    # str arguments containing non-ASCII characters, and the header value is
    # supplied by the caller.
    if not hmac.compare_digest(signature.encode(), expected_signature.encode()):
        raise HTTPException(status_code=401, detail="Invalid signature")

    # Parse and return payload
    return json.loads(body)


def generate_skyvern_webhook_signature(payload: str, api_key: str) -> str:
    """Return the ``sha256=``-prefixed HMAC-SHA256 hex digest of *payload*.

    The API key is the HMAC secret. Both strings are converted to bytes with
    the default ``str.encode()`` codec before hashing.
    """
    secret = api_key.encode()
    message = payload.encode()
    mac = hmac.new(secret, message, digestmod=hashlib.sha256)
    return "sha256=" + mac.hexdigest()

Example Webhook Endpoint

from fastapi import FastAPI, Request, HTTPException
import structlog

app = FastAPI()
LOG = structlog.get_logger()

SKYVERN_API_KEY = "your-api-key"  # Store securely!

@app.post("/webhooks/skyvern")
async def handle_skyvern_webhook(request: Request):
    """Verify an incoming Skyvern webhook and dispatch it by run type.

    Args:
        request: Incoming FastAPI request for ``POST /webhooks/skyvern``.

    Returns:
        ``{"status": "received"}`` for every payload that passes
        verification, including ones whose run type has no handler.

    Raises:
        HTTPException: Re-raised unchanged when signature verification fails.
    """
    # Verify signature
    try:
        payload = await verify_webhook_signature(request, SKYVERN_API_KEY)
    except HTTPException as e:
        LOG.error("Webhook signature verification failed", error=str(e))
        raise

    # Process webhook based on run type
    run_id = payload["run_id"]
    run_type = payload["run_type"]
    status = payload["status"]

    LOG.info(
        "Received webhook",
        run_id=run_id,
        run_type=run_type,
        status=status
    )

    if run_type == "task_v2":
        await process_task_completion(payload)
    elif run_type == "workflow_run":
        await process_workflow_completion(payload)
    else:
        # Still acknowledged below, but leave a trace so a payload with an
        # unexpected run type is not dropped silently.
        LOG.warning(
            "Unhandled webhook run type",
            run_id=run_id,
            run_type=run_type
        )

    return {"status": "received"}


async def process_task_completion(payload: dict):
    """Handle a task webhook payload according to its ``status``.

    A ``completed`` payload is logged, saved, and each of its downloaded
    files is fetched. A ``failed`` payload is logged and an alert is sent.
    Any other status is ignored.

    Args:
        payload: Parsed task webhook payload.
    """
    status = payload["status"]

    if status == "completed":
        # Task succeeded
        LOG.info("Task completed successfully", output=payload["output"])

        # Process extracted data
        await save_to_database(payload)

        # Download files if any. `.get(key, [])` would still return None for
        # a key that is present with a JSON null, and iterating None raises
        # TypeError, so fall back to an empty list explicitly.
        for file_info in payload.get("downloaded_files") or []:
            await download_file(file_info["presigned_url"])

    elif status == "failed":
        # Task failed
        LOG.error("Task failed", reason=payload["failure_reason"])

        # Send alert
        await send_failure_alert(payload)


async def process_workflow_completion(payload: dict):
    """Route a workflow webhook payload to the success or failure handler.

    Only a ``completed`` status counts as success; every other status is
    logged as a failure and passed to the failure handler.
    """
    if payload["status"] != "completed":
        LOG.error("Workflow failed", reason=payload.get("failure_reason"))
        await handle_workflow_failure(payload)
        return

    LOG.info("Workflow completed", output=payload["output"])
    await handle_workflow_success(payload)

Testing Webhooks

Skyvern provides tools to test your webhook endpoint:

Test Webhook Endpoint

# Send a sample payload to your webhook endpoint.
# test_webhook is awaited, so run this inside an async context.
response = await skyvern.test_webhook(
    webhook_url="https://your-app.com/webhooks/skyvern",
    run_type="task"  # or "workflow_run"
)

# Inspect how your endpoint answered the test delivery.
print(f"Status code: {response.status_code}")  # Should be 200-299
print(f"Latency: {response.latency_ms}ms")
print(f"Response: {response.response_body}")

Via API

curl -X POST https://api.skyvern.com/api/v1/webhooks/test \
  -H "x-api-key: your-api-key" \
  -H "Content-Type: application/json" \
  -d '{
    "webhook_url": "https://your-app.com/webhooks/skyvern",
    "run_type": "task",
    "run_id": "tsk_test_123"
  }'

Local Testing with ngrok

# Expose local server
ngrok http 8000

# Use ngrok URL as webhook
# Example: https://abc123.ngrok.io/webhooks/skyvern

Webhook Retries

Skyvern automatically retries failed webhook deliveries.

Retry Policy:
  • Initial delivery attempt
  • Retry after 1 minute
  • Retry after 5 minutes
  • Retry after 15 minutes
  • Final retry after 30 minutes
Success Criteria:
  • HTTP status code 200-299
  • Response within 30 seconds

Checking Webhook Status

# Get webhook delivery status (awaited, so run this inside an async context)
run = await skyvern.get_run("tsk_123456789")

# A truthy webhook_failure_reason is reported as a failed delivery.
# NOTE(review): the else branch treats an empty reason as success; confirm
# that a webhook which has not been attempted yet does not look the same.
if run.webhook_failure_reason:
    print(f"Webhook failed: {run.webhook_failure_reason}")
else:
    print("Webhook delivered successfully")

Webhook Replay

Replay webhooks for testing or recovery:
# Replay webhook for a specific run (awaited, so run this inside an async
# context). override_webhook_url is optional.
replay_response = await skyvern.replay_webhook(
    run_id="tsk_123456789",
    override_webhook_url="https://new-endpoint.com/webhooks"  # Optional
)

# Show where the replay was sent and how that endpoint responded.
print(f"Replayed to: {replay_response.target_webhook_url}")
print(f"Status: {replay_response.status_code}")
print(f"Response: {replay_response.response_body}")

Via API

curl -X POST https://api.skyvern.com/api/v1/runs/tsk_123/webhook/replay \
  -H "x-api-key: your-api-key" \
  -H "Content-Type: application/json" \
  -d '{
    "override_webhook_url": "https://new-endpoint.com/webhooks"
  }'

Best Practices

1. Idempotent Handlers

@app.post("/webhooks/skyvern")
async def handle_webhook(request: Request):
    """Process each run's webhook once, acknowledging repeats as duplicates."""
    event = await verify_webhook_signature(request, API_KEY)
    event_run_id = event["run_id"]

    # Idempotency check keyed on the run id
    seen_before = await is_already_processed(event_run_id)
    if not seen_before:
        # First delivery: do the work, then record that it was done
        await process_webhook(event)
        await mark_as_processed(event_run_id)
        return {"status": "received"}

    LOG.info("Webhook already processed", run_id=event_run_id)
    return {"status": "duplicate"}

2. Fast Response

import asyncio
from fastapi import BackgroundTasks

@app.post("/webhooks/skyvern")
async def handle_webhook(request: Request, background_tasks: BackgroundTasks):
    """Acknowledge the webhook right away and defer the real work."""
    verified_payload = await verify_webhook_signature(request, API_KEY)

    # Queue the processing as a background task so only verification
    # happens before the acknowledgement is returned
    background_tasks.add_task(process_webhook_async, verified_payload)

    acknowledgement = {"status": "received"}
    return acknowledgement


async def process_webhook_async(payload: dict):
    """Run the heavy part of webhook handling as a background task.

    Args:
        payload: Verified webhook payload handed over by the request handler.
    """
    # A zero-second sleep hands control back to the event loop once before
    # the heavy work begins.
    await asyncio.sleep(0)  # Yield control
    # Heavy processing here
    await process_data(payload)

3. Error Handling

@app.post("/webhooks/skyvern")
async def handle_webhook(request: Request):
    """Handle a webhook, turning processing failures into a normal response.

    HTTPException (raised by signature verification) propagates to the
    client. Any other exception is logged and answered with an error body
    instead of being raised, so the sender does not retry.
    """
    try:
        verified = await verify_webhook_signature(request, API_KEY)
        await process_webhook(verified)
    except HTTPException:
        # Auth errors go back to the caller untouched
        raise
    except Exception as exc:
        # Log the failure, but answer normally so the delivery is not retried
        LOG.error("Error processing webhook", error=str(exc))
        return {"status": "error", "message": "Internal error"}
    else:
        return {"status": "received"}

4. Monitoring

import time
from prometheus_client import Counter, Histogram

# Counter of webhooks, labelled with the payload's status and run type.
webhook_received = Counter(
    "skyvern_webhooks_received_total",
    "Total webhooks received",
    ["status", "run_type"]
)

# Histogram fed with the seconds spent handling each webhook request.
webhook_processing_duration = Histogram(
    "skyvern_webhook_processing_seconds",
    "Time to process webhook"
)

@app.post("/webhooks/skyvern")
async def handle_webhook(request: Request):
    """Handle a webhook while recording Prometheus metrics.

    The received-counter is incremented only after the payload has been
    verified and processed. The duration histogram is observed for every
    request, including ones that raise.

    Args:
        request: Incoming FastAPI request carrying the webhook.

    Returns:
        ``{"status": "received"}`` when processing succeeds.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed
    # (or go negative) when the system clock is adjusted mid-request.
    start_time = time.perf_counter()

    try:
        payload = await verify_webhook_signature(request, API_KEY)

        await process_webhook(payload)

        webhook_received.labels(
            status=payload["status"],
            run_type=payload["run_type"]
        ).inc()

        return {"status": "received"}

    finally:
        duration = time.perf_counter() - start_time
        webhook_processing_duration.observe(duration)

Troubleshooting

Webhook Not Received

  1. Check URL accessibility
# Test your endpoint
curl -X POST https://your-app.com/webhooks/skyvern \
  -H "Content-Type: application/json" \
  -d '{"test": "data"}'
  2. Verify webhook URL in request
task = await skyvern.get_task("tsk_123")
print(f"Webhook URL: {task.webhook_callback_url}")
  3. Check webhook failure reason
if task.webhook_failure_reason:
    print(f"Failure: {task.webhook_failure_reason}")

Signature Verification Failed

# Ensure you're using the correct API key
# The same key used to create the task/workflow

# Debug signature generation
from skyvern.forge.sdk.core.security import generate_skyvern_webhook_signature

# Stand-in body for illustration. NOTE(review): the verification example
# signs the decoded raw request body, so when comparing against a real
# header substitute the exact body text your endpoint received.
payload = '{"run_id": "tsk_123"}'
expected = generate_skyvern_webhook_signature(payload, "your-api-key")
# `request` is the incoming request object inside your webhook handler.
received = request.headers.get("X-Skyvern-Signature")

# Print both values side by side; plain equality is enough for debugging.
print(f"Expected: {expected}")
print(f"Received: {received}")
print(f"Match: {expected == received}")

Webhook Timeout

# Ensure your endpoint responds within 30 seconds
# Move heavy processing to background tasks

# Strong references to in-flight background tasks. The event loop keeps only
# weak references to tasks, so a task nobody else references can be
# garbage-collected before it finishes.
_background_tasks = set()


@app.post("/webhooks/skyvern")
async def handle_webhook(request: Request):
    """Acknowledge the webhook immediately and process it in the background.

    Args:
        request: Incoming FastAPI request carrying the webhook.

    Returns:
        ``{"status": "received", "run_id": ...}`` as soon as the payload has
        been verified and the background task has been scheduled.
    """
    payload = await verify_webhook_signature(request, API_KEY)

    # Quick validation
    run_id = payload["run_id"]

    # Schedule the work and return immediately. Hold the task in the set
    # until it completes, then let the done-callback drop the reference.
    task = asyncio.create_task(process_in_background(payload))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

    return {"status": "received", "run_id": run_id}

Build docs developers (and LLMs) love