Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/8BitTacoSupreme/flowstate/llms.txt

Use this file to discover all available pages before exploring further.

FlowState accumulates knowledge across pipeline runs in a local SQLite database called memory.db. Research findings, strategy assessments, interview decisions, and failure logs are all stored as searchable entries. Before each bridge call, the adapter queries the store and injects any relevant prior knowledge directly into the prompt — so the more times you run the pipeline on a project, the richer the context Claude receives.

What Gets Stored

The memory system automatically captures five categories of information, distinguished by a MemoryKind enum:
# flowstate/memory.py
class MemoryKind(StrEnum):
    RESEARCH  = "research"   # findings from the Research step
    STRATEGY  = "strategy"   # assessments from the Strategy step
    DECISION  = "decision"   # interview answers and GSD planning outputs
    TOOL_RUN  = "tool_run"   # execution logs, including failures
    INSIGHT   = "insight"    # catch-all for other adapter outputs

research

Findings from every research topic. Each ## heading in research/report.md becomes a separate, independently searchable entry.

strategy

The pressure-test assessment from research/strategy.md, also split by headings for fine-grained retrieval.

decision

Interview answers stored at pipeline start, plus GSD planning outputs. Captures the why behind architectural choices.

tool_run

Failure logs from any blocked step. Stored so future runs can reference what went wrong and why.

insight

Catch-all kind for adapter outputs that don’t map to a specific tool. Available for custom integrations that extend the pipeline.

How Memories Are Created

The orchestrator wires together a MemoryStore and an EventBus at the start of every pipeline run. Memory handlers listen for domain events and store artifacts automatically — no adapter has to call the store directly after a successful step.

Step completion → artifact splitting

When a step succeeds, the orchestrator emits a StepCompleted event:
# flowstate/orchestrator.py
bus.emit(
    StepCompleted(
        payload={"tool": tool_name, "artifacts": result.artifacts},
        source="orchestrator",
    )
)
The on_step_completed handler reads each artifact file and splits its content on ## Markdown headings. Each section becomes an individual MemoryEntry:
# flowstate/memory_handlers.py
def _split_sections(text: str) -> list[tuple[str, str]]:
    """Split markdown by ## headings into (heading, body) tuples."""
    parts = re.split(r"^(## .+)$", text, flags=re.MULTILINE)
    sections = []
    if parts[0].strip():
        sections.append(("Overview", parts[0].strip()))
    for i in range(1, len(parts), 2):
        heading = parts[i].lstrip("# ").strip()
        body = parts[i + 1].strip() if i + 1 < len(parts) else ""
        if body:
            sections.append((heading, body))
    return sections
The tool-to-kind mapping determines how the entries are classified:
# flowstate/memory_handlers.py
TOOL_TO_KIND = {
    "research":   MemoryKind.RESEARCH,
    "strategy":   MemoryKind.STRATEGY,
    "gsd":        MemoryKind.DECISION,
    "discipline": MemoryKind.TOOL_RUN,
}

Step failure → error log

When a step fails, a StepFailed event stores the error as a tool_run memory so it’s available as context on the next attempt:
# flowstate/memory_handlers.py
@handler("step.failed", priority=EventPriority.AUDIT)
def on_step_failed(event: Event) -> None:
    tool_name = event.payload.get("tool", "")
    error = event.payload.get("error", "unknown error")
    store.add(
        MemoryEntry.create(
            MemoryKind.TOOL_RUN,
            f"Tool '{tool_name}' failed: {error}",
            f"{tool_name} failure",
            source=tool_name,
            tags=[tool_name, "failure"],
            run_id=run_id,
        )
    )

Interview answers → decision memory

Before the pipeline even begins its five steps, the orchestrator stores the interview answers as a decision memory. This means future strategy and research calls can retrieve the project’s core problem and architectural decisions as prior context:
# flowstate/orchestrator.py
memory.add(
    MemoryEntry.create(
        MemoryKind.DECISION,
        (
            f"Core problem: {answers.core_problem}\n"
            f"10x vision: {answers.ten_x_vision}\n"
            f"Architecture: {answers.architecture_pattern}\n"
            f"Research focus: {answers.research_focus}"
        ),
        "Interview answers",
        source="interview",
        tags=["interview", "decision"],
        run_id=run_id,
    )
)

Memory Injection Into Prompts

Before each bridge call, the adapter calls memory.get_context(topic). If relevant memories exist, the result is a ## Prior Knowledge Markdown section that gets prepended to the prompt:
# flowstate/memory.py
def get_context(self, query: str, *, max_tokens: int = 2000) -> str:
    results = self.search(query, limit=10)
    if not results:
        return ""

    char_budget = max_tokens * 4
    lines = ["## Prior Knowledge\n"]
    used = len(lines[0])

    for sr in results:
        entry = sr.entry
        header = f"### {entry.summary} ({entry.kind.value})\n"
        body = entry.content.strip()
        block = header + body + "\n\n"

        if used + len(block) > char_budget:
            remaining = char_budget - used - len(header) - 10
            if remaining > 100:
                lines.append(header + body[:remaining] + "...\n\n")
            break

        lines.append(block)
        used += len(block)

    return "".join(lines)
