Documentation Index
Fetch the complete documentation index at: https://mintlify.com/python/cpython/llms.txt
Use this file to discover all available pages before exploring further.
Asynchronous programming with async/await allows Python code to handle concurrent operations efficiently, particularly for I/O-bound tasks like network requests and file operations.
Core Concepts
Event Loop
The event loop is the central coordinator that manages and schedules async tasks:
import asyncio
# Create and run an event loop
event_loop = asyncio.new_event_loop()
# NOTE: run_forever() does not return until the loop's stop() method is called.
event_loop.run_forever()
In practice, use asyncio.run(), which creates the event loop, runs your coroutine, and closes the loop for you:
import asyncio
async def main():
    """Print a greeting from inside a running event loop."""
    print("Hello async world!")
asyncio.run(main())
Coroutines
Coroutine functions are defined with async def; calling one returns a coroutine object that does nothing until it is awaited or scheduled:
# Regular function
def regular_function():
    """Return its result as soon as it is called."""
    return "I run immediately"


# Coroutine function
async def coroutine_function():
    """Return its result only once the coroutine is awaited or run."""
    return "I need to be awaited"


# Calling creates a coroutine object, doesn't run it
coro = coroutine_function()
type(coro)  # <class 'coroutine'>

# Must await or schedule to actually run. Run the object created above:
# leaving it un-awaited would emit "coroutine ... was never awaited".
result = asyncio.run(coro)
Getting Started
import asyncio
async def fetch_data():
    """Simulate a two-second I/O request and return its payload."""
    print("Start fetching")
    await asyncio.sleep(2)  # Simulate I/O operation
    print("Done fetching")
    return {"data": 42}
# Method 1: Using asyncio.run() (recommended)
result = asyncio.run(fetch_data())
# Method 2: In an existing async context
async def main():
    """Await fetch_data() from inside another coroutine and print the result."""
    result = await fetch_data()
    print(result)
asyncio.run(main())
import asyncio
async def task_one():
    """Finish after one second."""
    await asyncio.sleep(1)
    return "Task one complete"


async def task_two():
    """Finish after two seconds."""
    await asyncio.sleep(2)
    return "Task two complete"


async def main():
    """Run both coroutines concurrently and print their results."""
    # Run concurrently
    results = await asyncio.gather(
        task_one(),
        task_two(),
    )
    print(results)
    # ['Task one complete', 'Task two complete']
asyncio.run(main())
The await Keyword
Awaiting Tasks
await pauses the current coroutine and lets the event loop run other tasks:
async def make_coffee():
    """Grind, then brew, suspending at each simulated wait."""
    print("Grinding beans...")
    await asyncio.sleep(1)  # Yields control to event loop
    print("Brewing coffee...")
    await asyncio.sleep(2)  # Yields control again
    print("Coffee ready!")
    return "☕"


async def make_toast():
    """Toast bread, suspending while it cooks."""
    print("Toasting bread...")
    await asyncio.sleep(1.5)
    print("Toast ready!")
    return "🍞"


async def make_breakfast():
    """Prepare coffee and toast at the same time."""
    # Run both concurrently
    coffee, toast = await asyncio.gather(
        make_coffee(),
        make_toast(),
    )
    print(f"Breakfast ready: {coffee} {toast}")
asyncio.run(make_breakfast())
Output:
Grinding beans...
Toasting bread...
Brewing coffee...
Toast ready!
Coffee ready!
Breakfast ready: ☕ 🍞
Awaiting Coroutines vs Tasks
Important distinction:
import asyncio
async def work():
    """Sleep for one second, then report completion."""
    await asyncio.sleep(1)
    print("Work done")


async def main():
    """Contrast awaiting coroutines directly with awaiting tasks."""
    # Awaiting a coroutine directly runs it to completion before the next
    # statement: the second work() does not start until the first finishes.
    await work()
    await work()
    # Takes 2 seconds total

    # Creating tasks schedules both on the event loop right away, so they
    # run concurrently.
    task1 = asyncio.create_task(work())
    task2 = asyncio.create_task(work())
    await task1
    await task2
    # Takes 1 second total
