Overview
The IngestPipeline class orchestrates event ingestion: it redacts and normalizes payloads, deduplicates events, and writes to both the local mirror and Mubit memory.
Class: IngestPipeline
Constructor
constructor(
  mirror: MirrorAppender,
  options?: IngestPipelineOptions
)
Parameters:
- mirror (MirrorAppender, required): Local mirror storage (typically JsonlMirror)
- options.memoryEngine: Memory backend (typically MubitMemoryEngine); omit for mirror-only writes
- options.failOnMemoryError: Throw on memory write errors (default: false, i.e. fail-open)
- options.onMemoryError: Error handler for memory write failures, with signature (error: unknown, event: CapturedEventEnvelope) => void
- options.memoryWriteTimeoutMs: Timeout for individual memory writes (milliseconds)
- options.memoryMaxConsecutiveErrors: Max consecutive errors before opening the circuit breaker
- options.memoryWriteConcurrency: Number of concurrent memory write tasks
- options.memoryBatchSize: Batch size for memory writes (requires writeEventsBatch support)
- options.retryMemoryWriteOnLocalDedup: Retry the memory write even if the event was deduplicated locally
- options.defaultActorId: Default actor ID when not provided in the context
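Taken together, these options imply an options object shaped roughly like the sketch below. This is for orientation only: the names memoryEngine, memoryBatchSize, and defaultActorId are inferred from the usage examples further down, and the memory engine is shown as MubitMemoryEngine for concreteness.
// Approximate shape of IngestPipelineOptions, inferred from the option list
// above and the usage examples below; not copied from the source code.
interface IngestPipelineOptions {
  memoryEngine?: MubitMemoryEngine;        // memory backend; omit for mirror-only writes
  failOnMemoryError?: boolean;             // throw instead of failing open
  onMemoryError?: (error: unknown, event: CapturedEventEnvelope) => void;
  memoryWriteTimeoutMs?: number;           // per-write timeout in milliseconds
  memoryMaxConsecutiveErrors?: number;     // circuit breaker threshold
  memoryWriteConcurrency?: number;         // parallel memory write tasks
  memoryBatchSize?: number;                // requires writeEventsBatch support
  retryMemoryWriteOnLocalDedup?: boolean;  // retry memory write after a local dedup hit
  defaultActorId?: string;                 // fallback actor ID for ingest contexts
}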
Methods
ingest
Ingests an event with normalization and dual-store writes.
async ingest(
  eventType: string,
  payload: Record<string, unknown>,
  ctx: IngestContext
): Promise<CapturedEventEnvelope>
Parameters:
- eventType (string, required): Event type (e.g., "prompt.submitted", "item.completed")
- payload (Record<string, unknown>, required): Event payload (will be redacted automatically)
- ctx (IngestContext, required): Event context for normalization, including:
  - Sequence number within session
  - Actor ID (falls back to defaultActorId)
  - Explicit event ID (auto-generated if not provided)
  - Explicit timestamp (defaults to current time)
Returns: The normalized and ingested CapturedEventEnvelope
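For instance, assuming a pipeline constructed as in the Basic Usage example below (only eventId and sessionId are referenced elsewhere on this page; other envelope fields are not assumed here):
const envelope = await pipeline.ingest(
  'item.completed',
  { itemId: 'item-42' },   // illustrative payload
  {
    source: 'codex_sdk',
    repoId: 'abc123',
    sessionId: 'session-1',
    threadId: 'thread-1',
    sequence: 2,
  }
);
console.log(envelope.eventId);   // auto-generated because no explicit ID was passed
console.log(envelope.sessionId); // carried over from the ingest context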
ingestRawLine
Appends a raw JSONL line to the session's raw log.
async ingestRawLine(
  sessionId: string,
  line: string
): Promise<void>
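For example, to preserve a provider's original JSONL output alongside the structured events (the session ID and line content are illustrative, and pipeline is assumed to be constructed as in the examples below):
// Append one raw JSONL line to the raw log of session-1.
await pipeline.ingestRawLine(
  'session-1',
  JSON.stringify({ type: 'raw.output', text: 'assistant output chunk' })
);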
flush
Flushes all pending writes to mirror and memory.
async flush(): Promise<void>
Ingestion Pipeline Flow
Redaction
The pipeline automatically redacts sensitive data:
- API keys and tokens
- SSH keys
- AWS credentials
- Generic secret patterns
See src/lib/redactor.ts for full redaction logic.
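For illustration (the exact replacement marker is determined by src/lib/redactor.ts and is not reproduced here; pipeline and ctx are assumed to be set up as in the Basic Usage example):
const event = await pipeline.ingest(
  'tool.executed',                                               // illustrative event type
  { command: 'curl -H "Authorization: Bearer sk-live-abc123"' }, // secret inside the payload
  ctx
);
// The stored payload no longer contains the bearer token; it is replaced by a
// redaction marker before the mirror and memory writes happen.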
Deduplication
Events are deduplicated at two levels:
- Mirror level: based on eventId (a deterministic hash)
- Memory level: optional retry on local dedup via retryMemoryWriteOnLocalDedup
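A sketch of what this means for callers, assuming payload and ctx are identical across both calls and the auto-generated eventId is the deterministic hash mentioned above:
// Ingesting the same event twice yields the same deterministic eventId, so the
// mirror treats the second call as a duplicate and skips the write.
const first = await pipeline.ingest('prompt.submitted', payload, ctx);
const second = await pipeline.ingest('prompt.submitted', payload, ctx);
// first.eventId === second.eventId
// With retryMemoryWriteOnLocalDedup: true, the memory write is still attempted
// for the second call even though the mirror deduplicated it locally.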
Circuit Breaker
After memoryMaxConsecutiveErrors consecutive memory write failures, the circuit opens:
- Local mirror writes continue
- Memory writes are skipped
- Circuit auto-closes on successful write
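For example, a low threshold trips the breaker quickly during a Mubit outage while the local mirror keeps recording (the threshold value here is illustrative):
const pipeline = new IngestPipeline(mirror, {
  memoryEngine: memory,
  memoryMaxConsecutiveErrors: 3, // open the circuit after 3 consecutive failed writes
  onMemoryError: (error) => {
    console.warn('Memory write failed; mirror writes continue:', error);
  },
});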
Usage Examples
Basic Usage
import { IngestPipeline } from './lib/ingest-pipeline';
import { JsonlMirror } from './lib/mirror-jsonl';
import { MubitMemoryEngine } from './lib/memory-mubit';
const mirror = new JsonlMirror('.codaph', {
  indexWriteMode: 'batch',
  autoFlushEveryEvents: 50,
});

const memory = new MubitMemoryEngine({
  apiKey: process.env.MUBIT_API_KEY,
  runScope: 'project',
});

const pipeline = new IngestPipeline(mirror, {
  memoryEngine: memory,
  memoryWriteConcurrency: 4,
  memoryBatchSize: 20,
  memoryWriteTimeoutMs: 30000,
  onMemoryError: (error, event) => {
    console.warn('Mubit write failed:', error);
  },
});
const event = await pipeline.ingest(
  'prompt.submitted',
  {
    prompt: 'Fix authentication bug',
    model: 'gpt-4',
  },
  {
    source: 'codex_sdk',
    repoId: 'abc123',
    sessionId: 'session-1',
    threadId: 'thread-1',
    sequence: 1,
  }
);
await pipeline.flush();
High-Throughput Configuration
For history imports with thousands of events:
const pipeline = new IngestPipeline(mirror, {
  memoryEngine: memory,
  memoryWriteConcurrency: 8,
  memoryBatchSize: 50,
  memoryWriteTimeoutMs: 60000,
  memoryMaxConsecutiveErrors: 5,
});

for (const event of historicalEvents) {
  await pipeline.ingest(event.type, event.payload, event.context);
}
await pipeline.flush();
Error Handling
const pipeline = new IngestPipeline(mirror, {
  memoryEngine: memory,
  failOnMemoryError: false, // Fail-open (default)
  onMemoryError: (error, event) => {
    logger.error('Memory write failed', {
      error,
      eventId: event.eventId,
      sessionId: event.sessionId,
    });
  },
});
Concurrency
Increase concurrent memory writes for better throughput:
memoryWriteConcurrency: 8 // 8 parallel Mubit writes
Batching
Enable batch writes (requires memory engine support):
memoryBatchSize: 50 // Group 50 events per Mubit request
Timeouts
Adjust timeout based on batch size:
memoryWriteTimeoutMs: 60000 // 60 seconds for large batches
Testing
For testing, you can disable memory writes:
const pipeline = new IngestPipeline(mirror);
// No memory engine = mirror-only writes
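A mirror-only pipeline still normalizes and redacts events and returns envelopes, so tests can assert against the returned event (the payload and context values here are illustrative):
const event = await pipeline.ingest(
  'prompt.submitted',
  { prompt: 'test prompt' },
  {
    source: 'codex_sdk',
    repoId: 'test-repo',
    sessionId: 'test-session',
    threadId: 'thread-1',
    sequence: 1,
  }
);
await pipeline.flush(); // only the local mirror is written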