Overview
Webhooks allow you to receive HTTP notifications when your Skyvern tasks and workflows complete. Instead of polling for results, Skyvern will automatically POST the run results to your specified endpoint.Key Benefits
- Asynchronous Processing: No need to wait for long-running tasks to complete
- Real-time Notifications: Get immediate updates when runs finish
- Integration: Easily integrate with your existing systems and workflows
- Automatic Retries: Skyvern retries failed webhook deliveries
- Signed Payloads: Verify webhook authenticity with HMAC signatures
Setting Up Webhooks
For Task Runs
from skyvern import Skyvern
skyvern = Skyvern(api_key="your-api-key")
task = await skyvern.run_task(
prompt="Extract product information",
url="https://example.com/products",
webhook_url="https://your-app.com/webhooks/skyvern" # Your endpoint
)
print(f"Task started: {task.run_id}")
print("Webhook will be sent when complete")
For Workflow Runs
workflow_run = await skyvern.run_workflow(
workflow_id="wpid_123",
parameters={"query": "laptops"},
webhook_url="https://your-app.com/webhooks/skyvern" # Your endpoint
)
print(f"Workflow started: {workflow_run.run_id}")
Via API
curl -X POST https://api.skyvern.com/api/v1/tasks \
-H "x-api-key: your-api-key" \
-H "Content-Type: application/json" \
-d '{
"prompt": "Download all invoices from last month",
"url": "https://vendor.example.com/invoices",
"webhook_url": "https://your-app.com/webhooks/skyvern"
}'
Webhook Payload
Skyvern sends the complete run response to your webhook endpoint:Task Webhook Payload
{
"run_id": "tsk_123456789",
"run_type": "task_v2",
"status": "completed",
"output": {
"product_name": "Laptop Pro 15",
"price": 1299.99,
"in_stock": true
},
"downloaded_files": [
{
"file_name": "product_spec.pdf",
"file_size": 1024000,
"s3_uri": "s3://bucket/files/product_spec.pdf",
"presigned_url": "https://s3.amazonaws.com/..."
}
],
"recording_url": "https://recordings.skyvern.com/tsk_123.mp4",
"screenshot_urls": [
"https://screenshots.skyvern.com/tsk_123_final.png"
],
"failure_reason": null,
"created_at": "2024-03-01T10:00:00Z",
"modified_at": "2024-03-01T10:05:23Z",
"queued_at": "2024-03-01T10:00:01Z",
"started_at": "2024-03-01T10:00:15Z",
"finished_at": "2024-03-01T10:05:23Z",
"app_url": "https://app.skyvern.com/tasks/tsk_123456789",
"browser_session_id": "pbs_987654321",
"errors": [],
"step_count": 12,
"run_request": {
"prompt": "Extract product information",
"url": "https://example.com/products",
"webhook_url": "https://your-app.com/webhooks/skyvern",
"data_extraction_schema": {...},
"proxy_location": "RESIDENTIAL"
}
}
Workflow Webhook Payload
{
"run_id": "wr_123456789",
"run_type": "workflow_run",
"status": "completed",
"output": {
"invoices_downloaded": 15,
"total_amount": 45678.90,
"vendors": ["Vendor A", "Vendor B", "Vendor C"]
},
"downloaded_files": [...],
"recording_url": "https://recordings.skyvern.com/wr_123.mp4",
"screenshot_urls": [...],
"failure_reason": null,
"created_at": "2024-03-01T10:00:00Z",
"finished_at": "2024-03-01T10:45:00Z",
"app_url": "https://app.skyvern.com/workflows/wpid_123/wr_123456789",
"browser_session_id": "pbs_987654321",
"errors": [],
"run_request": {
"workflow_id": "wpid_123",
"parameters": {...},
"webhook_url": "https://your-app.com/webhooks/skyvern"
}
}
Webhook Security
Skyvern signs all webhook payloads using HMAC-SHA256:Signature Headers
POST /webhooks/skyvern HTTP/1.1
Host: your-app.com
Content-Type: application/json
X-Skyvern-Signature: sha256=abc123...
X-Skyvern-Timestamp: 1709294400
User-Agent: Skyvern-Webhook/1.0
{...payload...}
Verifying Signatures
import hmac
import hashlib
import time
from fastapi import Request, HTTPException
async def verify_webhook_signature(
request: Request,
api_key: str,
max_age_seconds: int = 300 # 5 minutes
) -> dict:
"""Verify Skyvern webhook signature"""
# Get signature and timestamp from headers
signature = request.headers.get("X-Skyvern-Signature")
timestamp = request.headers.get("X-Skyvern-Timestamp")
if not signature or not timestamp:
raise HTTPException(status_code=401, detail="Missing signature headers")
# Check timestamp to prevent replay attacks
try:
timestamp_int = int(timestamp)
if abs(time.time() - timestamp_int) > max_age_seconds:
raise HTTPException(status_code=401, detail="Timestamp too old")
except ValueError:
raise HTTPException(status_code=401, detail="Invalid timestamp")
# Get request body
body = await request.body()
# Generate expected signature
expected_signature = generate_skyvern_webhook_signature(
payload=body.decode(),
api_key=api_key
)
# Compare signatures (constant-time comparison)
if not hmac.compare_digest(signature, expected_signature):
raise HTTPException(status_code=401, detail="Invalid signature")
# Parse and return payload
import json
return json.loads(body)
def generate_skyvern_webhook_signature(payload: str, api_key: str) -> str:
"""Generate HMAC signature for webhook payload"""
signature = hmac.new(
api_key.encode(),
payload.encode(),
hashlib.sha256
).hexdigest()
return f"sha256={signature}"
Example Webhook Endpoint
from fastapi import FastAPI, Request, HTTPException
import structlog
app = FastAPI()
LOG = structlog.get_logger()
SKYVERN_API_KEY = "your-api-key" # Store securely!
@app.post("/webhooks/skyvern")
async def handle_skyvern_webhook(request: Request):
"""Handle webhook from Skyvern"""
# Verify signature
try:
payload = await verify_webhook_signature(request, SKYVERN_API_KEY)
except HTTPException as e:
LOG.error("Webhook signature verification failed", error=str(e))
raise
# Process webhook based on run type
run_id = payload["run_id"]
run_type = payload["run_type"]
status = payload["status"]
LOG.info(
"Received webhook",
run_id=run_id,
run_type=run_type,
status=status
)
if run_type == "task_v2":
await process_task_completion(payload)
elif run_type == "workflow_run":
await process_workflow_completion(payload)
return {"status": "received"}
async def process_task_completion(payload: dict):
"""Process completed task"""
if payload["status"] == "completed":
# Task succeeded
output = payload["output"]
LOG.info("Task completed successfully", output=output)
# Process extracted data
await save_to_database(payload)
# Download files if any
for file in payload.get("downloaded_files", []):
await download_file(file["presigned_url"])
elif payload["status"] == "failed":
# Task failed
reason = payload["failure_reason"]
LOG.error("Task failed", reason=reason)
# Send alert
await send_failure_alert(payload)
async def process_workflow_completion(payload: dict):
"""Process completed workflow"""
if payload["status"] == "completed":
LOG.info("Workflow completed", output=payload["output"])
await handle_workflow_success(payload)
else:
LOG.error("Workflow failed", reason=payload.get("failure_reason"))
await handle_workflow_failure(payload)
Testing Webhooks
Skyvern provides tools to test your webhook endpoint:Test Webhook Endpoint
# Test your webhook with sample payload
response = await skyvern.test_webhook(
webhook_url="https://your-app.com/webhooks/skyvern",
run_type="task" # or "workflow_run"
)
print(f"Status code: {response.status_code}") # Should be 200-299
print(f"Latency: {response.latency_ms}ms")
print(f"Response: {response.response_body}")
Via API
curl -X POST https://api.skyvern.com/api/v1/webhooks/test \
-H "x-api-key: your-api-key" \
-H "Content-Type: application/json" \
-d '{
"webhook_url": "https://your-app.com/webhooks/skyvern",
"run_type": "task",
"run_id": "tsk_test_123"
}'
Local Testing with ngrok
# Expose local server
ngrok http 8000
# Use ngrok URL as webhook
# Example: https://abc123.ngrok.io/webhooks/skyvern
Webhook Retries
Skyvern automatically retries failed webhook deliveries: Retry Policy:- Initial delivery attempt
- Retry after 1 minute
- Retry after 5 minutes
- Retry after 15 minutes
- Final retry after 30 minutes
- HTTP status code 200-299
- Response within 30 seconds
Checking Webhook Status
# Get webhook delivery status
run = await skyvern.get_run("tsk_123456789")
if run.webhook_failure_reason:
print(f"Webhook failed: {run.webhook_failure_reason}")
else:
print("Webhook delivered successfully")
Webhook Replay
Replay webhooks for testing or recovery:# Replay webhook for a specific run
replay_response = await skyvern.replay_webhook(
run_id="tsk_123456789",
override_webhook_url="https://new-endpoint.com/webhooks" # Optional
)
print(f"Replayed to: {replay_response.target_webhook_url}")
print(f"Status: {replay_response.status_code}")
print(f"Response: {replay_response.response_body}")
Via API
curl -X POST https://api.skyvern.com/api/v1/runs/tsk_123/webhook/replay \
-H "x-api-key: your-api-key" \
-H "Content-Type: application/json" \
-d '{
"override_webhook_url": "https://new-endpoint.com/webhooks"
}'
Best Practices
1. Idempotent Handlers
@app.post("/webhooks/skyvern")
async def handle_webhook(request: Request):
payload = await verify_webhook_signature(request, API_KEY)
run_id = payload["run_id"]
# Check if already processed (idempotency)
if await is_already_processed(run_id):
LOG.info("Webhook already processed", run_id=run_id)
return {"status": "duplicate"}
# Process webhook
await process_webhook(payload)
# Mark as processed
await mark_as_processed(run_id)
return {"status": "received"}
2. Fast Response
import asyncio
from fastapi import BackgroundTasks
@app.post("/webhooks/skyvern")
async def handle_webhook(request: Request, background_tasks: BackgroundTasks):
"""Respond quickly, process in background"""
payload = await verify_webhook_signature(request, API_KEY)
# Respond immediately
background_tasks.add_task(process_webhook_async, payload)
return {"status": "received"}
async def process_webhook_async(payload: dict):
"""Process webhook in background"""
await asyncio.sleep(0) # Yield control
# Heavy processing here
await process_data(payload)
3. Error Handling
@app.post("/webhooks/skyvern")
async def handle_webhook(request: Request):
try:
payload = await verify_webhook_signature(request, API_KEY)
await process_webhook(payload)
return {"status": "received"}
except HTTPException:
# Re-raise auth errors
raise
except Exception as e:
# Log error but return 200 to prevent retries
LOG.error("Error processing webhook", error=str(e))
return {"status": "error", "message": "Internal error"}
4. Monitoring
import time
from prometheus_client import Counter, Histogram
webhook_received = Counter(
"skyvern_webhooks_received_total",
"Total webhooks received",
["status", "run_type"]
)
webhook_processing_duration = Histogram(
"skyvern_webhook_processing_seconds",
"Time to process webhook"
)
@app.post("/webhooks/skyvern")
async def handle_webhook(request: Request):
start_time = time.time()
try:
payload = await verify_webhook_signature(request, API_KEY)
await process_webhook(payload)
webhook_received.labels(
status=payload["status"],
run_type=payload["run_type"]
).inc()
return {"status": "received"}
finally:
duration = time.time() - start_time
webhook_processing_duration.observe(duration)
Troubleshooting
Webhook Not Received
- Check URL accessibility
# Test your endpoint
curl -X POST https://your-app.com/webhooks/skyvern \
-H "Content-Type: application/json" \
-d '{"test": "data"}'
- Verify webhook URL in request
task = await skyvern.get_task("tsk_123")
print(f"Webhook URL: {task.webhook_callback_url}")
- Check webhook failure reason
if task.webhook_failure_reason:
print(f"Failure: {task.webhook_failure_reason}")
Signature Verification Failed
# Ensure you're using the correct API key
# The same key used to create the task/workflow
# Debug signature generation
from skyvern.forge.sdk.core.security import generate_skyvern_webhook_signature
payload = '{"run_id": "tsk_123"}'
expected = generate_skyvern_webhook_signature(payload, "your-api-key")
received = request.headers.get("X-Skyvern-Signature")
print(f"Expected: {expected}")
print(f"Received: {received}")
print(f"Match: {expected == received}")
Webhook Timeout
# Ensure your endpoint responds within 30 seconds
# Move heavy processing to background tasks
@app.post("/webhooks/skyvern")
async def handle_webhook(request: Request):
payload = await verify_webhook_signature(request, API_KEY)
# Quick validation
run_id = payload["run_id"]
# Return immediately
asyncio.create_task(process_in_background(payload))
return {"status": "received", "run_id": run_id}
Related Documentation
- Tasks - Running tasks
- Workflows - Building workflows
- Authentication - Authentication and security