Tasks
Creating Tasks
Tasks wrap coroutines and schedule them for execution:
import asyncio
async def fetch_user(user_id):
    """Simulate a one-second lookup and return a description of the user."""
    await asyncio.sleep(1)
    return f"User {user_id} data"


async def main():
    """Fetch three users concurrently by wrapping each call in a task."""
    # Create tasks
    task1 = asyncio.create_task(fetch_user(1))
    task2 = asyncio.create_task(fetch_user(2))
    task3 = asyncio.create_task(fetch_user(3))
    # Wait for all tasks
    results = await asyncio.gather(task1, task2, task3)
    print(results)
    # ['User 1 data', 'User 2 data', 'User 3 data']
asyncio.run(main())
Task Management
import asyncio
async def long_task():
    """Sleep for ten seconds unless cancelled first."""
    try:
        await asyncio.sleep(10)
        return "Task completed"
    except asyncio.CancelledError:
        print("Task was cancelled")
        raise  # Re-raise so the task really ends up cancelled


async def main():
    """Start long_task(), cancel it after one second, and observe the result."""
    task = asyncio.create_task(long_task())
    # Wait a bit
    await asyncio.sleep(1)
    # Cancel the task
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Caught cancellation")
asyncio.run(main())
Waiting Strategies
asyncio.gather()
Run multiple coroutines concurrently, wait for all:
import asyncio
async def fetch(url):
    """Simulate a one-second download and return the content for url."""
    await asyncio.sleep(1)
    return f"Content from {url}"


async def main():
    """Fetch three URLs concurrently; results come back in call order."""
    results = await asyncio.gather(
        fetch("url1"),
        fetch("url2"),
        fetch("url3"),
    )
    print(results)
    # ['Content from url1', 'Content from url2', 'Content from url3']
asyncio.run(main())
asyncio.wait()
More control over completion:
import asyncio
async def task(n):
    """Sleep for n seconds and return a label identifying this task."""
    await asyncio.sleep(n)
    return f"Task {n}"


async def main():
    """Wait for the first of three tasks to finish, then cancel the rest."""
    tasks = [
        asyncio.create_task(task(1)),
        asyncio.create_task(task(2)),
        asyncio.create_task(task(3)),
    ]
    # Wait for first task to complete
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED,
    )
    print(f"Done: {len(done)}, Pending: {len(pending)}")
    # Done: 1, Pending: 2

    # Cancel pending tasks. The loop variable must not be called `task`:
    # that would make `task` a local of main(), and the task(1) calls above
    # would raise UnboundLocalError.
    for pending_task in pending:
        pending_task.cancel()
    # Give the cancelled tasks a chance to finish before main() returns
    await asyncio.gather(*pending, return_exceptions=True)
asyncio.run(main())
asyncio.wait_for()
Set a timeout:
import asyncio
async def slow_operation():
    """Take five seconds to finish."""
    await asyncio.sleep(5)
    return "Done"


async def main():
    """Give slow_operation() two seconds before abandoning it."""
    try:
        result = await asyncio.wait_for(
            slow_operation(),
            timeout=2.0,
        )
    except asyncio.TimeoutError:
        print("Operation timed out")
    else:
        # Only reached when the operation beats the timeout
        print(result)
