Headroom’s compression subsystem is built from composable transform classes. When you use HeadroomClient or compress(), these transforms run automatically. You can also call them directly when you need fine-grained control — to benchmark individual transforms, build custom pipelines, or integrate into frameworks that already manage their own request loop.
from headroom import SmartCrusher, CacheAligner, TransformPipeline

# Compose a minimal pipeline: CacheAligner → SmartCrusher
pipeline = TransformPipeline(
    transforms=[CacheAligner(), SmartCrusher()]
)

# messages is a list of {"role": ..., "content": ...} dicts defined elsewhere
result = pipeline.apply(messages=messages, model="gpt-4o", model_limit=128000)
print(f"Tokens: {result.tokens_before} → {result.tokens_after}")

SmartCrusher

SmartCrusher is Headroom’s primary compressor for JSON and array tool outputs. It uses statistical analysis to identify which items are important — errors, anomalies, query-relevant entries, and boundary items — and drops everything else. The output preserves the original JSON schema exactly; no wrapper objects or generated text are inserted.

What SmartCrusher Preserves

| Category | Guarantee | Mechanism |
|---|---|---|
| Error items | 100% kept | status: error, level: error, exception objects |
| First N items | 100% kept | Configurable first_fraction of max_items_after_crush |
| Last N items | 100% kept | Configurable last_fraction of max_items_after_crush |
| Anomalies | 100% kept | Numeric values > variance_threshold std devs from mean |
| Relevant items | Top K kept | BM25 / embedding / hybrid scoring against user query |
| Change points | Kept | Significant data transitions (5-item window) |

A 1,000-item array typically compresses to ~15–50 items (a 90%+ reduction) while preserving all the information the LLM needs.
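
The guarantees in the table can be illustrated with a small pure-Python sketch. This is not Headroom's implementation (the real compressor is a Rust extension); the function name `sketch_crush`, the `value_key` parameter, the rounding of the first/last counts, and the threshold of 2.0 are assumptions made for illustration. It covers only error items, boundary items, and numeric anomalies, and leaves out relevance scoring and change points.

```python
import statistics


def sketch_crush(items, max_items=20, first_fraction=0.3, last_fraction=0.15,
                 variance_threshold=2.0, value_key="value"):
    """Return the items a SmartCrusher-like policy might keep, in original order."""
    n = len(items)
    if n <= max_items:
        return list(items)

    keep = set()

    # Boundary items: a fraction of the item budget taken from each end.
    first_n = max(1, round(max_items * first_fraction))
    last_n = max(1, round(max_items * last_fraction))
    keep.update(range(first_n))
    keep.update(range(n - last_n, n))

    # Error items are always kept.
    for i, item in enumerate(items):
        if item.get("status") == "error" or item.get("level") == "error":
            keep.add(i)

    # Numeric anomalies: more than variance_threshold std devs from the mean.
    values = [item[value_key] for item in items
              if isinstance(item.get(value_key), (int, float))]
    if len(values) >= 2:
        mean = statistics.fmean(values)
        stdev = statistics.pstdev(values)
        if stdev > 0:
            for i, item in enumerate(items):
                v = item.get(value_key)
                if isinstance(v, (int, float)) and abs(v - mean) > variance_threshold * stdev:
                    keep.add(i)

    return [items[i] for i in sorted(keep)]


rows = [{"id": i, "status": "ok", "value": 10} for i in range(1000)]
rows[500] = {"id": 500, "status": "error", "value": 10}
rows[700] = {"id": 700, "status": "ok", "value": 10_000}
kept = sketch_crush(rows)
print(len(kept), [r["id"] for r in kept])
```

In this toy run the error row and the outlier survive alongside the first six and last three rows, which mirrors the "100% kept" rows of the table.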

Constructor

SmartCrusher(config: SmartCrusherConfig | None = None)
  • config (SmartCrusherConfig | None, default None): Compression configuration. When None, uses SmartCrusherConfig() defaults. See SmartCrusherConfig for all fields.

Direct Usage

from headroom import SmartCrusher, SmartCrusherConfig

crusher = SmartCrusher(
    SmartCrusherConfig(
        max_items_after_crush=20,
        min_tokens_to_crush=150,
        first_fraction=0.3,
        last_fraction=0.15,
        preserve_change_points=True,
    )
)

# SmartCrusher operates on messages via TransformPipeline.apply()
# or indirectly through compress()
SmartCrusher is backed by a Rust extension (headroom._core.SmartCrusher) built with PyO3. The public Python surface — SmartCrusherConfig, SmartCrusher, and CrushResult — is unchanged. Build the extension locally with scripts/build_rust_extension.sh or install a prebuilt wheel.

CCR Sentinels

When SmartCrusher’s lossy row-drop path removes items, it appends a sentinel object to the kept-items array:
{"_ccr_dropped": "<<ccr:HASH 980_rows_offloaded>>"}
The LLM sees this marker and can call headroom_retrieve(hash) to fetch the original data. If you iterate a compressed array and need to skip sentinels:
import json

from headroom.transforms.smart_crusher import strip_ccr_sentinels, is_ccr_sentinel

items = json.loads(compressed_tool_output)
clean_items = strip_ccr_sentinels(items)  # filters out sentinel objects
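
For intuition, a sentinel check can be as simple as testing for the `_ccr_dropped` key. The helpers below are a standalone sketch with hypothetical names (`looks_like_sentinel`, `drop_sentinels`); they are not the library's `is_ccr_sentinel` or `strip_ccr_sentinels`, whose exact matching rules are not documented here.

```python
import json


def looks_like_sentinel(item):
    # Assumption: a sentinel is a dict that carries the "_ccr_dropped" key.
    return isinstance(item, dict) and "_ccr_dropped" in item


def drop_sentinels(items):
    # Keep every element that is not a sentinel, preserving order.
    return [item for item in items if not looks_like_sentinel(item)]


compressed = json.dumps([
    {"id": 1, "status": "ok"},
    {"id": 500, "status": "error"},
    {"_ccr_dropped": "<<ccr:HASH 980_rows_offloaded>>"},
])
clean = drop_sentinels(json.loads(compressed))
print(clean)
```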

CacheAligner

CacheAligner is a detector-only transform. It scans system messages for volatile content that would cause provider KV-cache misses and logs warnings when instability is found. It does not modify messages — the system prompt is never mutated. Detected volatile patterns include:
  • UUIDs — RFC 4122 canonical form (36 chars with dashes)
  • ISO 8601 timestamps — parsed via datetime.fromisoformat
  • JWTs — three dot-separated base64url segments
  • Hex hashes — MD5 (32), SHA1 (40), SHA256 (64) character strings
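
The four pattern families above can be approximated with the standard library alone. The following is a rough sketch, not CacheAligner's detector: the regular expressions, the minimum JWT segment length of 8, the requirement that a timestamp contain a `T`, and the names `classify_token` and `find_volatile` are all assumptions for illustration.

```python
import re
from datetime import datetime

UUID_RE = re.compile(r"[0-9a-fA-F]{8}(?:-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}")
HEX_RE = re.compile(r"(?:[0-9a-fA-F]{32}|[0-9a-fA-F]{40}|[0-9a-fA-F]{64})")
JWT_RE = re.compile(r"[A-Za-z0-9_-]{8,}\.[A-Za-z0-9_-]{8,}\.[A-Za-z0-9_-]{8,}")


def is_iso_timestamp(token):
    # Require a date/time separator so short numbers are not mistaken for dates.
    if "T" not in token or len(token) < 16:
        return False
    try:
        datetime.fromisoformat(token)
    except ValueError:
        return False
    return True


def classify_token(token):
    if UUID_RE.fullmatch(token):
        return "uuid"
    if HEX_RE.fullmatch(token):
        return "hex_hash"
    if JWT_RE.fullmatch(token):
        return "jwt"
    if is_iso_timestamp(token):
        return "timestamp"
    return None


def find_volatile(text):
    """Return (kind, token) pairs for whitespace-separated volatile tokens."""
    found = []
    for raw in text.split():
        token = raw.strip(".,;:()[]\"'")
        kind = classify_token(token)
        if kind is not None:
            found.append((kind, token))
    return found


prompt = (
    "You are a helpful assistant. Session 123e4567-e89b-12d3-a456-426614174000 "
    "started at 2024-01-15T10:30:00 with build d41d8cd98f00b204e9800998ecf8427e."
)
hits = find_volatile(prompt)
print(hits)
```

A prompt that produces any hits here would change from request to request, which is the kind of instability the transform warns about.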

Constructor

CacheAligner(config: CacheAlignerConfig | None = None)
  • config (CacheAlignerConfig | None, default None): Aligner configuration. When None, uses CacheAlignerConfig() defaults. See CacheAlignerConfig.

Direct Usage

from headroom import CacheAligner, CacheAlignerConfig

aligner = CacheAligner(
    CacheAlignerConfig(
        enabled=True,
        use_dynamic_detector=True,
        detection_tiers=["regex"],
        entropy_threshold=0.7,
    )
)

# Get alignment score (0.0–100.0) without running the full pipeline
score = aligner.get_alignment_score(messages)
print(f"Cache alignment score: {score:.1f}")

get_alignment_score()

aligner.get_alignment_score(messages: list[dict[str, Any]]) -> float
Returns a score from 0.0 (highly volatile prefix) to 100.0 (perfectly stable prefix). Each detected volatile pattern (UUID, timestamp, JWT, etc.) deducts 10 points. Useful for monitoring cache health without running the full pipeline.
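
The scoring rule described above amounts to simple arithmetic. Here is a minimal sketch of it; the function name `alignment_score` is hypothetical, and clamping at 0.0 is an assumption inferred from the stated 0.0 to 100.0 range.

```python
def alignment_score(volatile_count, penalty=10.0):
    # Start from a perfect score and deduct a fixed penalty per detected pattern.
    # Assumption: the result is clamped so it never drops below 0.0.
    return max(0.0, 100.0 - penalty * volatile_count)


for count in (0, 3, 12):
    print(count, alignment_score(count))
```

Under this model a prompt with three volatile patterns scores 70.0, and ten or more patterns bottom out at 0.0.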

TransformPipeline

TransformPipeline orchestrates transforms in sequence. The default pipeline runs:
  1. CacheAligner — detect volatile content in the system prefix
  2. ContentRouter — route each message to the appropriate compressor:
    • SmartCrusher for JSON arrays
    • Kompress for text
    • CodeAwareCompressor for source code
    • SearchCompressor for web/grep results
    • LogCompressor for log output
    • DiffCompressor for diffs
You can supply a custom transforms list to override the default order.
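
To make the routing step concrete, here is a toy dispatcher that maps a message body to one of the compressor names listed above. The detection heuristics (regular expressions, check order, and the `route` function itself) are assumptions for illustration only; ContentRouter's actual rules are not described on this page.

```python
import json
import re

GREP_LINE = re.compile(r"^\S+:\d+:", re.MULTILINE)
LOG_LINE = re.compile(
    r"^\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}\S*\s+(DEBUG|INFO|WARN|WARNING|ERROR)\b",
    re.MULTILINE,
)
CODE_LINE = re.compile(r"^\s*(def|class|import|from)\s", re.MULTILINE)


def contains_array(value):
    # A top-level list, or an object with at least one list-valued field.
    if isinstance(value, list):
        return True
    if isinstance(value, dict):
        return any(isinstance(v, list) for v in value.values())
    return False


def route(content):
    """Pick a compressor name for one message body (toy heuristics)."""
    text = content.strip()
    if text.startswith(("[", "{")):
        try:
            if contains_array(json.loads(text)):
                return "SmartCrusher"
        except ValueError:
            pass  # not valid JSON, fall through to the text heuristics
    if text.startswith(("diff --git", "--- ")):
        return "DiffCompressor"
    if GREP_LINE.search(text):
        return "SearchCompressor"
    if LOG_LINE.search(text):
        return "LogCompressor"
    if CODE_LINE.search(text):
        return "CodeAwareCompressor"
    return "Kompress"


samples = [
    '{"results": [{"id": 1}]}',
    "diff --git a/x b/x\n--- a/x\n+++ b/x",
    "src/app.py:12:def main():",
    "2024-01-15 10:30:00 ERROR timeout",
    "def main():\n    return 1",
    "The quarterly report shows steady growth.",
]
for sample in samples:
    print(route(sample))
```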

Constructor

TransformPipeline(
    config: HeadroomConfig | None = None,
    transforms: list[Transform] | None = None,
    provider: Provider | None = None,
)
  • config (HeadroomConfig | None, default None): Full Headroom configuration. When None, HeadroomConfig() defaults are used.
  • transforms (list[Transform] | None, default None): Custom transform list. When provided, replaces the default [CacheAligner, ContentRouter] order entirely.
  • provider (Provider | None, default None): Provider for model-specific tokenization. Used when building the default pipeline with per-provider behavior.

apply()

pipeline.apply(
    messages: list[dict[str, Any]],
    model: str,
    **kwargs,
) -> TransformResult
Run the pipeline synchronously. Returns a TransformResult. The model_limit keyword argument is required and must be provided explicitly (there is no default — passing None raises ValueError). Common kwargs:
  • model_limit: int — context window size in tokens (required)
  • output_buffer: int — tokens to reserve for model output (default 4000)
  • tool_profiles: dict[str, dict] — per-tool compression profiles
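
One way to read model_limit and output_buffer together is as an input budget: the context window minus the tokens reserved for the reply. The helper below is a hypothetical sketch of that reading (the name `input_token_budget` and the subtraction itself are assumptions, not Headroom's documented behavior); it also mirrors the rule that a missing model_limit raises ValueError.

```python
def input_token_budget(model_limit, output_buffer=4000):
    # model_limit has no default in the documented API; None is rejected.
    if model_limit is None:
        raise ValueError("model_limit is required")
    budget = model_limit - output_buffer
    if budget <= 0:
        raise ValueError("output_buffer leaves no room for input tokens")
    return budget


print(input_token_budget(128000))
```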

simulate()

pipeline.simulate(
    messages: list[dict[str, Any]],
    model: str,
    **kwargs,
) -> TransformResult
Run the pipeline in dry-run mode. Returns the same TransformResult as apply() but does not persist metrics. Accepts the same kwargs as apply(), including the required model_limit.

CompressionHooks

CompressionHooks is a base class with no-op defaults. Subclass it to inject custom logic at four well-defined points in the pipeline:
  1. pre_compress — modify messages before compression
  2. compute_biases — set per-message compression aggressiveness
  3. post_compress — observe results after compression
  4. on_pipeline_event — observe canonical lifecycle events
from headroom.hooks import CompressionHooks, CompressContext, CompressEvent
from headroom.pipeline import PipelineEvent

class MyHooks(CompressionHooks):
    def pre_compress(
        self,
        messages: list[dict],
        ctx: CompressContext,
    ) -> list[dict]:
        # Filter, inject, or reorder messages before compression
        return messages

    def compute_biases(
        self,
        messages: list[dict],
        ctx: CompressContext,
    ) -> dict[int, float]:
        # Return {message_index: bias} — higher = compress less
        biases = {}
        n = len(messages)
        for i in range(n):
            pos = i / max(n - 1, 1)
            # Middle messages get higher bias (LLM attention is weaker there)
            biases[i] = 1.0 + 0.5 * (1.0 - abs(2 * pos - 1))
        return biases

    def post_compress(self, event: CompressEvent) -> None:
        print(
            f"[{event.model}] {event.tokens_before} → {event.tokens_after} "
            f"({event.compression_ratio:.0%} saved via {event.transforms_applied})"
        )

    def on_pipeline_event(self, event: PipelineEvent) -> PipelineEvent | None:
        # Observe lifecycle events without modifying them
        print(f"Stage: {event.stage.value}, model: {event.model}")
        return None

pre_compress

def pre_compress(
    self,
    messages: list[dict[str, Any]],
    ctx: CompressContext,
) -> list[dict[str, Any]]: ...
Called before the pipeline runs. Modify and return the message list. Use for cross-turn deduplication, memory injection, pre-filtering, or phase detection.

compute_biases

def compute_biases(
    self,
    messages: list[dict[str, Any]],
    ctx: CompressContext,
) -> dict[int, float]: ...
Return {message_index: bias_float}. Values:
  • 1.0 — default compression
  • > 1.0 — compress less aggressively (keep more)
  • < 1.0 — compress more aggressively
  • Missing indices default to 1.0
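
The bias semantics above can be sketched in a few lines. `resolve_biases` fills in the 1.0 default for missing indices as described; `scaled_budget` shows one plausible way a bias could translate into a keep budget. Both names are hypothetical, and the multiplication is an assumption about how a pipeline might consume the value.

```python
def resolve_biases(biases, n_messages, default=1.0):
    # Expand a sparse {index: bias} mapping into one bias per message.
    return [biases.get(i, default) for i in range(n_messages)]


def scaled_budget(base_items, bias):
    # Assumption: bias multiplies the number of items kept for a message.
    # bias > 1.0 keeps more, bias < 1.0 keeps fewer, never fewer than one.
    return max(1, round(base_items * bias))


per_message = resolve_biases({0: 1.5, 3: 0.5}, n_messages=4)
budgets = [scaled_budget(20, b) for b in per_message]
print(per_message, budgets)
```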

post_compress

def post_compress(self, event: CompressEvent) -> None: ...
Called after compression completes. Observational only — do not modify anything here. Use for analytics, A/B testing, or anomaly detection.

CompressContext Fields

  • model (str): Model name for this compression call.
  • user_query (str): Extracted user query (empty if not detected).
  • turn_number (int): Turn counter within the session.
  • tool_calls (list[str]): Tool names called in this context.
  • provider (str): Provider name: "anthropic", "openai", "gemini", etc.

CompressEvent Fields

  • tokens_before (int): Tokens before compression.
  • tokens_after (int): Tokens after compression.
  • tokens_saved (int): tokens_before - tokens_after.
  • compression_ratio (float): Fraction of tokens saved.
  • transforms_applied (list[str]): Transforms that ran.
  • ccr_hashes (list[str]): CCR hashes for any offloaded data.
  • model (str): Model name.
  • user_query (str): Extracted user query.
  • provider (str): Provider name.

PipelineStage

PipelineStage is a string enum listing the canonical lifecycle stages that PipelineExtensionManager emits events for.
from headroom import PipelineStage

PipelineStage.SETUP              # "setup"
PipelineStage.PRE_START          # "pre_start"
PipelineStage.POST_START         # "post_start"
PipelineStage.INPUT_RECEIVED     # "input_received"
PipelineStage.INPUT_CACHED       # "input_cached"
PipelineStage.INPUT_ROUTED       # "input_routed"
PipelineStage.INPUT_COMPRESSED   # "input_compressed"
PipelineStage.INPUT_REMEMBERED   # "input_remembered"
PipelineStage.PRE_SEND           # "pre_send"
PipelineStage.POST_SEND          # "post_send"
PipelineStage.RESPONSE_RECEIVED  # "response_received"

PipelineEvent

PipelineEvent is the event object emitted at each stage. Extensions receive it via on_pipeline_event.
  • stage (PipelineStage): The stage this event was emitted from.
  • operation (str): Operation name, e.g. "sdk.request", "compress".
  • request_id (str): Unique request identifier (empty string if not set).
  • provider (str): Provider name.
  • model (str): Model name.
  • messages (list[dict[str, Any]] | None): Messages at this stage. May be None for stages that don’t involve messages.
  • tools (list[dict[str, Any]] | None): Tools list at this stage.
  • headers (dict[str, str] | None): Request headers at this stage.
  • response (Any): API response (only at POST_SEND / RESPONSE_RECEIVED).
  • metadata (dict[str, Any]): Stage-specific metadata (e.g. token counts, transform names).

PipelineExtensionManager

PipelineExtensionManager dispatches PipelineEvent objects to a list of extensions. Extensions are loaded from:
  1. The hooks= argument (any object with on_pipeline_event)
  2. The extensions= list
  3. Auto-discovered entry points under the headroom.pipeline_extension group (when discover=True)
from headroom import PipelineExtensionManager, PipelineStage

manager = PipelineExtensionManager(
    hooks=MyHooks(),
    extensions=[my_extension],
    discover=True,  # also load from entry points
)

event = manager.emit(
    PipelineStage.INPUT_RECEIVED,
    operation="compress",
    model="gpt-4o",
    messages=messages,
)
# event.messages may be replaced if an extension returned a new event
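
The dispatch behavior described here (each extension sees the event and may return a replacement) can be modeled with a toy manager. Everything in this sketch is hypothetical: `ToyEvent`, `ToyManager`, `Observer`, and `Truncator` are illustrative stand-ins, and the in-order call sequence is an assumption rather than documented behavior of PipelineExtensionManager.

```python
from dataclasses import dataclass, field, replace
from typing import Optional


@dataclass(frozen=True)
class ToyEvent:
    stage: str
    model: str = ""
    messages: Optional[list] = None
    metadata: dict = field(default_factory=dict)


class ToyManager:
    def __init__(self, extensions):
        # Only objects that expose on_pipeline_event take part in dispatch.
        self.extensions = [e for e in extensions if hasattr(e, "on_pipeline_event")]

    def emit(self, stage, **fields):
        event = ToyEvent(stage=stage, **fields)
        for ext in self.extensions:
            result = ext.on_pipeline_event(event)
            if result is not None:
                event = result  # a returned event replaces the current one
        return event


class Observer:
    def __init__(self):
        self.seen = []

    def on_pipeline_event(self, event):
        self.seen.append(event.stage)
        return None  # observe only


class Truncator:
    def on_pipeline_event(self, event):
        if event.messages is None:
            return None
        return replace(event, messages=event.messages[-1:])


observer = Observer()
manager = ToyManager([observer, Truncator(), object()])
event = manager.emit(
    "input_received",
    model="gpt-4o",
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Find payment failures"},
    ],
)
print(observer.seen, event.messages)
```

Returning None keeps an extension purely observational, while returning a new event is how the toy Truncator changes what later extensions and the caller see.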

CANONICAL_PIPELINE_STAGES

A tuple of all PipelineStage values in execution order:
from headroom import CANONICAL_PIPELINE_STAGES

# (SETUP, PRE_START, POST_START, INPUT_RECEIVED, INPUT_CACHED,
#  INPUT_ROUTED, INPUT_COMPRESSED, INPUT_REMEMBERED,
#  PRE_SEND, POST_SEND, RESPONSE_RECEIVED)
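
Because the tuple is ordered, position in it can answer "does stage A run before stage B?". The sketch below uses the plain string values listed for PipelineStage instead of importing Headroom, and the helper `runs_before` is a hypothetical name.

```python
STAGE_VALUES = (
    "setup", "pre_start", "post_start", "input_received", "input_cached",
    "input_routed", "input_compressed", "input_remembered",
    "pre_send", "post_send", "response_received",
)

# Map each stage value to its position in execution order.
STAGE_ORDER = {name: index for index, name in enumerate(STAGE_VALUES)}


def runs_before(first, second):
    return STAGE_ORDER[first] < STAGE_ORDER[second]


print(runs_before("input_received", "pre_send"))
```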

Custom Pipeline Example

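The following example composes a minimal custom pipeline with SmartCrusher and CacheAligner and runs it directly on a message list. It also defines an observability hook, SavingsLogger; the pipeline call shown here does not attach it, so pass it through compress(hooks=...) as in the next section to see its output: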
from headroom import (
    SmartCrusher,
    SmartCrusherConfig,
    CacheAligner,
    CacheAlignerConfig,
    TransformPipeline,
    HeadroomConfig,
)
from headroom.hooks import CompressionHooks, CompressEvent

# 1. Define observability hooks
class SavingsLogger(CompressionHooks):
    def post_compress(self, event: CompressEvent) -> None:
        if event.tokens_saved > 0:
            print(
                f"Saved {event.tokens_saved:,} tokens "
                f"({event.compression_ratio:.0%}) "
                f"via {event.transforms_applied}"
            )

# 2. Build sub-configs
crusher_config = SmartCrusherConfig(
    max_items_after_crush=25,
    min_tokens_to_crush=150,
    preserve_change_points=True,
    lossless_only=False,
)

aligner_config = CacheAlignerConfig(
    enabled=True,
    use_dynamic_detector=True,
    detection_tiers=["regex"],
)

# 3. Compose pipeline with explicit transform order
headroom_config = HeadroomConfig(
    smart_crusher=crusher_config,
    cache_aligner=aligner_config,
)

pipeline = TransformPipeline(
    config=headroom_config,
    transforms=[
        CacheAligner(aligner_config),  # 1st: detect volatile prefix content
        SmartCrusher(crusher_config),  # 2nd: compress JSON arrays
    ],
)

# 4. Apply to messages
messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Find payment failures"},
    {
        "role": "tool",
        "content": '{"results": [' + ",".join(
            [f'{{"id": {i}, "status": "ok"}}' for i in range(500)]
            + [f'{{"id": 500, "status": "error", "message": "timeout"}}']
        ) + "]}",
    },
]

result = pipeline.apply(messages=messages, model="gpt-4o", model_limit=128000)

print(f"Before: {result.tokens_before:,} tokens")
print(f"After:  {result.tokens_after:,} tokens")
print(f"Saved:  {result.tokens_before - result.tokens_after:,} tokens")
print(f"Transforms: {result.transforms_applied}")

Using Transforms with compress()

Hooks integrate directly with the compress() function — no HeadroomClient required:
from headroom import compress
from headroom.hooks import CompressionHooks, CompressEvent

class AuditHook(CompressionHooks):
    def post_compress(self, event: CompressEvent) -> None:
        if event.compression_ratio > 0.8:
            print(f"WARNING: Very high compression ratio {event.compression_ratio:.0%}")

result = compress(
    messages,
    model="claude-sonnet-4-5-20250929",
    hooks=AuditHook(),
    compress_user_messages=False,
    protect_recent=4,
)
When building a custom TransformPipeline with an explicit transforms list, the ContentRouter is not included unless you add it. The ContentRouter is responsible for dispatching individual messages to content-specific compressors (SmartCrusher, Kompress, CodeAwareCompressor, etc.). If you only include SmartCrusher directly, only explicit SmartCrusher logic runs — not the full routing heuristics.