Results are ranked by BM25 relevance and truncated to a token budget (default 2,000 tokens, approximated as 4 characters per token) before injection. The section is empty if no relevant memories are found, so prompts are never padded with irrelevant context.

MemoryStore API

All memory operations go through MemoryStore. It is a context manager, but the orchestrator also calls memory.close() explicitly at the end of the pipeline.
from flowstate.memory import MemoryStore, MemoryEntry, MemoryKind

# Open / create the store (creates memory.db in the project root)
store = MemoryStore(root=Path("/my/project"))

# or use as a context manager
with MemoryStore(root=Path("/my/project")) as store:
    ...

Adding entries

# Add a single entry
entry = MemoryEntry.create(
    MemoryKind.RESEARCH,
    content="Kafka Streams provides stateful stream processing with RocksDB-backed stores.",
    summary="Kafka Streams: stateful processing",
    source="research/report.md",
    tags=["kafka", "streams", "stateful"],
    run_id="abc123",
)
memory_id: str = store.add(entry)

# Add multiple entries atomically
ids: list[str] = store.add_many([entry1, entry2, entry3])

Searching

# Full-text BM25 search across all kinds
results: list[SearchResult] = store.search("kafka streams", limit=10)

# Filter by kind
results = store.search("failure", kind=MemoryKind.TOOL_RUN, limit=5)

for sr in results:
    print(sr.score, sr.entry.summary, sr.entry.content[:200])

Fetching by ID or kind

# Fetch a single entry by its ID (returns None if not found)
entry: MemoryEntry | None = store.get("abc123def456")

# Fetch the most recent entries of a given kind
entries: list[MemoryEntry] = store.get_by_kind(MemoryKind.RESEARCH, limit=20)

Context retrieval for prompt injection

# Returns a formatted "## Prior Knowledge" markdown section
context: str = store.get_context("kafka streams architecture", max_tokens=2000)

# Prepend to prompt if non-empty
if context:
    prompt = context + "\n\n" + prompt

Counting and clearing

total: int = store.count()
research_count: int = store.count(kind=MemoryKind.RESEARCH)

cleared: int = store.clear()  # returns number of deleted entries

Storage and Schema

memory.db is a single SQLite file created in the project root the first time MemoryStore is instantiated. It is gitignored by default and fully portable — copy it to another machine and searches work immediately.

FTS5 with Porter Stemming

The full-text index uses SQLite’s FTS5 extension with the porter unicode61 tokenizer:
CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5(
    summary,
    content,
    tags,
    content=memories,
    content_rowid=rowid,
    tokenize='porter unicode61'
);
Porter stemming means morphological variants match automatically: searching for "streaming" also matches entries that contain "streams" or "streamed". Search ranking uses BM25, SQLite’s built-in rank column on FTS5 queries. Triggers keep the FTS index in sync with the memories table automatically — inserts, updates, and deletes are all reflected immediately.

Inspecting the database directly

sqlite3 memory.db

-- Show all memory kinds and counts
SELECT kind, COUNT(*) as n FROM memories GROUP BY kind;

-- Full-text search
SELECT summary, kind, created_at
FROM memories_fts
JOIN memories ON memories.rowid = memories_fts.rowid
WHERE memories_fts MATCH 'kafka streams'
ORDER BY rank
LIMIT 10;
The sqlite-vec Python package is listed as a dependency to enable future vector/embedding search. In v1, it is installed but dormant — the memories_fts FTS5 table handles all search. Vector search will be opt-in in a future release.

CLI Commands

FlowState exposes memory operations as first-class CLI commands:
# Search stored knowledge with BM25 ranking
flowstate memory search "kafka streams"

# Show counts grouped by kind
flowstate memory stats

# Clear all memories (prompts for confirmation)
flowstate memory clear
flowstate memory clear is irreversible. All stored research, strategy, and decision memories are deleted from memory.db. The file itself remains on disk.

Complete Example

from pathlib import Path
from flowstate.memory import MemoryStore, MemoryEntry, MemoryKind

with MemoryStore(root=Path.cwd()) as store:
    # Store a research finding
    entry = MemoryEntry.create(
        MemoryKind.RESEARCH,
        content=(
            "Apache Kafka's log-structured storage provides O(1) writes regardless "
            "of data size. Consumers maintain offsets independently, enabling replay."
        ),
        summary="Kafka: log structure and consumer offsets",
        source="research/report.md",
        tags=["kafka", "storage", "offsets"],
        run_id="run_abc123",
    )
    store.add(entry)

    # Later: retrieve context for a new prompt
    context = store.get_context("kafka consumer design")
    # Returns a "## Prior Knowledge" markdown section if relevant entries exist

    print(f"Total memories: {store.count()}")
    print(f"Research entries: {store.count(kind=MemoryKind.RESEARCH)}")

    # Search for specific topics
    results = store.search("consumer offsets replay")
    for sr in results:
        print(f"[{sr.score:.2f}] {sr.entry.summary}")

Build docs developers (and LLMs) love