AsyncTrustifai wraps the synchronous Trustifai engine in a thread-safe async interface. Each worker thread gets its own Trustifai instance via threading.local(), so concurrent evaluations never race on shared state. The evaluate_dataset function orchestrates concurrency, rate limiting, retries, and result ordering — letting you focus on your data rather than async plumbing.
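
To build intuition for that design, here is a minimal sketch of the thread-local pattern described above (an illustration only, not TrustifAI's actual source; the synchronous Trustifai class and its import path are assumptions):

import asyncio
import threading

from trustifai import Trustifai  # assumed import path for the sync engine

class ThreadLocalAsyncEngine:
    """Sketch: each worker thread lazily builds its own sync engine."""

    def __init__(self, config_path: str):
        self._config_path = config_path
        self._local = threading.local()  # per-thread storage

    def _engine(self):
        # Called inside a worker thread; builds that thread's engine once.
        if not hasattr(self._local, "engine"):
            self._local.engine = Trustifai(self._config_path)
        return self._local.engine

    def _score(self, context):
        # Runs in the worker thread, using that thread's private engine.
        return self._engine().get_trust_score(context)

    async def get_trust_score(self, context):
        # Push the blocking sync call onto the default thread pool.
        return await asyncio.to_thread(self._score, context)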

Installation

evaluate_dataset uses tqdm for progress reporting. Install it alongside TrustifAI if you want the progress bar:
pip install trustifai tqdm

Basic usage

1. Build an AsyncTrustifai engine

Create one engine instance and share it across all evaluations in a session:
from trustifai.async_pipeline import AsyncTrustifai

engine = AsyncTrustifai("config_file.yaml")

2. Prepare MetricContext objects

Each row in your dataset becomes a MetricContext. Documents can be plain strings, LangChain Document objects, LlamaIndex NodeWithScore objects, or dicts — see Integrations for details.
from trustifai import MetricContext
from langchain_core.documents import Document

def build_contexts(dataset: list[dict]) -> list[MetricContext]:
    return [
        MetricContext(
            query=row["query"],
            answer=row["answer"],
            documents=[
                Document(page_content=text, metadata={"source": f"doc_{i}"})
                for i, text in enumerate(row["docs"])
            ],
        )
        for row in dataset
    ]
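
Plain strings also work when you don't need per-document metadata (see the supported document types noted above); a minimal variant:

def build_contexts_plain(dataset: list[dict]) -> list[MetricContext]:
    # Same as build_contexts, but passes the raw strings straight through.
    return [
        MetricContext(
            query=row["query"],
            answer=row["answer"],
            documents=row["docs"],  # plain list[str]
        )
        for row in dataset
    ]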

3. Run evaluate_dataset

Call evaluate_dataset inside an async context (a script’s asyncio.run, a FastAPI route, or a Jupyter cell):
import asyncio
from trustifai.async_pipeline import evaluate_dataset

async def run_batch():
    contexts = build_contexts(RAW_DATASET)

    batch = await evaluate_dataset(
        engine,
        contexts,
        concurrency=5,       # max simultaneous LLM calls
        show_progress=True,  # tqdm bar in terminal or Jupyter
    )

    print(batch.summary())
    return batch

asyncio.run(run_batch())
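
The same coroutine can be awaited from a web framework. Below is a minimal FastAPI sketch; the /evaluate route and EvalRequest model are illustrative, not part of TrustifAI:

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class EvalRequest(BaseModel):
    rows: list[dict]  # same {"query", "answer", "docs"} shape as above

@app.post("/evaluate")
async def evaluate(req: EvalRequest):
    contexts = build_contexts(req.rows)  # build_contexts from step 2
    batch = await evaluate_dataset(engine, contexts, concurrency=5, show_progress=False)
    return {
        "mean_score": batch.mean_score,
        "succeeded": batch.succeeded,
        "failed": len(batch.failed),
    }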

Complete example

The following script reproduces the full example from examples/evaluation_script.py:
import asyncio
from langchain_core.documents import Document

from trustifai.async_pipeline import AsyncTrustifai, evaluate_dataset
from trustifai import MetricContext

# Shared engine — one instance, thread-safe across all workers
engine = AsyncTrustifai("config_file.yaml")

# Example dataset — replace with your actual data source (CSV, DB, etc.)
RAW_DATASET = [
    {
        "query": "What is the capital of France?",
        "answer": "The capital of France is Paris.",
        "docs": ["Paris is the capital and most populous city of France."],
    },
    {
        "query": "Who wrote Romeo and Juliet?",
        "answer": "Romeo and Juliet was written by William Shakespeare.",
        "docs": ["Romeo and Juliet is a tragedy written by William Shakespeare."],
    },
    {
        "query": "What is photosynthesis?",
        "answer": "Photosynthesis converts sunlight into glucose using CO2 and water.",
        "docs": [
            "Photosynthesis is a process used by plants to convert light energy into chemical energy.",
            "The process uses carbon dioxide and water, releasing oxygen as a by-product.",
        ],
    },
]


def build_contexts(dataset: list[dict]) -> list[MetricContext]:
    return [
        MetricContext(
            query=row["query"],
            answer=row["answer"],
            documents=[
                Document(page_content=text, metadata={"source": f"doc_{i}"})
                for i, text in enumerate(row["docs"])
            ],
        )
        for row in dataset
    ]


# Single async evaluation
async def run_single():
    context = build_contexts(RAW_DATASET)[0]
    result = await engine.get_trust_score(context)
    print(f"\n[Single] Trust Score: {result['score']}  Label: {result['label']}")
    return result


# Full batch evaluation
async def run_batch():
    contexts = build_contexts(RAW_DATASET)

    batch = await evaluate_dataset(
        engine,
        contexts,
        concurrency=5,
        show_progress=True,
    )

    print("\n" + batch.summary())

    # Access individual results in original dataset order
    for i, result in enumerate(batch.results):
        q = RAW_DATASET[i]["query"]
        print(f"  [{i}] Q: {q!r}  →  score={result['score']}  label={result['label']}")

    # Handle any failures gracefully
    if batch.failed:
        print(f"\n  {len(batch.failed)} evaluation(s) failed:")
        for f in batch.failed:
            print(f"    index={f['index']}  error={f['error']}")

    return batch


if __name__ == "__main__":
    asyncio.run(run_single())
    asyncio.run(run_batch())

evaluate_dataset parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| engine | AsyncTrustifai | required | The async engine instance |
| contexts | list[MetricContext] | required | One MetricContext per dataset row |
| concurrency | int | 5 | Maximum simultaneous evaluations, controlled by an asyncio Semaphore |
| show_progress | bool | True | Display a tqdm progress bar |
| fail_fast | bool | False | Cancel remaining tasks on the first error |
| requests_per_minute | float \| None | None | RPM cap enforced by a token-bucket RateLimiter; None disables rate limiting |
| rate_limit_burst | int | 1 | Max tokens that can accumulate while idle; keep at 1 for free-tier keys |
| max_retries | int | 5 | Retry attempts on a 429 / rate-limit error |
| retry_base_delay | float | 2.0 | Initial backoff in seconds (doubles each attempt) |
| retry_max_delay | float | 120.0 | Maximum backoff delay in seconds |
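
For example, a conservative configuration for a heavily rate-limited key might combine several of these knobs:

batch = await evaluate_dataset(
    engine,
    contexts,
    concurrency=2,            # a couple of evaluations in flight
    requests_per_minute=10,   # token-bucket RPM cap
    rate_limit_burst=1,       # no idle token accumulation
    max_retries=5,
    retry_base_delay=2.0,
    retry_max_delay=120.0,
    fail_fast=False,          # keep going past individual failures
)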

Rate limiting

evaluate_dataset uses a token-bucket RateLimiter that proactively spaces requests before they reach the API, so most 429 errors never happen in the first place. The semaphore caps how many evaluations run simultaneously; the rate limiter caps how fast they start.
batch = await evaluate_dataset(
    engine,
    contexts,
    concurrency=1,           # one at a time
    requests_per_minute=10,  # stay within free-tier quota
)
Set requests_per_minute to roughly 80% of your actual API quota to leave headroom for retries. For free-tier Gemini or Mistral keys, 8–12 RPM is a safe starting point. OpenAI Tier-1 keys typically allow 500 RPM, so 400 is a good ceiling.
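
For intuition, the token-bucket mechanism can be sketched in a few lines (an illustration of the idea, not TrustifAI's internal RateLimiter):

import asyncio
import time

class TokenBucket:
    """Sketch: refills at rpm/60 tokens per second, capped at `burst`."""

    def __init__(self, rpm: float, burst: int = 1):
        self.rate = rpm / 60.0
        self.burst = burst
        self.tokens = float(burst)
        self.updated = time.monotonic()

    async def acquire(self):
        while True:
            now = time.monotonic()
            self.tokens = min(self.burst, self.tokens + (now - self.updated) * self.rate)
            self.updated = now
            if self.tokens >= 1:
                self.tokens -= 1  # spend one token per request
                return
            await asyncio.sleep((1 - self.tokens) / self.rate)  # wait for refill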

Retry backoff schedule

When a rate-limit error is detected, evaluate_dataset retries with exponential backoff plus ±25% jitter to prevent thundering-herd effects:
| Attempt | Wait (no jitter) |
| --- | --- |
| 1 → fail | 2 s |
| 2 → fail | 4 s |
| 3 → fail | 8 s |
| 4 → fail | 16 s |
| 5 → fail | raise |
Non-rate-limit errors (authentication failures, malformed requests) are re-raised immediately and are not retried.
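
The waits above follow delay = retry_base_delay * 2 ** attempt, capped at retry_max_delay, with the ±25% jitter applied on top; a sketch of that schedule:

import random

def backoff_delay(attempt: int, base: float = 2.0, cap: float = 120.0) -> float:
    # attempt is 0-indexed: attempt 0 waits ~2 s, attempt 1 ~4 s, and so on.
    delay = min(base * (2 ** attempt), cap)
    return delay * random.uniform(0.75, 1.25)  # +/- 25% jitter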

Working with BatchResult

evaluate_dataset returns a BatchResult dataclass with the following attributes and properties:
| Attribute / property | Description |
| --- | --- |
| .results | List of successful trust score dicts, sorted in original dataset order |
| .failed | List of {index, context, error} dicts for failed evaluations |
| .total | Number of input contexts |
| .succeeded | Number that completed without error |
| .elapsed_seconds | Wall-clock time for the full batch run |
| .mean_score | Mean trust score across all successful results |
| .score_distribution | {min, median, max} of trust scores |
| .label_distribution | Count of each label (RELIABLE, ACCEPTABLE (WITH CAUTION), UNRELIABLE) |
| .failure_rate | Fraction of evaluations that failed |
| .summary() | Formatted multi-line summary string |
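
For example, the aggregate fields can be read directly once a batch completes:

print(f"{batch.succeeded}/{batch.total} succeeded in {batch.elapsed_seconds:.1f}s")
print(f"Failure rate: {batch.failure_rate:.1%}")
print(f"Mean trust score: {batch.mean_score}")
print(f"Score spread: {batch.score_distribution}")
print(f"Labels: {batch.label_distribution}")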

Handling failures

Failures are isolated — a single failed evaluation never aborts the batch (unless fail_fast=True). Inspect .failed after the batch completes:
batch = await evaluate_dataset(engine, contexts)

if batch.failed:
    for failure in batch.failed:
        print(f"Row {failure['index']} failed: {failure['error']}")
        # Optionally re-queue failure['context'] for a second pass
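
One way to implement that second pass is to resubmit the failed contexts as a smaller follow-up batch (a sketch; the gentler retry settings are an assumption):

retry_contexts = [failure["context"] for failure in batch.failed]

retry_batch = await evaluate_dataset(
    engine,
    retry_contexts,
    concurrency=1,           # go slower on the retry pass
    requests_per_minute=10,
)

all_results = batch.results + retry_batch.results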

Pandas integration

batch.results is a plain list of dicts, making it straightforward to load into a DataFrame for further analysis:
import pandas as pd

batch = await evaluate_dataset(engine, contexts, concurrency=5)

df = pd.DataFrame(batch.results)
print(df[["score", "label"]].describe())

# Distribution of trust labels
print(df["label"].value_counts())

# Filter unreliable responses for manual review
unreliable = df[df["label"] == "UNRELIABLE"]
print(f"{len(unreliable)} responses flagged for review")

Jupyter usage

Jupyter already runs an event loop, so asyncio.run() raises a RuntimeError. Patch the running loop with nest_asyncio (top-level await, which the cells below use, also works natively in IPython):
pip install nest_asyncio
import nest_asyncio
nest_asyncio.apply()  # call once per kernel session

engine = AsyncTrustifai("config_file.yaml")
contexts = build_contexts(RAW_DATASET)

# Single evaluation
result = await engine.get_trust_score(contexts[0])

# Batch
batch = await evaluate_dataset(engine, contexts, concurrency=5)
print(batch.summary())

Configuration

Set concurrency defaults and LLM credentials in config_file.yaml.

Integrations

Feed LangChain, LlamaIndex, or plain string documents into MetricContext.
