Overview
Stanzo processes live debate audio through a multi-stage pipeline that transforms speech into fact-checked claims in real time. The pipeline consists of four main stages:
- Audio Capture → Transcription chunks stored in database
- Claim Extraction → AI identifies factual claims from transcript
- Fact Checking → Claims verified against web sources
- UI Updates → Real-time reactivity via Convex subscriptions
The entire pipeline is asynchronous and event-driven, using Convex scheduled actions to trigger each stage without blocking the frontend.
Stage 1: Transcript Chunking
As audio is captured from the debate, it’s transcribed and stored as transcript chunks in the database. Each chunk represents a continuous utterance from one speaker.
Chunk Storage
When new transcript text arrives, it’s inserted with metadata:
// convex/transcriptChunks.ts:21
export const insert = mutation({
  args: {
    debateId: v.id("debates"),
    speaker: v.union(v.literal(0), v.literal(1)),
    text: v.string(),
    startTime: v.number(),
    endTime: v.number(),
  },
  handler: async (ctx, args) => {
    await ctx.db.insert("transcriptChunks", {
      ...args,
      processedForClaims: false, // Mark as unprocessed
    })
  },
})
Key fields:
- speaker: 0 for speaker A, 1 for speaker B
- startTime/endTime: Timestamp boundaries for the utterance
- processedForClaims: Tracks whether this chunk has been sent to claim extraction
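As a rough sketch, the stored record shape implied by the mutation can be modeled in plain TypeScript. Note that toChunkRecord is a hypothetical helper, not part of the codebase; the field names come from the mutation's args above:

```typescript
// Shape of a transcript chunk as stored, per the insert mutation's fields.
type TranscriptChunk = {
  debateId: string
  speaker: 0 | 1
  text: string
  startTime: number
  endTime: number
  processedForClaims: boolean
}

// Hypothetical helper mirroring what the handler does before ctx.db.insert:
// stamp the incoming args with the unprocessed flag.
const toChunkRecord = (
  args: Omit<TranscriptChunk, "processedForClaims">,
): TranscriptChunk => ({ ...args, processedForClaims: false })
```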
After inserting chunks, the frontend can trigger claim extraction:
// convex/transcriptChunks.ts:39
export const triggerExtraction = mutation({
  args: { debateId: v.id("debates") },
  handler: async (ctx, args) => {
    await ctx.scheduler.runAfter(0, internal.claimExtraction.extract, {
      debateId: args.debateId,
    })
  },
})
ctx.scheduler.runAfter(0, ...) schedules an action to run asynchronously without blocking the mutation. This keeps the UI responsive.
Stage 2: Claim Extraction
The claim extraction stage uses Google’s Gemini AI to identify factual claims from unprocessed transcript chunks.
Batch Processing
The extractor fetches all unprocessed chunks for a debate:
// convex/claimExtraction.ts:99
export const extract = internalAction({
  args: { debateId: v.id("debates") },
  handler: async (ctx, args) => {
    const chunks = await ctx.runQuery(
      internal.transcriptChunks.getUnprocessed,
      { debateId: args.debateId },
    )
    if (chunks.length === 0) return null
    // Mark processed BEFORE calling LLM to prevent duplicates
    await ctx.runMutation(internal.transcriptChunks.markProcessed, {
      chunkIds: chunks.map((c) => c._id),
    })
  },
})
Chunks are marked as processed before calling the LLM to prevent duplicate extraction if the action is retriggered.
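The claim-then-process pattern is easiest to see in a small in-memory simulation (illustrative only, not the Convex code): because chunks are flagged before any slow work, a second concurrent run finds nothing left to claim.

```typescript
// In-memory simulation of "mark processed before the LLM call".
type Chunk = { id: string; processedForClaims: boolean }

const claimUnprocessed = (store: Map<string, Chunk>): Chunk[] => {
  const unprocessed = [...store.values()].filter((c) => !c.processedForClaims)
  // Flag BEFORE any slow work, mirroring markProcessed in the action above.
  for (const c of unprocessed) c.processedForClaims = true
  return unprocessed
}
```

A retriggered extraction therefore degrades to a cheap no-op rather than producing duplicate claims.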
Conversation History
The extractor maintains conversation context across multiple extraction runs using the extractionSessions table:
// convex/claimExtraction.ts:126
const session = await ctx.runQuery(
  internal.extractionSessions.getByDebate,
  { debateId: args.debateId },
)
const existingMessages: Message[] = session?.messages ?? []

// Build new user message from chunks
const newUserMessage = chunks
  .map((c) => `[${speakerNames[c.speaker]}]: ${c.text}`)
  .join("\n")

const messages: Message[] = [
  ...existingMessages,
  { role: "user", content: newUserMessage },
]
This allows the AI to:
- Resolve pronouns and references from earlier in the debate
- Avoid re-extracting claims from previous turns
- Understand context for ambiguous statements
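How the history accumulates across runs can be sketched as a pure function. Here appendTurn is a hypothetical helper mirroring the message-building code above; presumably the session record is also updated with the model's reply, though that code is not shown in this doc.

```typescript
// Minimal sketch: each extraction run appends one user turn built from the
// new chunk lines, so the next run's prompt contains the full conversation.
type Message = { role: "user" | "model"; content: string }

const appendTurn = (history: Message[], chunkLines: string[]): Message[] => [
  ...history,
  { role: "user", content: chunkLines.join("\n") },
]
```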
Streaming Claims
Claims are streamed back from Gemini in JSONL format (one JSON object per line):
// convex/claimExtraction.ts:30
const streamClaims = (
  apiKey: string,
  systemPrompt: string,
  messages: Message[],
  onClaim: (claim: ClaimData) => Promise<void>,
) =>
  Effect.tryPromise({
    try: async () => {
      const client = new GoogleGenAI({ apiKey })
      const stream = await client.models.generateContentStream({
        model: "gemini-2.5-flash",
        config: { maxOutputTokens: 4096, systemInstruction: systemPrompt },
        contents: messages.map((m) => ({
          role: m.role,
          parts: [{ text: m.content }],
        })),
      })
      let buffer = ""
      for await (const chunk of stream) {
        buffer += chunk.text ?? ""
        // Parse complete lines
        let newlineIdx: number
        while ((newlineIdx = buffer.indexOf("\n")) !== -1) {
          const line = buffer.slice(0, newlineIdx).trim()
          buffer = buffer.slice(newlineIdx + 1)
          const claim = line ? parseClaim(line) : null
          if (claim) await onClaim(claim)
        }
      }
    },
  }).pipe(Effect.timeout(Duration.seconds(60)))
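The parseClaim helper called above is not shown in this doc. A plausible implementation (a sketch, not the actual code) validates each JSONL line against the fields used when saving claims, returning null so one malformed line doesn't abort the stream:

```typescript
// Hypothetical parseClaim: parse one JSONL line, validate the fields the
// save step needs, and swallow malformed lines instead of throwing.
type ClaimData = {
  speaker: 0 | 1
  claimText: string
  originalTranscriptExcerpt: string
}

const parseClaim = (line: string): ClaimData | null => {
  try {
    const obj = JSON.parse(line)
    if (
      (obj.speaker === 0 || obj.speaker === 1) &&
      typeof obj.claimText === "string" &&
      typeof obj.originalTranscriptExcerpt === "string"
    ) {
      return obj as ClaimData
    }
    return null // Wrong shape: skip this line
  } catch {
    return null // Not valid JSON: skip this line
  }
}
```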
As each claim arrives, it’s immediately saved to the database and triggers fact-checking:
// convex/claimExtraction.ts:143
streamClaims(apiKey, systemPrompt, messages, async (claim) => {
  await ctx.runMutation(internal.claims.saveClaim, {
    debateId: args.debateId,
    speaker: claim.speaker,
    claimText: claim.claimText,
    originalTranscriptExcerpt: claim.originalTranscriptExcerpt,
  })
})
Stage 3: Fact Checking
When a claim is saved, it automatically triggers fact-checking:
// convex/claims.ts:63
export const saveClaim = internalMutation({
  handler: async (ctx, args) => {
    const claimId = await ctx.db.insert("claims", {
      ...args,
      status: "pending",
      extractedAt: Date.now(),
    })
    // Schedule fact-check action immediately
    await ctx.scheduler.runAfter(0, internal.factCheck.check, { claimId })
  },
})
The fact-checker uses Perplexity AI to verify claims against real-time web sources. See Error Handling for details on retry logic and timeouts.
Stage 4: Real-time UI Updates
Convex provides real-time reactivity through subscriptions. When any database record changes, subscribed queries automatically re-run.
Subscribing to Claims
The frontend subscribes to claims for a debate:
const claims = useQuery(api.claims.listByDebate, { debateId })
When a claim’s status changes from "pending" → "checking" → "true", the UI automatically updates without polling.
Status Transitions
Claims progress through these states:
- pending - Claim extracted, waiting for fact-check
- checking - Fact-check in progress
- true | false | mixed | unverifiable - Final verdict
The by_debate_and_status index (schema.ts:51) allows efficient queries for claims in specific states, useful for showing “pending” counts.
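The status lifecycle can be captured as a small union type with a helper that distinguishes in-flight claims from final verdicts (a sketch; isFinal is an illustrative name, and the status strings come from the list above):

```typescript
// The claim statuses documented above, as a TypeScript union.
type ClaimStatus =
  | "pending"
  | "checking"
  | "true"
  | "false"
  | "mixed"
  | "unverifiable"

// A claim is still in-flight until the fact-checker writes a verdict.
const isFinal = (s: ClaimStatus): boolean =>
  s !== "pending" && s !== "checking"
```

A component showing a "pending" count would render claims where isFinal is false.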
Pipeline Orchestration
Manual Triggering
The frontend can manually trigger extraction at any time:
await convex.mutation(api.transcriptChunks.triggerExtraction, { debateId })
On Debate End
When a debate ends, extraction is automatically triggered to catch any remaining chunks:
// convex/debates.ts:52
export const end = mutation({
  handler: async (ctx, args) => {
    await ctx.db.patch(args.debateId, {
      status: "ended",
      endedAt: Date.now(),
    })
    // Final extraction pass
    await ctx.scheduler.runAfter(0, internal.claimExtraction.extract, {
      debateId: args.debateId,
    })
  },
})
Idempotency
All pipeline stages are designed to be idempotent:
- Chunks are marked processedForClaims: true before LLM calls
- Claims are only extracted from new chunks, not re-extracted from history
- Fact-checks update existing claim records rather than creating duplicates
Batching
Claim extraction processes all unprocessed chunks in a single LLM call, which:
- Reduces API calls and costs
- Provides better context for the AI
- Improves claim quality by seeing multiple utterances together
Timeouts
Both extraction and fact-checking have timeouts to prevent hung actions:
- Claim extraction: 60 seconds (claimExtraction.ts:75)
- Fact-checking: 30 seconds (factCheck.ts:90)
See Error Handling for details on retry strategies.
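The actual code enforces these with Effect.timeout, but the idea can be shown with a dependency-free sketch using Promise.race (withTimeout is a hypothetical helper, not part of the codebase):

```typescript
// Minimal timeout wrapper: resolve with the wrapped promise's value, or
// reject if it hasn't settled within ms milliseconds.
const withTimeout = <T>(p: Promise<T>, ms: number): Promise<T> =>
  Promise.race([
    p,
    new Promise<T>((_, reject) =>
      setTimeout(() => reject(new Error(`timed out after ${ms}ms`)), ms),
    ),
  ])
```

Wrapping an action body this way guarantees the action settles even if the upstream API hangs, so a retry can be scheduled.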