asyncio.run(main())
Async Context Managers
Use async with for resources that need async setup/cleanup:
import asyncio
class AsyncDatabase:
    """Toy database handle whose connect and close steps are asynchronous."""

    async def __aenter__(self):
        """Simulate connecting and return the handle bound by `as`."""
        print("Connecting to database...")
        await asyncio.sleep(1)
        print("Connected!")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Simulate closing; returns None, so exceptions are not suppressed."""
        print("Closing database connection...")
        await asyncio.sleep(0.5)
        print("Closed!")

    async def query(self, sql):
        """Simulate running sql and return a description of the result."""
        await asyncio.sleep(0.5)
        return f"Results for: {sql}"


async def main():
    """Open the database, run one query, and print its result."""
    async with AsyncDatabase() as db:
        result = await db.query("SELECT * FROM users")
        print(result)
asyncio.run(main())
Async Iterators
Use async for to iterate over async data sources:
import asyncio
class AsyncRange:
    """Asynchronous counterpart of range(start, end)."""

    def __init__(self, start, end):
        self.start = start
        self.end = end
        self.current = start  # Next value to hand out

    def __aiter__(self):
        # The object is its own iterator, so it can be consumed only once
        return self

    async def __anext__(self):
        """Return the next number, or raise StopAsyncIteration at the end."""
        if self.current >= self.end:
            raise StopAsyncIteration
        await asyncio.sleep(0.5)  # Simulate async work
        value = self.current
        self.current += 1
        return value


async def main():
    """Print the numbers 1 through 4, one every half second."""
    async for num in AsyncRange(1, 5):
        print(num)
asyncio.run(main())
Error Handling
Try-Except with Async
import asyncio
async def risky_operation():
    """Fail with ValueError after a one-second delay."""
    await asyncio.sleep(1)
    raise ValueError("Something went wrong!")


async def main():
    """Show that an exception raised in a coroutine surfaces at the await."""
    try:
        await risky_operation()
    except ValueError as e:
        print(f"Caught error: {e}")
asyncio.run(main())
Gathering with Exceptions
import asyncio
async def task_success():
    """Succeed after one second."""
    await asyncio.sleep(1)
    return "Success"


async def task_failure():
    """Fail with ValueError after one second."""
    await asyncio.sleep(1)
    raise ValueError("Failed")


async def main():
    """Collect results and exceptions side by side and report each one."""
    # return_exceptions=True prevents gather from raising
    results = await asyncio.gather(
        task_success(),
        task_failure(),
        task_success(),
        return_exceptions=True,
    )
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")
asyncio.run(main())
Real-World Examples
Concurrent HTTP Requests
import asyncio
import aiohttp
async def fetch_url(session, url):
    """Return the body of a GET request for url, decoded as text."""
    async with session.get(url) as response:
        return await response.text()


async def main():
    """Download three URLs concurrently over one shared session."""
    urls = [
        'https://api.example.com/users/1',
        'https://api.example.com/users/2',
        'https://api.example.com/users/3',
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        responses = await asyncio.gather(*tasks)
        for url, response in zip(urls, responses):
            # response is a str, so len() counts characters, not bytes
            print(f"{url}: {len(response)} characters")
asyncio.run(main())
Async Database Operations
import asyncio
import asyncpg
async def fetch_users():
    """Return every row of the users table as a list of dicts."""
    conn = await asyncpg.connect(
        user='user',
        password='password',
        database='mydb',
        host='localhost',
    )
    try:
        rows = await conn.fetch('SELECT * FROM users')
        return [dict(row) for row in rows]
    finally:
        # Close the connection even if the query fails
        await conn.close()


async def main():
    """Fetch all users and print them one per line."""
    users = await fetch_users()
    for user in users:
        print(user)
asyncio.run(main())
Producer-Consumer Pattern
import asyncio
import random
async def producer(queue, n):
    """Put n items on the queue, then a None sentinel to signal completion."""
    for i in range(n):
        item = f"item-{i}"
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(random.random())
    # Signal completion
    await queue.put(None)


async def consumer(queue):
    """Process items from the queue until the None sentinel arrives."""
    while True:
        item = await queue.get()
        if item is None:
            # Account for the sentinel as well; otherwise a caller waiting
            # on queue.join() would never be released.
            queue.task_done()
            break
        print(f"Consumed: {item}")
        await asyncio.sleep(random.random())
        queue.task_done()


async def main():
    """Connect one producer and one consumer through a shared queue."""
    queue = asyncio.Queue()
    # Run producer and consumer concurrently
    await asyncio.gather(
        producer(queue, 5),
        consumer(queue),
    )
asyncio.run(main())
Custom Async Sleep
Understanding how async operations work internally:
import asyncio
import time
class YieldToEventLoop:
    """Simple awaitable that yields control."""

    def __await__(self):
        # A bare yield suspends the awaiting task for one event-loop iteration
        yield


async def custom_sleep(seconds):
    """Custom implementation of asyncio.sleep.

    Teaching example only: the watcher polls the clock on every event-loop
    iteration, so it keeps the CPU busy for the whole wait.
    """
    future = asyncio.get_running_loop().create_future()
    # Monotonic clock: not affected by system clock adjustments
    time_to_wake = time.monotonic() + seconds

    async def watcher():
        # Give other tasks a turn between clock checks
        while time.monotonic() < time_to_wake:
            await YieldToEventLoop()
        if not future.done():
            future.set_result(None)

    # Keep a reference: the event loop holds only weak references to tasks,
    # so an unreferenced task can be garbage-collected before it finishes.
    watcher_task = asyncio.create_task(watcher())
    try:
        await future
    finally:
        # Stop polling if custom_sleep() itself is cancelled; a no-op when
        # the watcher has already finished.
        watcher_task.cancel()


async def main():
    """Time a two-second custom_sleep() call."""
    print(f"Starting at {time.strftime('%H:%M:%S')}")
    await custom_sleep(2)
    print(f"Finished at {time.strftime('%H:%M:%S')}")
asyncio.run(main())
Best Practices
When to use async/await:
- ✅ I/O-bound operations (network requests, file I/O, database queries)
- ✅ Handling many concurrent connections
- ✅ Web servers and APIs
- ✅ Websockets and real-time applications
- ❌ CPU-bound tasks (use multiprocessing instead)
- ❌ Simple scripts (adds unnecessary complexity)
- ❌ Blocking libraries (use asyncio-compatible alternatives)
Common pitfalls:
- Blocking the event loop:
# Bad - blocks event loop
async def bad():
    """Stall every other task for a full second."""
    time.sleep(1)  # Blocks!


# Good - yields control
async def good():
    """Let other tasks run during the one-second wait."""
    await asyncio.sleep(1)
- Forgetting to await:
# Bad - creates coroutine but doesn't run it
async def bad():
    """Create a coroutine object and discard it without running it."""
    fetch_data()  # Warning: coroutine never awaited


# Good
async def good():
    """Run fetch_data() to completion."""
    await fetch_data()
- Not creating tasks for concurrency:
# Bad - runs sequentially
async def bad():
    """Start task2() only after task1() has finished."""
    await task1()
    await task2()


# Good - runs concurrently
async def good():
    """Run task1() and task2() at the same time."""
    await asyncio.gather(task1(), task2())
Debugging
Enable Debug Mode
import asyncio
async def main():
    """Placeholder entry point for the debug-mode example."""
    # Your async code
    pass
# Enable debug mode
asyncio.run(main(), debug=True)
Debug mode will:
- Log slow callbacks and coroutine steps (those that run for more than 100 ms without yielding to the event loop)
- Warn about unawaited coroutines
- Track task creation locations
Check Running Tasks
import asyncio
async def monitor_tasks():
    """Print the names of all unfinished tasks every five seconds, forever."""
    while True:
        tasks = asyncio.all_tasks()  # Includes the task running this monitor
        print(f"Active tasks: {len(tasks)}")
        for task in tasks:
            print(f" - {task.get_name()}")
        await asyncio.sleep(5)
Summary
Key takeaways:
- Use async def to create coroutines
- Use await to call async functions and yield control
- Create tasks with asyncio.create_task() for concurrency
- Use asyncio.gather() to wait for multiple tasks
- Always await coroutines or create tasks from them
- Never use blocking operations in async code
- Use asyncio.run() to start your async program