
Overview

Transformers are custom data processing functions that transform entities from one schema to another before indexing. They enable:
  • Data enrichment: Add computed fields or metadata
  • Schema mapping: Convert between entity definitions
  • Filtering: Remove sensitive or irrelevant data
  • Aggregation: Combine multiple entities into summaries
Transformers run after data extraction but before chunking and embedding.

Transformer Model

Each transformer is defined in the database with:
name
string
required
Human-readable transformer name. Example: "Enrich Support Tickets"
description
string
Optional description of what the transformer does. Example: "Adds customer sentiment and priority scores to support tickets"
method_name
string
required
Python function name to invoke. Example: "enrich_support_ticket"
module_name
string
required
Python module path where the function is defined. Example: "airweave.platform.transformers.support"
input_entity_definition_ids
list[UUID]
required
List of entity definition IDs this transformer accepts as input. Example: ["abc-123-def"] (Support Ticket entity)
output_entity_definition_ids
list[UUID]
required
List of entity definition IDs this transformer produces as output. Example: ["xyz-789-ghi"] (Enriched Support Ticket entity)
config_schema
JSONSchema
required
JSON Schema defining configuration parameters for the transformer. Example:
{
  "type": "object",
  "properties": {
    "sentiment_threshold": {
      "type": "number",
      "minimum": 0,
      "maximum": 1,
      "default": 0.5
    },
    "priority_weights": {
      "type": "object",
      "properties": {
        "urgency": {"type": "number"},
        "impact": {"type": "number"}
      }
    }
  }
}
organization_id
UUID
Organization that owns this transformer (null for system transformers)
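
For orientation, the fields above can be mirrored in a small Python dataclass. This is an illustrative sketch, not Airweave's actual model class: the name TransformerSpec and the is_system helper are hypothetical, and the UUIDs are generated placeholders.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from uuid import UUID, uuid4


@dataclass
class TransformerSpec:
    """Hypothetical in-memory mirror of the transformer fields listed above."""

    name: str
    method_name: str
    module_name: str
    input_entity_definition_ids: List[UUID]
    output_entity_definition_ids: List[UUID]
    config_schema: Dict[str, Any]
    description: Optional[str] = None
    organization_id: Optional[UUID] = None  # None for system transformers

    @property
    def is_system(self) -> bool:
        # Per the field description, system transformers have no owning organization.
        return self.organization_id is None


spec = TransformerSpec(
    name="Enrich Support Tickets",
    method_name="enrich_support_ticket",
    module_name="airweave.platform.transformers.support",
    input_entity_definition_ids=[uuid4()],   # placeholder ID
    output_entity_definition_ids=[uuid4()],  # placeholder ID
    config_schema={"type": "object", "properties": {}},
)
```

Keeping the required fields positional and the optional ones defaulted to None matches the required markers in the field list.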

Database Schema

CREATE TABLE transformer (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR NOT NULL,
    description VARCHAR,
    method_name VARCHAR NOT NULL,
    module_name VARCHAR NOT NULL,
    input_entity_definition_ids JSON NOT NULL,   -- Array of UUIDs
    output_entity_definition_ids JSON NOT NULL,  -- Array of UUIDs
    config_schema JSON NOT NULL,                 -- JSON Schema
    organization_id UUID REFERENCES organization(id),
    created_at TIMESTAMP DEFAULT NOW(),
    modified_at TIMESTAMP DEFAULT NOW(),
    created_by_email VARCHAR,
    modified_by_email VARCHAR
);
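
The two ID columns are declared as JSON, and Python's json module does not serialize uuid.UUID objects directly. If you ever write these columns by hand, the IDs need to be converted to strings first. A minimal sketch of the round trip; the helper names are hypothetical and not part of Airweave.

```python
import json
from typing import List
from uuid import UUID


def ids_to_json(ids: List[UUID]) -> str:
    """Serialize UUIDs as a JSON array of strings (json.dumps rejects raw UUID objects)."""
    return json.dumps([str(i) for i in ids])


def ids_from_json(raw: str) -> List[UUID]:
    """Parse a JSON array of strings back into UUID objects."""
    return [UUID(value) for value in json.loads(raw)]


ids = [UUID("12345678-1234-5678-1234-567812345678")]
stored = ids_to_json(ids)         # text suitable for a JSON column
restored = ids_from_json(stored)  # back to UUID objects
```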

API Endpoints

Manage transformers via the REST API:

List Transformers

GET /api/v1/transformers
Response:
[
  {
    "id": "abc-123-def-456",
    "name": "Enrich Support Tickets",
    "description": "Adds sentiment and priority scores",
    "method_name": "enrich_support_ticket",
    "module_name": "airweave.platform.transformers.support",
    "input_entity_definition_ids": ["entity-def-1"],
    "output_entity_definition_ids": ["entity-def-2"],
    "config_schema": {...},
    "organization_id": "org-123",
    "created_by_email": "admin@example.com",
    "modified_by_email": "admin@example.com"
  }
]

Create Transformer

POST /api/v1/transformers
Content-Type: application/json

{
  "name": "Code Comment Extractor",
  "description": "Extracts docstrings and inline comments from code",
  "method_name": "extract_code_comments",
  "module_name": "airweave.platform.transformers.code",
  "input_entity_definition_ids": ["code-file-entity-id"],
  "output_entity_definition_ids": ["comment-entity-id"],
  "config_schema": {
    "type": "object",
    "properties": {
      "include_inline": {"type": "boolean", "default": true},
      "min_length": {"type": "integer", "default": 10}
    }
  }
}
Response: 201 Created with transformer object
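
From Python, the create call could be assembled with the standard library as sketched below. The base URL http://localhost:8001 is an assumption, and any authentication headers your deployment requires are omitted. The request is only built here, not sent.

```python
import json
import urllib.request
from typing import Any, Dict


def build_create_request(base_url: str, payload: Dict[str, Any]) -> urllib.request.Request:
    """Build (but do not send) a POST request for the create endpoint."""
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/v1/transformers",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


payload = {
    "name": "Code Comment Extractor",
    "description": "Extracts docstrings and inline comments from code",
    "method_name": "extract_code_comments",
    "module_name": "airweave.platform.transformers.code",
    "input_entity_definition_ids": ["code-file-entity-id"],
    "output_entity_definition_ids": ["comment-entity-id"],
    "config_schema": {"type": "object", "properties": {}},
}

# Assumed base URL; replace with your own deployment.
request = build_create_request("http://localhost:8001", payload)

# To send it, pass the request to urllib.request.urlopen(request);
# per the description above, a successful call responds with 201 Created.
```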

Update Transformer

PUT /api/v1/transformers/{transformer_id}
Content-Type: application/json

{
  "name": "Updated Transformer Name",
  "description": "Updated description",
  ...
}
Response: 200 OK with updated transformer object

Implementation Example

Create a transformer function:
# File: backend/airweave/platform/transformers/support.py

from typing import Any, Dict, List

async def enrich_support_ticket(
    entities: List[Dict[str, Any]],
    config: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """Enrich support tickets with sentiment and priority.
    
    Args:
        entities: List of support ticket entities from input definition
        config: Configuration from transformer config_schema
    
    Returns:
        List of enriched entities matching output definition
    """
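    # Read configuration once, outside the loop. Merge caller-supplied
    # weights over the defaults so a partial config cannot raise KeyError.
    sentiment_threshold = config.get("sentiment_threshold", 0.5)
    priority_weights = {
        "urgency": 0.6,
        "impact": 0.4,
        **config.get("priority_weights", {}),
    }

    enriched = []

    for ticket in entities:
        # Analyze sentiment (simplified example; analyze_sentiment is
        # assumed to be an async helper defined elsewhere)
        text = ticket.get("description", "")
        sentiment_score = await analyze_sentiment(text)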
        
        # Calculate priority
        urgency = ticket.get("urgency", 0)
        impact = ticket.get("impact", 0)
        priority = (
            urgency * priority_weights["urgency"] + 
            impact * priority_weights["impact"]
        )
        
        # Create enriched entity
        enriched_ticket = {
            **ticket,  # Original fields
            "sentiment_score": sentiment_score,
            "sentiment_label": "positive" if sentiment_score > sentiment_threshold else "negative",
            "calculated_priority": priority,
            "priority_label": "high" if priority > 0.7 else "medium" if priority > 0.4 else "low"
        }
        
        enriched.append(enriched_ticket)
    
    return enriched
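
The example calls analyze_sentiment, which this page does not define. For local experimentation, a toy stand-in could look like the sketch below. The word lists and the scoring rule are invented for illustration; a real implementation would call a sentiment model or service.

```python
import asyncio

# Invented word lists, purely for illustration.
POSITIVE_WORDS = {"great", "thanks", "love", "resolved"}
NEGATIVE_WORDS = {"broken", "angry", "refund", "terrible"}


async def analyze_sentiment(text: str) -> float:
    """Return a score in [0, 1]; 0.5 means neutral or no signal."""
    # Lowercase each word and strip common trailing punctuation.
    words = [w.strip(".,!?").lower() for w in text.split()]
    positive = sum(w in POSITIVE_WORDS for w in words)
    negative = sum(w in NEGATIVE_WORDS for w in words)
    total = positive + negative
    if total == 0:
        return 0.5
    # Share of sentiment-bearing words that are positive.
    return positive / total


score = asyncio.run(analyze_sentiment("Thanks, the fix was great!"))
```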

Transformer Function Signature

All transformer functions must follow this signature:
async def transformer_name(
    entities: List[Dict[str, Any]],
    config: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """Transformer docstring.
    
    Args:
        entities: Input entities matching input_entity_definition_ids
        config: Configuration validated against config_schema
    
    Returns:
        Output entities matching output_entity_definition_ids
    """
    pass
Requirements:
  • Must be async
  • Takes exactly 2 parameters: entities and config
  • Returns list of dictionaries (entities)
  • Can be in any module (specify via module_name)
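
As an illustration of the signature, here is a sketch of a filtering transformer that drops fields named in its config. The function name drop_fields and the fields_to_drop config key are hypothetical.

```python
import asyncio
from typing import Any, Dict, List


async def drop_fields(
    entities: List[Dict[str, Any]],
    config: Dict[str, Any],
) -> List[Dict[str, Any]]:
    """Return copies of the entities without the configured fields."""
    blocked = set(config.get("fields_to_drop", []))
    return [
        {key: value for key, value in entity.items() if key not in blocked}
        for entity in entities
    ]


tickets = [{"id": "1", "subject": "Login issue", "email": "user@example.com"}]
cleaned = asyncio.run(drop_fields(tickets, {"fields_to_drop": ["email"]}))
```

The function builds new dictionaries rather than mutating its input, so the original entities stay available to the caller if the transformer output is rejected later.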

Configuration Schema

Define transformer parameters using JSON Schema:
{
  "type": "object",
  "properties": {
    "enabled": {
      "type": "boolean",
      "default": true,
      "description": "Enable/disable this transformer"
    },
    "threshold": {
      "type": "number",
      "minimum": 0,
      "maximum": 1,
      "default": 0.5
    }
  },
  "required": ["threshold"]
}
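
Note that default in JSON Schema is an annotation: validators such as the jsonschema package check a config against the schema but do not fill in missing defaults. If a transformer should see the defaults, they have to be applied explicitly (or read with config.get, as in the examples on this page). A sketch that handles top-level properties only; apply_defaults is a hypothetical helper.

```python
from typing import Any, Dict, List, Tuple

SCHEMA: Dict[str, Any] = {
    "type": "object",
    "properties": {
        "enabled": {"type": "boolean", "default": True},
        "threshold": {"type": "number", "minimum": 0, "maximum": 1, "default": 0.5},
    },
    "required": ["threshold"],
}


def apply_defaults(
    schema: Dict[str, Any], config: Dict[str, Any]
) -> Tuple[Dict[str, Any], List[str]]:
    """Return (config with top-level defaults filled in, required keys the caller omitted)."""
    merged = dict(config)
    for name, prop in schema.get("properties", {}).items():
        if name not in merged and "default" in prop:
            merged[name] = prop["default"]
    # Report required keys missing from the ORIGINAL config, before defaults.
    missing = [name for name in schema.get("required", []) if name not in config]
    return merged, missing


merged, missing = apply_defaults(SCHEMA, {})
```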

Execution Pipeline

Transformers are executed during the sync pipeline:
1. Entity Extraction

Source connector extracts raw entities from API

2. Transformer Lookup

System looks up transformers configured for this entity definition

3. Execution

Transformers execute in configured order:
for transformer in transformers:
    entities = await invoke_transformer(
        transformer.module_name,
        transformer.method_name,
        entities,
        transformer_config
    )

4. Schema Validation

Output entities validated against output entity definition schema

5. Continue Pipeline

Transformed entities proceed to chunking → embedding → indexing
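
The loop in the Execution step calls invoke_transformer, whose implementation is not shown on this page. One plausible sketch, assuming it resolves the function dynamically from module_name and method_name with importlib; the real implementation may differ. The demo_transformers module registered in sys.modules is a stand-in so the snippet runs on its own.

```python
import asyncio
import importlib
import sys
import types
from typing import Any, Dict, List

Entities = List[Dict[str, Any]]


async def invoke_transformer(
    module_name: str,
    method_name: str,
    entities: Entities,
    config: Dict[str, Any],
) -> Entities:
    """Import the module, look up the function by name, and await it."""
    module = importlib.import_module(module_name)
    func = getattr(module, method_name)
    return await func(entities, config)


# --- Demo setup: a throwaway module holding one transformer ---
async def add_flag(entities: Entities, config: Dict[str, Any]) -> Entities:
    return [{**entity, "flagged": config.get("flag", False)} for entity in entities]


demo = types.ModuleType("demo_transformers")
demo.add_flag = add_flag
sys.modules["demo_transformers"] = demo  # makes import_module find it

result = asyncio.run(
    invoke_transformer("demo_transformers", "add_flag", [{"id": "1"}], {"flag": True})
)
```

Resolving by name at call time is what lets a transformer live in any module, at the cost of import errors surfacing only when the sync runs.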

Best Practices

Each transformer should do one thing well:
Good:
  • extract_code_comments - Single purpose
  • calculate_priority - Specific calculation
  • redact_pii - Clear responsibility
Avoid:
  • process_everything - Too broad
  • enrich_and_filter_and_map - Multiple concerns
Make transformers configurable via config_schema:
# Instead of hardcoding
if sentiment_score > 0.5:  # ❌ Hardcoded
    ...

# Use config
threshold = config.get("sentiment_threshold", 0.5)  # ✅ Configurable
if sentiment_score > threshold:
    ...
Don’t let a single entity failure break the entire batch:
results = []

for entity in entities:
    try:
        transformed = await transform_entity(entity, config)
        results.append(transformed)
    except Exception as e:
        logger.error(f"Failed to transform entity {entity.get('id')}: {e}")
        # Option 1: Skip entity
        continue
        # Option 2: Return original entity
        # results.append(entity)

return results
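
When the per-entity work is I/O-bound, the same isolation can be had concurrently: asyncio.gather with return_exceptions=True hands back exceptions as results instead of raising on the first failure. A sketch under that assumption; transform_entity here is a toy stand-in for your own per-entity logic.

```python
import asyncio
import logging
from typing import Any, Dict, List

logger = logging.getLogger(__name__)


async def transform_entity(entity: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
    """Toy per-entity transform: fails when the 'text' field is missing."""
    if "text" not in entity:
        raise ValueError("missing 'text'")
    return {**entity, "length": len(entity["text"])}


async def transform_all(
    entities: List[Dict[str, Any]], config: Dict[str, Any]
) -> List[Dict[str, Any]]:
    # Run all transforms concurrently; failures come back as exception objects.
    outcomes = await asyncio.gather(
        *(transform_entity(entity, config) for entity in entities),
        return_exceptions=True,
    )
    results = []
    for entity, outcome in zip(entities, outcomes):
        if isinstance(outcome, Exception):
            logger.error("Failed to transform entity %s: %s", entity.get("id"), outcome)
            continue  # skip the failed entity, keep the rest
        results.append(outcome)
    return results


results = asyncio.run(transform_all([{"id": "1", "text": "hello"}, {"id": "2"}], {}))
```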
Clearly document expected entity structure:
async def my_transformer(
    entities: List[Dict[str, Any]],
    config: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """Transform support tickets.
    
    Input schema (from Zendesk):
    {
        "id": "123",
        "subject": "...",
        "description": "...",
        "priority": "high"|"medium"|"low",
        "status": "open"|"closed"
    }
    
    Output schema (enriched):
    {
        ... (all input fields) ...
        "sentiment_score": 0.0-1.0,
        "urgency_level": 1-5,
        "estimated_resolution_time": "2h"|"1d"|"1w"
    }
    
    Config schema:
    {
        "sentiment_model": "basic"|"advanced",
        "urgency_weights": {"priority": 0.6, "age": 0.4}
    }
    """
Process entities in batches when possible:
# ❌ Inefficient: One API call per entity
for entity in entities:
    sentiment = await api.analyze(entity["text"])

# ✅ Efficient: Batch API call
texts = [e["text"] for e in entities]
sentiments = await api.analyze_batch(texts)

for entity, sentiment in zip(entities, sentiments):
    entity["sentiment"] = sentiment
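
If the batch API limits how many items it accepts per call (an assumption; check your provider), the entity list can be split into fixed-size chunks first. In this sketch, analyze_batch is a local stand-in for the api.analyze_batch call above, and the batch size of 2 is arbitrary.

```python
import asyncio
from typing import Any, Dict, Iterator, List


def chunked(items: List[Any], size: int) -> Iterator[List[Any]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


async def analyze_batch(texts: List[str]) -> List[float]:
    # Stand-in scorer: text length divided by 10, capped at 1.0.
    return [min(len(text) / 10, 1.0) for text in texts]


async def add_sentiment(
    entities: List[Dict[str, Any]], batch_size: int = 2
) -> List[Dict[str, Any]]:
    enriched: List[Dict[str, Any]] = []
    for batch in chunked(entities, batch_size):
        # One call per batch instead of one call per entity.
        scores = await analyze_batch([entity["text"] for entity in batch])
        enriched.extend(
            {**entity, "sentiment": score} for entity, score in zip(batch, scores)
        )
    return enriched


entities = [{"text": "ok"}, {"text": "hello"}, {"text": "a much longer text"}]
enriched = asyncio.run(add_sentiment(entities))
```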

Use Cases

Add computed or external data:
Examples:
  • Sentiment analysis on customer feedback
  • Geocoding addresses to lat/lng
  • Fetching stock prices for company mentions
  • Calculating metrics from raw data
  • Adding taxonomy/category labels

Troubleshooting

Check:
  1. Transformer registered in database
  2. input_entity_definition_ids matches source entities
  3. module_name and method_name are correct
  4. Function signature matches protocol
Debug:
# Add logging to transformer
logger.info(f"Transformer {transformer.name} executing on {len(entities)} entities")
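
Check 4 can be automated with the standard inspect module. This is a sketch: signature_problems is a hypothetical helper, and it tests only the requirements listed under Transformer Function Signature (async, exactly two parameters named entities and config).

```python
import inspect
from typing import Callable, List


def signature_problems(func: Callable) -> List[str]:
    """Return a list of human-readable problems; an empty list means the signature looks fine."""
    problems = []
    if not inspect.iscoroutinefunction(func):
        problems.append("function is not async")
    params = list(inspect.signature(func).parameters)
    if params != ["entities", "config"]:
        problems.append(
            f"expected parameters (entities, config), got ({', '.join(params)})"
        )
    return problems


async def good(entities, config):
    return entities


def bad(entities):
    return entities


good_report = signature_problems(good)
bad_report = signature_problems(bad)
```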
Symptom:
ModuleNotFoundError: No module named 'airweave.platform.transformers.custom'
Solution: Ensure module exists at specified path:
backend/airweave/platform/transformers/custom.py
Symptom: Transformer fails with config errors
Solution: Validate config against schema before saving:
from jsonschema import validate, ValidationError

try:
    validate(instance=config, schema=transformer.config_schema)
except ValidationError as e:
    print(f"Config invalid: {e.message}")
Symptom: Transformed entities fail validation
Solution: Ensure output matches output_entity_definition_ids schema:
# Check required fields are present
required_fields = ["id", "title", "content"]
for entity in output_entities:
    for field in required_fields:
        if field not in entity:
            raise ValueError(f"Missing required field: {field}")

Next Steps

Entity Definitions

Define input and output schemas

Chunking

Configure chunking after transformation

Embeddings

Set up embeddings for transformed entities

API Reference

Complete API documentation
