AsyncTrustifai wraps the synchronous Trustifai engine in a thread-safe async interface. Each worker thread gets its own Trustifai instance via threading.local(), so concurrent evaluations never race on shared state. The evaluate_dataset function orchestrates concurrency, rate limiting, retries, and result ordering — letting you focus on your data rather than async plumbing.
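If you're curious what that thread-local pattern looks like, here is a minimal illustrative sketch; the class name, attribute names, and the synchronous `Trustifai` import are assumptions for illustration, not the library's actual code:

```python
import asyncio
import threading

from trustifai import Trustifai  # assumed import path for the synchronous engine


class AsyncEngineSketch:
    """Illustrative only: the thread-local pattern described above, not Trustifai's source."""

    def __init__(self, config_path: str):
        self._config_path = config_path
        self._local = threading.local()  # each worker thread gets its own slot

    def _engine(self):
        # Lazily create one synchronous engine per thread, so threads never share state.
        if not hasattr(self._local, "engine"):
            self._local.engine = Trustifai(self._config_path)
        return self._local.engine

    async def get_trust_score(self, context):
        # Run the blocking evaluation in a worker thread from asyncio's default pool.
        return await asyncio.to_thread(lambda: self._engine().get_trust_score(context))
```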
Create one engine instance and share it across all evaluations in a session:
```python
from trustifai.async_pipeline import AsyncTrustifai

engine = AsyncTrustifai("config_file.yaml")
```
2. Prepare MetricContext objects
Each row in your dataset becomes a MetricContext. Documents can be plain strings, LangChain Document objects, LlamaIndex NodeWithScore objects, or dicts — see Integrations for details.
```python
from trustifai import MetricContext
from langchain_core.documents import Document


def build_contexts(dataset: list[dict]) -> list[MetricContext]:
    return [
        MetricContext(
            query=row["query"],
            answer=row["answer"],
            documents=[
                Document(page_content=text, metadata={"source": f"doc_{i}"})
                for i, text in enumerate(row["docs"])
            ],
        )
        for row in dataset
    ]
```
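If you are not using LangChain, the same step works with plain strings (one of the supported document types listed above). A minimal variant, with the helper name `build_contexts_plain` chosen for illustration:

```python
from trustifai import MetricContext


def build_contexts_plain(dataset: list[dict]) -> list[MetricContext]:
    return [
        MetricContext(
            query=row["query"],
            answer=row["answer"],
            documents=list(row["docs"]),  # plain strings, no wrapper objects
        )
        for row in dataset
    ]
```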
3. Run evaluate_dataset
Call evaluate_dataset inside an async context (a script’s asyncio.run, a FastAPI route, or a Jupyter cell):
```python
import asyncio

from trustifai.async_pipeline import evaluate_dataset


async def run_batch():
    contexts = build_contexts(RAW_DATASET)
    batch = await evaluate_dataset(
        engine,
        contexts,
        concurrency=5,        # max simultaneous LLM calls
        show_progress=True,   # tqdm bar in terminal or Jupyter
    )
    print(batch.summary())
    return batch


asyncio.run(run_batch())
```
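Because evaluate_dataset is a coroutine, it can also be awaited from an already-running event loop such as a FastAPI route. A minimal sketch, assuming the `engine` and `build_contexts` defined above; the route path and request model are illustrative, not part of Trustifai:

```python
from fastapi import FastAPI
from pydantic import BaseModel

from trustifai.async_pipeline import evaluate_dataset

app = FastAPI()


class EvalRequest(BaseModel):
    rows: list[dict]  # same shape as the RAW_DATASET entries below


@app.post("/evaluate")  # illustrative route; adapt to your API
async def evaluate(req: EvalRequest):
    contexts = build_contexts(req.rows)
    # Await directly: FastAPI already runs an event loop, so no asyncio.run here.
    batch = await evaluate_dataset(engine, contexts, concurrency=5)
    return {"summary": batch.summary(), "failed": len(batch.failed)}
```

In a Jupyter cell, `await run_batch()` directly instead of calling asyncio.run, since the notebook already has a running event loop.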
The following script reproduces the full example from examples/evaluation_script.py:
```python
import asyncio

from langchain_core.documents import Document

from trustifai import MetricContext
from trustifai.async_pipeline import AsyncTrustifai, evaluate_dataset

# Shared engine - one instance, thread-safe across all workers
engine = AsyncTrustifai("config_file.yaml")

# Example dataset - replace with your actual data source (CSV, DB, etc.)
RAW_DATASET = [
    {
        "query": "What is the capital of France?",
        "answer": "The capital of France is Paris.",
        "docs": ["Paris is the capital and most populous city of France."],
    },
    {
        "query": "Who wrote Romeo and Juliet?",
        "answer": "Romeo and Juliet was written by William Shakespeare.",
        "docs": ["Romeo and Juliet is a tragedy written by William Shakespeare."],
    },
    {
        "query": "What is photosynthesis?",
        "answer": "Photosynthesis converts sunlight into glucose using CO2 and water.",
        "docs": [
            "Photosynthesis is a process used by plants to convert light energy into chemical energy.",
            "The process uses carbon dioxide and water, releasing oxygen as a by-product.",
        ],
    },
]


def build_contexts(dataset: list[dict]) -> list[MetricContext]:
    return [
        MetricContext(
            query=row["query"],
            answer=row["answer"],
            documents=[
                Document(page_content=text, metadata={"source": f"doc_{i}"})
                for i, text in enumerate(row["docs"])
            ],
        )
        for row in dataset
    ]


# Single async evaluation
async def run_single():
    context = build_contexts(RAW_DATASET)[0]
    result = await engine.get_trust_score(context)
    print(f"\n[Single] Trust Score: {result['score']}  Label: {result['label']}")
    return result


# Full batch evaluation
async def run_batch():
    contexts = build_contexts(RAW_DATASET)
    batch = await evaluate_dataset(
        engine,
        contexts,
        concurrency=5,
        show_progress=True,
    )
    print("\n" + batch.summary())

    # Access individual results in original dataset order
    for i, result in enumerate(batch.results):
        q = RAW_DATASET[i]["query"]
        print(f"  [{i}] Q: {q!r} → score={result['score']} label={result['label']}")

    # Handle any failures gracefully
    if batch.failed:
        print(f"\n  {len(batch.failed)} evaluation(s) failed:")
        for f in batch.failed:
            print(f"    index={f['index']} error={f['error']}")

    return batch


if __name__ == "__main__":
    asyncio.run(run_single())
    asyncio.run(run_batch())
```
evaluate_dataset uses a token-bucket RateLimiter that proactively spaces requests before they reach the API, so 429 rate-limit errors are avoided rather than retried after the fact. The semaphore caps how many evaluations run simultaneously; the rate limiter caps how fast they start.
```python
batch = await evaluate_dataset(
    engine,
    contexts,
    concurrency=1,            # one at a time
    requests_per_minute=10,   # stay within free-tier quota
)
```
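For intuition about how the two controls divide the work, here is a stripped-down, even-spacing illustration of a pacing limiter paired with a semaphore. It is not Trustifai's implementation (a real token bucket also allows short bursts); `engine.get_trust_score` is the same call shown in the full script above:

```python
import asyncio
import time


class RequestPacerSketch:
    """Illustrative pacer: allows at most `rpm` starts per minute, spaced evenly."""

    def __init__(self, rpm: int):
        self._interval = 60.0 / rpm   # minimum seconds between request starts
        self._next_start = 0.0
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = time.monotonic()
            wait = max(0.0, self._next_start - now)
            self._next_start = max(now, self._next_start) + self._interval
        if wait:
            await asyncio.sleep(wait)  # space out starts instead of bursting into 429s


async def evaluate_all_sketch(engine, contexts, concurrency=5, rpm=60):
    pacer = RequestPacerSketch(rpm)
    semaphore = asyncio.Semaphore(concurrency)  # caps how many run at once

    async def one(ctx):
        async with semaphore:       # concurrency cap
            await pacer.acquire()   # pacing cap
            return await engine.get_trust_score(ctx)

    # gather preserves input order, which is how result ordering stays stable
    return await asyncio.gather(*(one(c) for c in contexts), return_exceptions=True)
```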
Set requests_per_minute to roughly 80% of your actual API quota to leave headroom for retries. For free-tier Gemini or Mistral keys, 8–12 RPM is a safe starting point. OpenAI Tier-1 keys typically allow 500 RPM, so 400 is a good ceiling.
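A quick way to apply the 80% rule, using the Tier-1 numbers above as the example:

```python
# Leave ~20% headroom for retries: OpenAI Tier-1 at 500 RPM -> 400
api_quota_rpm = 500
safe_rpm = int(api_quota_rpm * 0.8)  # 400

batch = await evaluate_dataset(engine, contexts, requests_per_minute=safe_rpm)
```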
Failures are isolated — a single failed evaluation never aborts the batch (unless fail_fast=True). Inspect .failed after the batch completes:
```python
batch = await evaluate_dataset(engine, contexts)

if batch.failed:
    for failure in batch.failed:
        print(f"Row {failure['index']} failed: {failure['error']}")
        # Optionally re-queue failure['context'] for a second pass
```
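Since each failure record carries the original context (the `failure['context']` field referenced in the comment above), a second pass is just another call to evaluate_dataset. A minimal sketch, with the gentler limits chosen purely for illustration:

```python
# Retry pass: re-run only the rows that failed the first time.
retry_contexts = [failure["context"] for failure in batch.failed]

if retry_contexts:
    retry_batch = await evaluate_dataset(
        engine,
        retry_contexts,
        concurrency=1,             # go gently on the second pass
        requests_per_minute=10,
    )
    print(retry_batch.summary())
```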