Overview
The GraphStore provides unified storage for entities, timepoints, graphs, dialogs, and all other simulation artifacts. It is built on SQLModel (Pydantic + SQLAlchemy) with a SQLite backend, and it enables WAL mode for concurrent access.
Module: storage.py
Database: timepoint.db (SQLite with WAL mode)
Initialization
Signature:
class GraphStore:
    def __init__(self, db_url: str = "sqlite:///timepoint.db")
Example:
from storage import GraphStore
store = GraphStore("sqlite:///timepoint.db")
# Or custom location:
store = GraphStore("sqlite:////path/to/my_simulation.db")
Features:
- Automatic table creation from SQLModel schemas
- WAL mode enabled for concurrent reads/writes
- 30-second busy timeout for write locks
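The WAL and busy-timeout features listed above correspond to two SQLite pragmas. The following is a minimal sketch using only the standard-library sqlite3 module. GraphStore's real setup code is not shown on this page, so the helper name open_wal_connection and the demo file name are assumptions made for illustration.

```python
import os
import sqlite3
import tempfile


# Hypothetical helper: not part of GraphStore, shown only to illustrate the pragmas.
def open_wal_connection(path: str, busy_timeout_ms: int = 30_000) -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    # PRAGMA journal_mode returns the mode actually in effect ("wal" on success).
    mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
    if mode.lower() != "wal":
        raise RuntimeError(f"WAL mode not available, got {mode!r}")
    # Wait up to busy_timeout_ms for a lock before failing with "database is locked".
    conn.execute(f"PRAGMA busy_timeout={int(busy_timeout_ms)}")
    return conn


# Use a throwaway file: WAL mode needs a real file, not an in-memory database.
db_path = os.path.join(tempfile.mkdtemp(), "timepoint_demo.db")
conn = open_wal_connection(db_path)
journal_mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
busy_timeout = conn.execute("PRAGMA busy_timeout").fetchone()[0]
print(journal_mode, busy_timeout)
conn.close()
```

The journal mode is stored in the database file, so later connections also see WAL. The busy timeout is per connection and has to be set each time one is opened.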
Transactions
transaction()
Atomic database operations with automatic commit/rollback.
Signature:
@contextmanager
def transaction(self) -> Generator[TransactionContext, None, None]
Example:
with store.transaction() as tx:
    tx.save_entity(entity)
    tx.save_timepoint(timepoint)
    tx.save_exposure_event(event)
    # All succeed or all roll back
TransactionContext Methods:
save_entity(entity: Entity) -> Entity
save_timepoint(timepoint: Timepoint) -> Timepoint
save_exposure_event(event: ExposureEvent) -> ExposureEvent
save_exposure_events(events: list[ExposureEvent])
save_dialog(dialog: Dialog) -> Dialog
save_relationship_trajectory(trajectory: RelationshipTrajectory)
save_timeline(timeline: Timeline) -> Timeline
save_query_history(query_history: QueryHistory)
save_prospective_state(prospective_state: ProspectiveState)
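The commit-or-rollback behavior described for transaction() can be illustrated with a small context manager over the standard-library sqlite3 module. This is a sketch of the general pattern, not GraphStore's implementation: the function below yields a raw connection rather than a TransactionContext, and the table is a simplified stand-in.

```python
import sqlite3
from contextlib import contextmanager
from typing import Generator


@contextmanager
def transaction(conn: sqlite3.Connection) -> Generator[sqlite3.Connection, None, None]:
    """Commit if the block succeeds, roll back if it raises."""
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE entity (entity_id TEXT PRIMARY KEY)")
conn.commit()

try:
    with transaction(conn) as tx:
        tx.execute("INSERT INTO entity VALUES ('hamilton')")
        # Duplicate primary key: raises IntegrityError and triggers the rollback.
        tx.execute("INSERT INTO entity VALUES ('hamilton')")
except sqlite3.IntegrityError:
    pass

# The first insert was rolled back together with the failed one.
row_count = conn.execute("SELECT COUNT(*) FROM entity").fetchone()[0]
print(row_count)
```

Re-raising after the rollback keeps the failure visible to the caller, which is usually what you want for multi-table writes.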
Entity Operations
save_entity()
Save or update an entity.
Signature:
def save_entity(self, entity: Entity) -> Entity
Behavior:
- If entity_id exists: updates all fields
- If new: inserts the entity
- Marks entity_metadata as modified (JSON column)
Example:
from schemas import Entity, ResolutionLevel
entity = Entity(
    entity_id="alexander_hamilton",
    entity_type="human",
    resolution_level=ResolutionLevel.DIALOG,
    entity_metadata={
        "knowledge_state": ["Founded First Bank"],
        "energy_budget": 85.0,
        "personality_traits": [0.8, -0.3, 0.6, 0.9, -0.2]
    }
)
saved = store.save_entity(entity)
print(f"Saved: {saved.entity_id} (id={saved.id})")
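The save-or-update behavior of save_entity() is an upsert. As a rough analogy, the sketch below shows the same semantics in plain SQLite with the standard-library sqlite3 module. The table layout, the save_entity_sketch name, and the "dialog" string value are simplified assumptions; the real method works through SQLModel and is not shown here.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE entity ("
    "id INTEGER PRIMARY KEY, entity_id TEXT UNIQUE, "
    "resolution_level TEXT, entity_metadata TEXT)"
)


# Hypothetical stand-in for save_entity(): insert, or update every field on conflict.
def save_entity_sketch(entity_id: str, resolution_level: str, metadata: dict) -> int:
    conn.execute(
        "INSERT INTO entity (entity_id, resolution_level, entity_metadata) "
        "VALUES (?, ?, ?) "
        "ON CONFLICT(entity_id) DO UPDATE SET "
        "resolution_level = excluded.resolution_level, "
        "entity_metadata = excluded.entity_metadata",
        (entity_id, resolution_level, json.dumps(metadata)),
    )
    conn.commit()
    row = conn.execute("SELECT id FROM entity WHERE entity_id = ?", (entity_id,)).fetchone()
    return row[0]


first_id = save_entity_sketch("alexander_hamilton", "dialog", {"energy_budget": 85.0})
second_id = save_entity_sketch("alexander_hamilton", "dialog", {"energy_budget": 60.0})

row_count = conn.execute("SELECT COUNT(*) FROM entity").fetchone()[0]
stored = json.loads(conn.execute("SELECT entity_metadata FROM entity").fetchone()[0])
print(first_id == second_id, row_count, stored)
```

The second call updates the existing row instead of adding a new one, so the primary key stays stable across saves.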
get_entity()
Retrieve entity by ID.
Signature:
def get_entity(
    self,
    entity_id: str,
    timepoint: str | None = None
) -> Entity | None
Example:
entity = store.get_entity("alexander_hamilton")
if entity:
    print(f"Resolution: {entity.resolution_level}")
    print(f"Centrality: {entity.eigenvector_centrality}")
get_all_entities()
Retrieve all entities.
Signature:
def get_all_entities(self) -> list[Entity]
Example:
entities = store.get_all_entities()
print(f"Total entities: {len(entities)}")
for entity in entities:
    print(f"{entity.entity_id}: {entity.resolution_level}")
Exposure Events (M3)
save_exposure_event()
Save a single exposure event.
Signature:
def save_exposure_event(self, event: ExposureEvent) -> ExposureEvent
Example:
from schemas import ExposureEvent
from datetime import datetime
event = ExposureEvent(
    entity_id="alexander_hamilton",
    event_type="learned",
    information="Madison supports bicameral legislature",
    source="james_madison",
    timestamp=datetime(1787, 5, 25, 14, 0),
    confidence=0.9,
    timepoint_id="tp_001",
    run_id="run_123"
)
saved = store.save_exposure_event(event)
save_exposure_events()
Batch save exposure events.
Signature:
def save_exposure_events(self, exposure_events: list[ExposureEvent])
get_exposure_events()
Get exposure events for an entity.
Signature:
def get_exposure_events(
    self,
    entity_id: str,
    limit: int | None = None
) -> list[ExposureEvent]
Parameters:
- entity_id: Entity identifier
- limit: Maximum number of events to return (most recent first)
Example:
events = store.get_exposure_events(
    "alexander_hamilton",
    limit=10
)
for event in events:
    print(f"{event.timestamp}: {event.information}")
    print(f" Source: {event.source}, Confidence: {event.confidence}")
get_exposure_events_by_run()
Get all exposure events for a simulation run.
Signature:
def get_exposure_events_by_run(self, run_id: str) -> list[ExposureEvent]
get_entity_knowledge_at_timepoint()
Get what an entity knew at a specific timepoint.
Signature:
def get_entity_knowledge_at_timepoint(
    self,
    entity_id: str,
    timepoint_id: str
) -> list[str]
Example:
knowledge = store.get_entity_knowledge_at_timepoint(
    "alexander_hamilton",
    "tp_005"
)
print("Hamilton knew at tp_005:")
for item in knowledge:
    print(f"- {item}")
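This page does not say how get_entity_knowledge_at_timepoint() derives its result. One plausible reading, shown here purely as an assumption, is that it collects the information field of every exposure event for the entity up to the timepoint's timestamp. The ExposureSketch class, the knowledge_at function, and the sample events are hypothetical stand-ins.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List


@dataclass
class ExposureSketch:
    """Hypothetical stand-in for ExposureEvent with only the fields used here."""
    entity_id: str
    information: str
    timestamp: datetime


events = [
    ExposureSketch("alexander_hamilton", "Convention begins", datetime(1787, 5, 25, 9, 0)),
    ExposureSketch("alexander_hamilton", "Madison supports bicameral legislature",
                   datetime(1787, 5, 25, 14, 0)),
    ExposureSketch("alexander_hamilton", "later illustrative event", datetime(1787, 9, 17, 16, 0)),
    ExposureSketch("james_madison", "other entity's event", datetime(1787, 5, 25, 9, 0)),
]


def knowledge_at(entity_id: str, cutoff: datetime) -> List[str]:
    # ASSUMPTION: knowledge = information from this entity's events at or before the cutoff.
    relevant = [e for e in events if e.entity_id == entity_id and e.timestamp <= cutoff]
    return [e.information for e in sorted(relevant, key=lambda e: e.timestamp)]


known = knowledge_at("alexander_hamilton", datetime(1787, 6, 1))
for item in known:
    print(f"- {item}")
```

Under this reading, events after the cutoff and events belonging to other entities never appear in the result.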
Timepoint Operations
save_timepoint()
Save a timepoint.
Signature:
def save_timepoint(self, timepoint: Timepoint) -> Timepoint
Example:
from schemas import Timepoint, ResolutionLevel
from datetime import datetime
timepoint = Timepoint(
    timepoint_id="tp_001",
    timeline_id="timeline_001",
    timestamp=datetime(1787, 5, 25, 9, 0),
    event_description="Constitutional Convention begins",
    entities_present=["hamilton", "madison", "washington"],
    causal_parent=None,  # First timepoint
    resolution_level=ResolutionLevel.DIALOG,
    run_id="run_123"
)
saved = store.save_timepoint(timepoint)
get_timepoint()
Get timepoint by ID (with LRU caching).
Signature:
@lru_cache(maxsize=500)
def get_timepoint(self, timepoint_id: str) -> Timepoint | None
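Because get_timepoint() is wrapped in lru_cache, repeated lookups of the same ID skip the database, but a cached value can also outlive a later change. The sketch below demonstrates both effects with functools.lru_cache on a plain function. The dictionary and the get_description function are hypothetical stand-ins, and whether GraphStore clears its cache on save is not stated on this page.

```python
from functools import lru_cache

# Hypothetical in-memory stand-in for the timepoint table.
_rows = {"tp_001": "Constitutional Convention begins"}
lookups = 0


@lru_cache(maxsize=500)
def get_description(timepoint_id: str):
    global lookups
    lookups += 1  # counts real "database" reads
    return _rows.get(timepoint_id)


get_description("tp_001")
get_description("tp_001")  # served from the cache, no second read
info = get_description.cache_info()

# Changing the underlying row does not change the cached value.
_rows["tp_001"] = "Convention opens (edited)"
stale = get_description("tp_001")

# Clearing the cache forces the next call to read again.
get_description.cache_clear()
fresh = get_description("tp_001")
print(info.hits, info.misses, lookups)
```

If you rewrite a timepoint and need to read it back within the same process, it is worth checking whether the cached copy is still the one being returned.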
get_all_timepoints()
Get all timepoints ordered by timestamp.
Signature:
def get_all_timepoints(self) -> list[Timepoint]
get_timepoints_by_run()
Get all timepoints for a simulation run.
Signature:
def get_timepoints_by_run(self, run_id: str) -> list[Timepoint]
get_timepoints_in_range()
Get timepoints within a time range.
Signature:
def get_timepoints_in_range(
    self,
    start_time=None,
    end_time=None
) -> list[Timepoint]
Example:
from datetime import datetime
start = datetime(1787, 5, 25)
end = datetime(1787, 9, 17)
timepoints = store.get_timepoints_in_range(start, end)
print(f"Found {len(timepoints)} timepoints in range")
get_successor_timepoints()
Get timepoints that have this timepoint as causal parent.
Signature:
def get_successor_timepoints(self, timepoint_id: str) -> list[Timepoint]
get_predecessor_timepoints()
Get causal parent(s) of a timepoint.
Signature:
def get_predecessor_timepoints(self, timepoint_id: str) -> list[Timepoint]
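Successor and predecessor lookups are the building blocks for walking a causal chain. The sketch below shows the traversal over a plain dictionary that maps each timepoint ID to its causal_parent. The data and the function names ancestors and successors are hypothetical; with a real store you would call get_predecessor_timepoints() and get_successor_timepoints() instead.

```python
from typing import Dict, List, Optional

# Hypothetical data: timepoint_id -> causal_parent (None marks the root).
causal_parent: Dict[str, Optional[str]] = {
    "tp_001": None,
    "tp_002": "tp_001",
    "tp_003": "tp_002",
    "tp_003b": "tp_002",
}


def ancestors(timepoint_id: str) -> List[str]:
    """Walk causal_parent links back to the root, nearest ancestor first."""
    chain: List[str] = []
    seen = {timepoint_id}  # guards against accidental cycles
    current = causal_parent.get(timepoint_id)
    while current is not None and current not in seen:
        chain.append(current)
        seen.add(current)
        current = causal_parent.get(current)
    return chain


def successors(timepoint_id: str) -> List[str]:
    """Timepoints whose causal parent is the given timepoint."""
    return sorted(tp for tp, parent in causal_parent.items() if parent == timepoint_id)


chain = ancestors("tp_003")
children = successors("tp_002")
print(chain, children)
```

A timepoint with more than one successor, like tp_002 here, is where two causal continuations diverge.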
Graph Operations
save_graph()
Serialize NetworkX graph to database.
Signature:
def save_graph(self, graph: nx.Graph, timepoint_id: str)
Example:
import networkx as nx
graph = nx.Graph()
graph.add_node("hamilton", role="primary")
graph.add_node("madison", role="primary")
graph.add_edge("hamilton", "madison", relationship="ally", weight=0.9)
store.save_graph(graph, "tp_001")
load_graph()
Deserialize NetworkX graph from database.
Signature:
def load_graph(self, timepoint_id: str) -> nx.Graph | None
Example:
graph = store.load_graph("tp_001")
if graph:
    print(f"Nodes: {graph.number_of_nodes()}")
    print(f"Edges: {graph.number_of_edges()}")
    for u, v, data in graph.edges(data=True):
        print(f"{u} <-{data['relationship']}-> {v}")
Dialog Storage (M11)
save_dialog()
Save a dialog conversation.
Signature:
def save_dialog(self, dialog: Dialog) -> Dialog
Example:
import json
from schemas import Dialog
from datetime import datetime
turns = [
    {
        "speaker": "hamilton",
        "content": "We need a strong central bank.",
        "timestamp": "1791-01-15T10:00:00",
        "emotional_tone": "confident",
        "knowledge_references": ["banking_expertise"]
    },
    {
        "speaker": "jefferson",
        "content": "That concentrates too much power.",
        "timestamp": "1791-01-15T10:01:00",
        "emotional_tone": "skeptical",
        "knowledge_references": ["states_rights"]
    }
]
dialog = Dialog(
    dialog_id="dialog_001",
    timepoint_id="tp_010",
    participants=json.dumps(["hamilton", "jefferson"]),
    turns=json.dumps(turns),
    context_used=json.dumps({"location": "cabinet_room"}),
    duration_seconds=120,
    information_transfer_count=2,
    run_id="run_123"
)
saved = store.save_dialog(dialog)
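In the example above, participants, turns, and context_used are passed as JSON strings, so they need to be decoded when a dialog is read back. The sketch below shows that round trip with the standard-library json module. It uses a SimpleNamespace as a stand-in for a Dialog row and assumes the fields come back as the same JSON strings that were saved.

```python
import json
from types import SimpleNamespace

# Stand-in for a Dialog row returned by get_dialog(); only the JSON fields matter here.
dialog = SimpleNamespace(
    dialog_id="dialog_001",
    participants=json.dumps(["hamilton", "jefferson"]),
    turns=json.dumps([
        {"speaker": "hamilton", "content": "We need a strong central bank."},
        {"speaker": "jefferson", "content": "That concentrates too much power."},
    ]),
)

# Decode the JSON strings back into Python lists and dicts.
participants = json.loads(dialog.participants)
turns = json.loads(dialog.turns)

transcript = [f"{turn['speaker']}: {turn['content']}" for turn in turns]
for line in transcript:
    print(line)
```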
get_dialog()
Get dialog by ID.
Signature:
def get_dialog(self, dialog_id: str) -> Dialog | None
get_dialogs_at_timepoint()
Get all dialogs at a timepoint.
Signature:
def get_dialogs_at_timepoint(self, timepoint_id: str) -> list[Dialog]
load_all_dialogs()
Load all dialogs (for narrative export).
Signature:
def load_all_dialogs(self) -> list[Dialog]
Relationship Trajectories (M13)
save_relationship_trajectory()
Save relationship evolution over time.
Signature:
def save_relationship_trajectory(
    self,
    trajectory: RelationshipTrajectory
) -> RelationshipTrajectory
get_relationship_trajectory_between()
Get most recent trajectory between two entities.
Signature:
def get_relationship_trajectory_between(
    self,
    entity_a: str,
    entity_b: str
) -> RelationshipTrajectory | None
get_entity_relationships()
Get all relationships involving an entity.
Signature:
def get_entity_relationships(
    self,
    entity_id: str
) -> list[RelationshipTrajectory]
Timeline Management (M12)
save_timeline()
Save a timeline (for counterfactual branching).
Signature:
def save_timeline(self, timeline: Timeline) -> Timeline
get_timeline()
Get timeline by ID.
Signature:
def get_timeline(self, timeline_id: str) -> Timeline | None
get_child_timelines()
Get all child timelines of a parent.
Signature:
def get_child_timelines(self, parent_timeline_id: str) -> list[Timeline]
Example:
baseline = store.get_timeline("timeline_baseline")
children = store.get_child_timelines("timeline_baseline")
for child in children:
    print(f"Branch: {child.timeline_id}")
    print(f" Point: {child.branch_point}")
    print(f" Intervention: {child.intervention_description}")
Query History (M5)
save_query_history()
Save query history for resolution tracking.
Signature:
def save_query_history(self, query_history: QueryHistory) -> QueryHistory
get_query_history_for_entity()
Get query history for an entity.
Signature:
def get_query_history_for_entity(
    self,
    entity_id: str,
    limit: int = 100
) -> list[QueryHistory]
get_entity_query_count()
Get total queries for an entity.
Signature:
def get_entity_query_count(self, entity_id: str) -> int
get_entity_elevation_count()
Get number of resolution elevations.
Signature:
def get_entity_elevation_count(self, entity_id: str) -> int
Prospective State (M15)
save_prospective_state()
Save entity’s expectations and forecasts.
Signature:
def save_prospective_state(self, prospective_state: ProspectiveState) -> ProspectiveState
get_prospective_states_for_entity()
Get all prospective states for an entity.
Signature:
def get_prospective_states_for_entity(
    self,
    entity_id: str
) -> list[ProspectiveState]
Convergence Evaluation
save_convergence_set()
Save convergence analysis results.
Signature:
def save_convergence_set(
    self,
    convergence_set: ConvergenceSet
) -> ConvergenceSet
get_convergence_sets()
Get convergence sets with filtering.
Signature:
def get_convergence_sets(
    self,
    template_id: str | None = None,
    min_score: float | None = None,
    limit: int = 100
) -> list[ConvergenceSet]
get_convergence_stats()
Get aggregate convergence statistics.
Signature:
def get_convergence_stats(self) -> dict
Returns: Dictionary with:
- total_sets: Number of convergence sets
- average_score: Mean convergence score
- min_score: Minimum score
- max_score: Maximum score
- grade_distribution: Count by grade (A/B/C/D/F)
- template_coverage: Count by template
SQLite Schema
Tables:
- entity: Entities with resolution levels and metadata
- timepoint: Temporal events with causal chains
- exposureevent: Knowledge exposure tracking
- dialog: Dialog conversations
- relationshiptrajectory: Relationship evolution
- timeline: Timeline branching (counterfactuals)
- queryhistory: Query pattern tracking
- prospectivestate: Entity expectations
- convergenceset: Cross-run causal analysis
- systemprompt: Prompt templates
- validationrule: Validation configurations
- environmententity: Scene environments
- atmosphereentity: Scene atmosphere
- crowdentity: Crowd dynamics
Indexes:
- entity_id (unique on entity)
- timepoint_id (unique on timepoint)
- timeline_id (on timepoint)
- causal_parent (on timepoint)
- run_id (on timepoint, exposureevent, dialog)
WAL Mode Benefits:
- Multiple readers and one writer can work at the same time
- Faster writes in most workloads, because readers do not block the writer
- Better concurrency than SQLite's default rollback-journal mode
LRU Caching:
- get_timepoint() is cached (500 entries)
- Speeds up temporal traversal
Batch Operations:
- save_exposure_events() for bulk inserts
- save_dialogs() for batch dialog storage
- Transaction contexts for atomic multi-table writes
Best Practices
- Use transactions for multi-table operations
- Batch exposure events instead of one-by-one
- Set run_id for convergence analysis
- Check entity existence before updates
- Use LRU cache by calling get_timepoint()
- Clean old data periodically with _clear_database()
- Monitor database size (WAL can grow)
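The last point, that the WAL can grow, can be checked and addressed directly in SQLite. The sketch below uses only the standard-library sqlite3 module: it measures the -wal file next to the database and then runs a truncating checkpoint. The file name and table are illustrative assumptions, and the page does not say whether GraphStore exposes a helper for this.

```python
import os
import sqlite3
import tempfile

# Throwaway database file; WAL mode needs a real file on disk.
db_path = os.path.join(tempfile.mkdtemp(), "timepoint_demo.db")
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("CREATE TABLE exposureevent (information TEXT)")

# One batched insert inside a single transaction.
conn.executemany(
    "INSERT INTO exposureevent VALUES (?)",
    [(f"fact {i}",) for i in range(1000)],
)
conn.commit()

# SQLite keeps the write-ahead log in a sibling file with a "-wal" suffix.
wal_path = db_path + "-wal"
wal_size_before = os.path.getsize(wal_path)

# TRUNCATE copies WAL content into the main file and resets the WAL to zero bytes.
# The pragma returns (busy, frames in log, frames checkpointed).
busy, log_frames, checkpointed = conn.execute("PRAGMA wal_checkpoint(TRUNCATE)").fetchone()
wal_size_after = os.path.getsize(wal_path)
print(wal_size_before, wal_size_after, busy)
conn.close()
```

A nonzero busy value means another connection blocked the checkpoint, in which case the WAL is not truncated and the call can be retried later.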
Example Workflow
from storage import GraphStore
from schemas import Entity, Timepoint, ExposureEvent, ResolutionLevel
from datetime import datetime
import uuid
store = GraphStore()
run_id = str(uuid.uuid4())
# Atomic multi-table write
with store.transaction() as tx:
    # Save entity
    entity = Entity(
        entity_id="hamilton",
        entity_type="human",
        resolution_level=ResolutionLevel.DIALOG
    )
    tx.save_entity(entity)
    # Save timepoint
    timepoint = Timepoint(
        timepoint_id="tp_001",
        timestamp=datetime.now(),
        event_description="Scene begins",
        entities_present=["hamilton"],
        run_id=run_id
    )
    tx.save_timepoint(timepoint)
    # Save exposure event
    event = ExposureEvent(
        entity_id="hamilton",
        event_type="witnessed",
        information="Convention begins",
        source="scene",
        timestamp=datetime.now(),
        timepoint_id="tp_001",
        run_id=run_id
    )
    tx.save_exposure_event(event)
print("Transaction committed successfully")