Documentation Index
Fetch the complete documentation index at: https://mintlify.com/TracingInsights/tif1/llms.txt
Use this file to discover all available pages before exploring further.
Overview
The tif1 library provides async methods for loading data in parallel, offering 4-5x speedup compared to synchronous loading. This is especially beneficial when loading data with a cold cache.
Why Use Async Loading?
Async loading provides significant performance benefits:
- 4-5x faster data loading with cold cache
- Parallel fetching of multiple data sources
- Non-blocking operations for better resource utilization
- Efficient HTTP/2 multiplexing for concurrent requests
The speedup comes from fetching multiple data files (lap data, telemetry, weather, etc.) in parallel rather than sequentially.
Basic Async Loading
Load session data asynchronously:
import asyncio
import tif1
async def load_session_async():
"""Load session data asynchronously."""
session = tif1.get_session(2025, "Abu Dhabi Grand Prix", "Practice 1")
# Load laps asynchronously
laps = await session.laps_async()
print(f"Loaded {len(laps)} laps")
return laps
# Run the async function
laps = asyncio.run(load_session_async())
Compare sync vs async loading:
import asyncio
import time
import tif1
async def load_data_async():
"""Load data asynchronously (4-5x faster)."""
session = tif1.get_session(2025, "Abu Dhabi Grand Prix", "Practice 1")
start = time.time()
laps = await session.laps_async()
elapsed = time.time() - start
print(f"✓ Async: Loaded {len(laps)} laps in {elapsed:.2f}s")
return laps, elapsed
def load_data_sync():
"""Load data synchronously."""
session = tif1.get_session(2025, "Abu Dhabi Grand Prix", "Practice 1")
start = time.time()
laps = session.laps
elapsed = time.time() - start
print(f"✓ Sync: Loaded {len(laps)} laps in {elapsed:.2f}s")
return laps, elapsed
# Clear cache for fair comparison
cache = tif1.get_cache()
cache.clear()
print("ASYNC vs SYNC LOADING COMPARISON")
print("=" * 60)
# Test async loading
print("\nTesting async loading...")
laps_async, time_async = asyncio.run(load_data_async())
# Clear cache again
cache.clear()
# Test sync loading
print("\nTesting sync loading...")
laps_sync, time_sync = load_data_sync()
# Show comparison
print("\n" + "=" * 60)
print("RESULTS")
print("=" * 60)
speedup = time_sync / time_async if time_async > 0 else 0
print(f"Async time: {time_async:.2f}s")
print(f"Sync time: {time_sync:.2f}s")
print(f"Speedup: {speedup:.1f}x faster with async")
Typical output:
Async time: 0.82s
Sync time: 3.45s
Speedup: 4.2x faster with async
Loading Multiple Sessions
Load multiple sessions in parallel for maximum performance:
import asyncio
import time
import tif1
async def load_multiple_sessions():
"""Load multiple sessions in parallel."""
sessions_to_load = [
(2025, "Monaco Grand Prix", "Qualifying"),
(2025, "Monaco Grand Prix", "Race"),
(2025, "British Grand Prix", "Qualifying"),
]
async def load_one_session(year, event, session_type):
session = tif1.get_session(year, event, session_type)
laps = await session.laps_async()
return (event, session_type, len(laps))
# Load all sessions in parallel
start = time.time()
tasks = [
load_one_session(year, event, session_type)
for year, event, session_type in sessions_to_load
]
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
print(f"\nLoaded {len(results)} sessions in {elapsed:.2f}s:")
for event, session_type, lap_count in results:
print(f" {event} {session_type}: {lap_count} laps")
return results, elapsed
# Run
results, elapsed = asyncio.run(load_multiple_sessions())
Async Telemetry Loading
The fastest lap telemetry methods use async loading internally:
import tif1
import time
session = tif1.get_session(2025, "Monaco Grand Prix", "Race")
print("ASYNC TELEMETRY LOADING")
print("=" * 60)
# Single fastest lap telemetry (uses async internally)
start = time.time()
fastest_tel = session.get_fastest_lap_tel()
elapsed = time.time() - start
print(f"\n1. Single fastest lap: {elapsed:.2f}s")
print(f" Points: {len(fastest_tel)}")
# Multiple drivers' fastest lap telemetry (parallel!)
start = time.time()
all_fastest_tels = session.get_fastest_laps_tels(by_driver=True)
elapsed = time.time() - start
num_drivers = len(all_fastest_tels["Driver"].unique())
print(f"\n2. All drivers' fastest laps: {elapsed:.2f}s")
print(f" Drivers: {num_drivers}")
print(f" Average: {elapsed / num_drivers:.3f}s per driver")
print(f" Total points: {len(all_fastest_tels):,}")
# Specific drivers' telemetry (parallel)
start = time.time()
top3_tels = session.get_fastest_laps_tels(
by_driver=True,
drivers=["VER", "HAM", "LEC"]
)
elapsed = time.time() - start
print(f"\n3. Top 3 drivers: {elapsed:.2f}s")
print(f" Drivers: {list(top3_tels['Driver'].unique())}")
Async Context Manager
Use async context managers for cleaner code:
import asyncio
import tif1
class AsyncSessionLoader:
"""Async context manager for session loading."""
def __init__(self, year, event, session_type):
self.year = year
self.event = event
self.session_type = session_type
self.session = None
self.laps = None
async def __aenter__(self):
"""Load session on enter."""
self.session = tif1.get_session(
self.year,
self.event,
self.session_type
)
self.laps = await self.session.laps_async()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Cleanup on exit."""
# Cleanup if needed
pass
async def analyze_with_context():
"""Use async context manager."""
async with AsyncSessionLoader(
2025,
"Monaco Grand Prix",
"Race"
) as loader:
print(f"Loaded {len(loader.laps)} laps")
print(f"Drivers: {len(loader.session.drivers_df)}")
return loader.laps
laps = asyncio.run(analyze_with_context())
Async Error Handling
Handle errors in async context:
import asyncio
import tif1
async def safe_async_load(year, event, session_type):
"""Load session with error handling."""
try:
session = tif1.get_session(year, event, session_type)
laps = await session.laps_async()
print(f"✓ Loaded {len(laps)} laps")
return laps
except tif1.NetworkError as e:
print(f"✗ Network error: {e}")
return None
except tif1.DataNotFoundError as e:
print(f"✗ Data not found: {e}")
return None
except tif1.TIF1Error as e:
print(f"✗ Error: {e}")
return None
# Load multiple sessions with error handling
async def load_multiple_safe():
"""Load multiple sessions safely."""
sessions = [
(2025, "Monaco Grand Prix", "Race"),
(2025, "British Grand Prix", "Qualifying"),
(2025, "Invalid GP", "Race"), # This will fail
]
tasks = [
safe_async_load(year, event, session_type)
for year, event, session_type in sessions
]
results = await asyncio.gather(*tasks)
# Filter out None results
valid_results = [r for r in results if r is not None]
print(f"\nSuccessfully loaded {len(valid_results)}/{len(sessions)} sessions")
return valid_results
results = asyncio.run(load_multiple_safe())
Async with Timeout
Set timeouts for async operations:
import asyncio
import tif1
async def load_with_timeout(year, event, session_type, timeout=30):
"""Load session with timeout."""
try:
session = tif1.get_session(year, event, session_type)
laps = await asyncio.wait_for(
session.laps_async(),
timeout=timeout
)
print(f"✓ Loaded {len(laps)} laps within {timeout}s")
return laps
except asyncio.TimeoutError:
print(f"✗ Timeout: Loading took longer than {timeout}s")
return None
except tif1.TIF1Error as e:
print(f"✗ Error: {e}")
return None
laps = asyncio.run(load_with_timeout(
2025,
"Monaco Grand Prix",
"Race",
timeout=10
))
Concurrent Request Limits
Configure maximum concurrent requests:
# Set max concurrent requests via environment variable
export TIF1_MAX_CONCURRENT_REQUESTS=32
python your_script.py
Or via configuration:
{
"max_concurrent_requests": 32,
"telemetry_prefetch_max_concurrent_requests": 64
}
Best Practices
When to Use Async
- Cold cache: Maximum benefit when data isn’t cached
- Multiple sessions: Loading multiple sessions in parallel
- Large datasets: Sessions with many drivers/laps
- Batch operations: Processing multiple events at once
When Sync Is Fine
- Warm cache: Data already cached (minimal difference)
- Simple scripts: Single session, interactive use
- Learning/debugging: Easier to understand and debug
- Small datasets: Practice sessions with few laps
Example: Smart Loading
import asyncio
import tif1
def smart_load_session(year, event, session_type):
"""Choose between sync and async based on cache."""
cache = tif1.get_cache()
# Check if data is likely cached (simplified check)
# In production, you'd check specific cache keys
session = tif1.get_session(year, event, session_type)
# For now, always use async for cold cache benefit
# You can add cache checking logic here
try:
# Try to use async event loop if available
loop = asyncio.get_event_loop()
if loop.is_running():
# Already in async context
return session.laps
else:
# Use async loading
return asyncio.run(session.laps_async())
except RuntimeError:
# Fallback to sync if async not available
return session.laps
laps = smart_load_session(2025, "Monaco Grand Prix", "Race")
print(f"Loaded {len(laps)} laps")
Complete Async Example
import asyncio
import time
import tif1
async def comprehensive_async_analysis():
"""Comprehensive async loading example."""
print("COMPREHENSIVE ASYNC LOADING")
print("=" * 60)
# 1. Load single session
print("\n1. Single Session (Async)")
session = tif1.get_session(2025, "Monaco Grand Prix", "Race")
start = time.time()
laps = await session.laps_async()
elapsed = time.time() - start
print(f" Loaded {len(laps)} laps in {elapsed:.2f}s")
# 2. Load multiple sessions in parallel
print("\n2. Multiple Sessions (Parallel)")
async def load_session(year, event, session_type):
session = tif1.get_session(year, event, session_type)
laps = await session.laps_async()
return (event, session_type, len(laps))
sessions = [
(2025, "Monaco Grand Prix", "Qualifying"),
(2025, "British Grand Prix", "Qualifying"),
(2025, "Silverstone Grand Prix", "Race"),
]
start = time.time()
tasks = [load_session(*s) for s in sessions]
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
print(f" Loaded {len(results)} sessions in {elapsed:.2f}s:")
for event, session_type, lap_count in results:
print(f" • {event} {session_type}: {lap_count} laps")
# 3. Parallel telemetry loading
print("\n3. Parallel Telemetry Loading")
session = tif1.get_session(2025, "Monaco Grand Prix", "Race")
start = time.time()
all_fastest_tels = session.get_fastest_laps_tels(by_driver=True)
elapsed = time.time() - start
num_drivers = len(all_fastest_tels["Driver"].unique())
print(f" Loaded {num_drivers} drivers' telemetry in {elapsed:.2f}s")
print(f" Average: {elapsed / num_drivers:.3f}s per driver")
# Clear cache for demonstration
cache = tif1.get_cache()
cache.clear()
# Run the async analysis
asyncio.run(comprehensive_async_analysis())
Next Steps