Overview
The Flower Engine backend is built with FastAPI and handles:
- WebSocket connections and message routing
- LLM integration and streaming responses
- SQLite database management
- RAG (Retrieval-Augmented Generation) via ChromaDB
- Session and state persistence
- Asset loading from YAML files
Technology Stack
Core dependencies from requirements.txt:
fastapi>=0.100.0
uvicorn>=0.23.0
websockets>=11.0.3
chromadb>=0.4.10
openai>=1.3.0
pydantic>=2.3.0
sentence-transformers>=2.2.2
pyyaml>=6.0.1
watchdog>=3.0.0
google-genai>=0.1.0
Key Libraries
- FastAPI: Async web framework
- Uvicorn: ASGI server
- Pydantic: Data validation and models
- ChromaDB: Vector database for RAG
- OpenAI SDK: LLM API client (supports OpenRouter, DeepSeek, Groq)
- SentenceTransformers: Local embeddings (all-MiniLM-L6-v2)
- Google GenAI: Official Gemini SDK
Running the Backend
Development Mode (with auto-reload)
python -m uvicorn engine.main:app --host 0.0.0.0 --port 8000 --reload
The --reload flag automatically restarts the server when code changes are detected.
Production Mode
python -m uvicorn engine.main:app --host 0.0.0.0 --port 8000
Direct Execution
venv/bin/python engine/main.py
The backend is configured to auto-reload in engine/main.py:266:
if __name__ == "__main__":
    import uvicorn

    uvicorn.run("engine.main:app", host="0.0.0.0", port=8000, reload=True)
Code Style Guidelines
Imports
- Standard library first, then third-party, then local
- Use explicit absolute imports rooted at the engine package: from engine.logger import log
- Group imports logically within files
Example:
import json
import asyncio
from typing import Optional

from fastapi import WebSocket
from pydantic import BaseModel

from engine.logger import log
from engine.database import world_manager
from engine.utils import build_ws_payload
Formatting
- 4 spaces indentation (not tabs)
- Maximum line length: 120 characters
- Use f-strings for string formatting
- Use trailing commas in multi-line structures
Example:
data = {
    "event": "system_update",
    "payload": {
        "content": f"Model set to {model_name}",
        "metadata": {"model": model_name},
    },  # Trailing comma
}
Type Hints
- Use Pydantic BaseModel for all data models
- Use type hints on all function signatures
- Use Optional[T] for nullable types
- Prefer explicit types over Any
Example:
from typing import Optional, List

from fastapi import WebSocket
from pydantic import BaseModel


class Character(BaseModel):
    id: str
    name: str
    persona: str


def get_character(char_id: str) -> Optional[Character]:
    """Retrieve character by ID."""
    # Implementation
    pass


async def process_messages(
    websocket: WebSocket,
    messages: List[str],
    timeout: Optional[float] = None,
) -> None:
    """Process a list of messages."""
    pass
Naming Conventions
- Modules: snake_case.py
- Classes: PascalCase
- Functions/variables: snake_case
- Constants: UPPER_SNAKE_CASE
- Private members: prefix with underscore
Example:
# Constants
MAX_RETRIES = 3
DEFAULT_TIMEOUT = 30.0


# Classes
class WorldManager:
    def __init__(self):
        self._worlds = {}  # Private

    def add_world(self, world: World) -> None:
        """Add a world to the manager."""
        self._worlds[world.id] = world


# Functions
def build_ws_payload(event: str, content: str, metadata: dict = None) -> str:
    """Build WebSocket message payload."""
    pass
Error Handling
- Use try/except with specific exception types
- Log errors via the shared logger (from engine.logger import log)
- Use bare except: only when necessary (never in new code)
- Propagate errors appropriately or return sensible defaults
Example:
from engine.logger import log

try:
    world = world_manager.get_world(world_id)
    if not world:
        raise ValueError(f"World not found: {world_id}")
except ValueError as e:
    log.error(f"Invalid world ID: {e}")
    await websocket.send_text(
        build_ws_payload("error", str(e))
    )
    return
except Exception as e:
    log.error(f"Unexpected error: {e}")
    raise
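The "return sensible defaults" guideline above can be sketched for incoming message parsing. This is a minimal illustration, not code from the engine: parse_message is a hypothetical helper, and the standard logging module stands in for engine.logger.log so the sketch runs on its own.

```python
import json
import logging
from typing import Optional

# Stand-in for `from engine.logger import log` (assumption for this sketch).
log = logging.getLogger("engine")


def parse_message(raw: str) -> Optional[dict]:
    """Decode a JSON message, returning None instead of raising on bad input.

    Hypothetical helper for illustration only.
    """
    try:
        msg = json.loads(raw)
    except json.JSONDecodeError as e:
        # Specific exception type: malformed JSON is an expected failure mode.
        log.error(f"Malformed message: {e}")
        return None
    if not isinstance(msg, dict):
        log.error(f"Expected a JSON object, got {type(msg).__name__}")
        return None
    return msg
```

Callers can then branch on None rather than wrapping every call site in its own try/except.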
Async Patterns
- Use async def for all WebSocket handlers
- Use asyncio for concurrent operations
- Use asyncio.create_task() for fire-and-forget background tasks
- Handle asyncio.CancelledError explicitly
Example from engine/main.py:222:
# Stream response in background task
task = asyncio.create_task(
    stream_chat_response(
        websocket,
        prompt,
        full_context,
        state.ACTIVE_WORLD_ID,
        state.ACTIVE_CHARACTER_ID,
        state.ACTIVE_SESSION_ID,
    )
)

# Monitor for cancellation
while not task.done():
    try:
        raw = await asyncio.wait_for(websocket.receive_text(), timeout=0.05)
        try:
            cmd_msg = json.loads(raw)
            if cmd_msg.get("prompt") == "/cancel":
                task.cancel()
                await websocket.send_text(
                    build_ws_payload("system_update", "✗ Stream cancelled by user.")
                )
        except:
            pass
    except asyncio.TimeoutError:
        continue

try:
    await task
except asyncio.CancelledError:
    log.info("Task was successfully cancelled.")
except Exception as e:
    log.error(f"Task failed with error: {e}")
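The create_task / cancel / CancelledError flow can be tried in isolation with a standalone sketch. It assumes nothing about the engine: slow_stream and run_and_cancel are hypothetical names, and asyncio.sleep stands in for the real streaming work.

```python
import asyncio


async def slow_stream(progress: list) -> str:
    """Simulate a long-running stream, recording progress as it goes."""
    try:
        for i in range(100):
            progress.append(i)
            await asyncio.sleep(0.01)
        return "finished"
    except asyncio.CancelledError:
        # Clean up here if needed, then re-raise so the awaiting code
        # can see that the task was cancelled.
        progress.append("cleanup")
        raise


async def run_and_cancel() -> str:
    progress: list = []
    task = asyncio.create_task(slow_stream(progress))
    await asyncio.sleep(0.03)  # Let the task make some progress
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        return "cancelled"
    return "completed"


result = asyncio.run(run_and_cancel())
```

Re-raising CancelledError inside the task keeps the cancellation visible to whoever awaits it, which is what lets the outer try/except distinguish a cancelled stream from a failed one.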
Key Components
Main Application (engine/main.py)
The entry point defines the FastAPI app and WebSocket endpoint:
app = FastAPI(title="The Flower Engine")


@app.on_event("startup")
async def startup():
    # Load assets
    for data in load_yaml_assets("assets/worlds/*.yaml"):
        w = World(**data)
        world_manager.add_world(w)
    # Fetch available models from APIs
    # ...


@app.websocket("/ws/rpc")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    await websocket.send_text(
        build_ws_payload("system_update", "✓ Engine ready.", {"status": "ok"})
    )
    await broadcast_sync_state(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            # Process message
    except WebSocketDisconnect:
        log.info("Disconnected.")
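The "# Process message" step is elided in the excerpt above. One way such a step could classify incoming text is sketched below, assuming messages are JSON objects carrying a "prompt" key (as the /cancel check elsewhere in this guide suggests). route_message and its return values are hypothetical and not taken from the engine.

```python
import json
from typing import Tuple


def route_message(raw: str) -> Tuple[str, str]:
    """Classify a raw WebSocket message as a command or a chat prompt.

    Hypothetical helper: returns ("command", text), ("chat", text),
    or ("invalid", "") when the message cannot be used.
    """
    try:
        msg = json.loads(raw)
    except json.JSONDecodeError:
        return ("invalid", "")
    if not isinstance(msg, dict):
        return ("invalid", "")
    prompt = msg.get("prompt")
    if not isinstance(prompt, str) or not prompt.strip():
        return ("invalid", "")
    prompt = prompt.strip()
    # Slash-prefixed prompts go to the command handler; the rest are chat.
    kind = "command" if prompt.startswith("/") else "chat"
    return (kind, prompt)
```

Keeping the classification in a pure function makes it testable without opening a WebSocket.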
Database (engine/database.py)
Uses SQLite with Pydantic models:
import sqlite3
from pydantic import BaseModel
from typing import Optional


class World(BaseModel):
    id: str
    name: str
    lore: str
    start_message: str = ""
    scene: str = ""
    system_prompt: str = ""


DB_NAME = "engine.db"


def init_db():
    with sqlite3.connect(DB_NAME) as conn:
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS worlds (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                lore TEXT NOT NULL,
                start_message TEXT NOT NULL DEFAULT '',
                scene TEXT NOT NULL DEFAULT '',
                system_prompt TEXT NOT NULL DEFAULT ''
            )
        """)
        # Migration: Add new columns
        try:
            cursor.execute(
                "ALTER TABLE worlds ADD COLUMN scene TEXT NOT NULL DEFAULT ''"
            )
        except sqlite3.OperationalError:
            pass  # Column already exists
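Key Pattern: Use context managers for connections. In sqlite3, the with block commits on success and rolls back on an exception; it does not close the connection:
with sqlite3.connect(DB_NAME) as conn:
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM worlds WHERE id = ?", (world_id,))
    row = cursor.fetchone()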
Include migration logic for schema changes using try/except with sqlite3.OperationalError.
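Catching every sqlite3.OperationalError also hides unrelated failures such as a missing table. A slightly stricter variant is sketched below using an in-memory database. add_column_if_missing is a hypothetical helper, not part of engine/database.py, and it relies on SQLite reporting an existing column as "duplicate column name".

```python
import sqlite3


def add_column_if_missing(conn: sqlite3.Connection, table: str, column_def: str) -> bool:
    """Try to add a column; return True if added, False if it already existed.

    `table` and `column_def` are interpolated into SQL, so they must come
    from trusted code, never from user input.
    """
    try:
        conn.execute(f"ALTER TABLE {table} ADD COLUMN {column_def}")
        return True
    except sqlite3.OperationalError as e:
        # SQLite reports an existing column as "duplicate column name: ...".
        if "duplicate column name" in str(e).lower():
            return False
        raise  # Any other failure (e.g. a missing table) is a real error


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE worlds (id TEXT PRIMARY KEY, name TEXT NOT NULL)")
first = add_column_if_missing(conn, "worlds", "scene TEXT NOT NULL DEFAULT ''")
second = add_column_if_missing(conn, "worlds", "scene TEXT NOT NULL DEFAULT ''")
columns = [row[1] for row in conn.execute("PRAGMA table_info(worlds)")]
```

Running the migration twice leaves the schema unchanged the second time, which is the property that makes it safe to call on every startup.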
Commands (engine/commands.py)
Command handlers parse slash commands:
async def handle_command(cmd_str: str, websocket: WebSocket):
    parts = cmd_str.split(" ", 2)
    cmd = parts[0]
    if cmd == "/model" and len(parts) >= 2:
        new_model = parts[1]
        if any(m["id"] == new_model for m in state.AVAILABLE_MODELS):
            state.CURRENT_MODEL = new_model
            state.MODEL_CONFIRMED = True
            state.save_state()
            await websocket.send_text(
                build_ws_payload(
                    "system_update",
                    f"Hot-swapped model to {state.CURRENT_MODEL}",
                    {"model": state.CURRENT_MODEL, "model_confirmed": True},
                )
            )
        else:
            await websocket.send_text(
                build_ws_payload(
                    "system_update", f"Error: {new_model} is not recognized."
                )
            )
Pattern: Split command string with parts = cmd_str.split(" ", 2) to get command, subcommand, and arguments.
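To make the effect of the maxsplit argument concrete, here is a small sketch. parse_command is a hypothetical wrapper, not a function from engine/commands.py; it pads missing parts with empty strings so callers can always unpack three values.

```python
from typing import Tuple


def parse_command(cmd_str: str) -> Tuple[str, str, str]:
    """Split a slash command into (command, subcommand, arguments).

    Missing pieces come back as empty strings.
    """
    parts = cmd_str.strip().split(" ", 2)
    parts += [""] * (3 - len(parts))  # Pad so unpacking always succeeds
    cmd, sub, args = parts
    return cmd, sub, args
```

Because the split stops after two separators, everything past the subcommand stays together in the third element, spaces included.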
LLM Streaming (engine/llm.py)
Streams responses from various LLM providers:
from openai import AsyncOpenAI
from engine.config import OPENAI_BASE_URL, OPENAI_API_KEY

client = AsyncOpenAI(
    base_url=OPENAI_BASE_URL,
    api_key=OPENAI_API_KEY,
    default_headers={
        "HTTP-Referer": "https://github.com/ritz541/flower-engine",
        "X-Title": "The Flower Roleplay Engine",
    },
)


async def stream_chat_response(
    ws: WebSocket,
    prompt: str,
    context: str,
    world_id: str,
    char_id: str,
    session_id: str = "",
):
    # Build messages (system_prompt is assembled in code omitted from this excerpt)
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": prompt},
    ]
    # Stream response
    stream = await client.chat.completions.create(
        model=state.CURRENT_MODEL,
        messages=messages,
        stream=True,
    )
    # tps and total_tokens are tracked in code omitted from this excerpt
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            await ws.send_text(
                build_ws_payload(
                    "chat_chunk",
                    content,
                    {"tokens_per_second": tps},
                )
            )
    await ws.send_text(
        build_ws_payload(
            "chat_end",
            "",
            {"total_tokens": total_tokens},
        )
    )
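The excerpt sends tokens_per_second and total_tokens in the metadata but does not show how they are computed. One plausible way to derive them is sketched below. StreamStats is a hypothetical class, and counting one token per chunk is an approximation assumed for this sketch, not the engine's confirmed method.

```python
import time
from typing import Optional


class StreamStats:
    """Track chunk throughput for a streaming response.

    Hypothetical helper: counts one "token" per chunk, which only
    approximates real token counts.
    """

    def __init__(self, start: Optional[float] = None) -> None:
        # Allow the clock to be injected so the class is easy to test.
        self.start = time.monotonic() if start is None else start
        self.total_tokens = 0

    def record_chunk(self) -> None:
        self.total_tokens += 1

    def tokens_per_second(self, now: Optional[float] = None) -> float:
        now = time.monotonic() if now is None else now
        elapsed = now - self.start
        # Guard against division by zero on the very first chunk.
        return self.total_tokens / elapsed if elapsed > 0 else 0.0


stats = StreamStats(start=0.0)
for _ in range(10):
    stats.record_chunk()
tps = stats.tokens_per_second(now=2.0)  # 10 chunks over 2 seconds
```

time.monotonic is used rather than time.time so that system clock adjustments cannot produce negative or inflated rates.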
RAG (engine/rag.py)
Retrieval-Augmented Generation using ChromaDB:
import chromadb
from sentence_transformers import SentenceTransformer


class RAGManager:
    def __init__(self):
        self.client = chromadb.Client()
        self.embedder = SentenceTransformer('all-MiniLM-L6-v2')

    def add_lore(self, world_id: str, doc_id: str, text: str):
        """Add lore chunk to vector database."""
        collection = self.client.get_or_create_collection(f"lore_{world_id}")
        embedding = self.embedder.encode(text).tolist()
        collection.add(
            documents=[text],
            embeddings=[embedding],
            ids=[doc_id],
        )

    def query_lore(self, world_id: str, query: str, n_results: int = 2):
        """Query relevant lore chunks."""
        collection = self.client.get_collection(f"lore_{world_id}")
        query_embedding = self.embedder.encode(query).tolist()
        results = collection.query(
            query_embeddings=[query_embedding],
            n_results=n_results,
        )
        return results["documents"][0], results["distances"][0]
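Since query_lore returns documents alongside their distances, a caller may want to drop weak matches before adding them to the prompt. The sketch below shows one way to do that. filter_lore and the 1.0 threshold are hypothetical; a useful cutoff depends on the collection's distance metric and on the embedding model, and the sample distances are made up for illustration.

```python
from typing import List, Tuple


def filter_lore(
    documents: List[str],
    distances: List[float],
    max_distance: float,
) -> List[Tuple[str, float]]:
    """Keep (document, distance) pairs at or below max_distance, closest first."""
    pairs = [
        (doc, dist)
        for doc, dist in zip(documents, distances)
        if dist <= max_distance
    ]
    return sorted(pairs, key=lambda pair: pair[1])


# Made-up values standing in for a query_lore() result.
docs = ["The Darkwood is haunted.", "Taxes are due in spring."]
dists = [0.42, 1.87]
relevant = filter_lore(docs, dists, max_distance=1.0)
```

Smaller distances mean closer matches, so the filter keeps values below the cutoff rather than above it.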
Utilities (engine/utils.py)
Helper functions for common tasks:
import json
import yaml
import glob


def load_yaml_assets(pattern: str) -> list:
    """Load all YAML files matching pattern."""
    assets = []
    for path in glob.glob(pattern):
        with open(path, 'r') as f:
            data = yaml.safe_load(f)
            assets.append(data)
    return assets


def build_ws_payload(event: str, content: str, metadata: dict = None) -> str:
    """Build WebSocket message payload."""
    return json.dumps({
        "event": event,
        "payload": {
            "content": content,
            "metadata": metadata or {},
        },
    })
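The envelope produced by build_ws_payload can be unpacked on the receiving side with a few lines of code. The sketch below repeats the helper so it runs standalone; parse_ws_payload is a hypothetical inverse written for illustration, not a function from engine/utils.py.

```python
import json
from typing import Optional, Tuple


def build_ws_payload(event: str, content: str, metadata: Optional[dict] = None) -> str:
    """Copy of the helper shown above so this sketch runs standalone."""
    return json.dumps({
        "event": event,
        "payload": {"content": content, "metadata": metadata or {}},
    })


def parse_ws_payload(raw: str) -> Tuple[str, str, dict]:
    """Hypothetical inverse: return (event, content, metadata) from a raw message."""
    msg = json.loads(raw)
    payload = msg.get("payload", {})
    return msg["event"], payload.get("content", ""), payload.get("metadata", {})


raw = build_ws_payload("system_update", "Model changed", {"model": "gpt-4"})
event, content, metadata = parse_ws_payload(raw)
```

Because metadata defaults to an empty dict rather than null, clients can index into it without a None check.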
Development Tips
Hot Reloading
Always use --reload during development:
python -m uvicorn engine.main:app --reload
Uvicorn watches for file changes and automatically restarts the server.
Logging
Use the centralized logger from engine/logger.py:
from engine.logger import log
log.info("Server started")
log.error(f"Failed to load world: {world_id}")
log.debug(f"Received message: {data}")
Testing WebSocket Connections
See the Testing Guide for using engine/test_client.py to test WebSocket functionality.
Debugging
Add print statements or use the logger:
log.debug(f"Current state: {state.ACTIVE_WORLD_ID}")
log.debug(f"Message payload: {json.dumps(msg, indent=2)}")
State Management
Global state is managed in engine/state.py:
import engine.state as state
# Access state
world_id = state.ACTIVE_WORLD_ID
model = state.CURRENT_MODEL
# Modify state
state.ACTIVE_WORLD_ID = "darkwood"
state.save_state() # Persist to persist.json
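How save_state writes persist.json is not shown here. As a rough sketch of one way JSON state persistence could work, the code below writes to a temporary file and then replaces the target, so a crash mid-write does not leave a truncated file. The function signatures, the DEFAULT_STATE keys, and the fallback behavior are all assumptions made for illustration and may differ from engine/state.py.

```python
import json
import os
import tempfile

# Illustrative keys only; the real engine/state.py may persist different fields.
DEFAULT_STATE = {"ACTIVE_WORLD_ID": "", "CURRENT_MODEL": ""}


def save_state(state: dict, path: str) -> None:
    """Write state to a temp file, then atomically replace the target."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2)
    os.replace(tmp_path, path)


def load_state(path: str) -> dict:
    """Load saved state, falling back to defaults if the file is missing or corrupt."""
    try:
        with open(path, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return dict(DEFAULT_STATE)
    if not isinstance(loaded, dict):
        return dict(DEFAULT_STATE)
    # Saved values override defaults; keys missing from the file keep defaults.
    return {**DEFAULT_STATE, **loaded}
```

Merging the loaded values over the defaults means a state file written by an older version still loads after new keys are introduced.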
Common Patterns
Sending WebSocket Messages
from engine.utils import build_ws_payload

# System update
await websocket.send_text(
    build_ws_payload("system_update", "World loaded successfully")
)

# With metadata
await websocket.send_text(
    build_ws_payload(
        "system_update",
        "Model changed",
        {"model": "gpt-4", "model_confirmed": True},
    )
)
Database Queries
from engine.database import world_manager, char_manager, msg_manager
# Get world
world = world_manager.get_world("darkwood")
# Add character
char = Character(id="ranger", name="Elara", persona="A skilled tracker")
char_manager.add_character(char)
# Query messages
recent_msgs = msg_manager.get_messages(
    char_id="ranger",
    session_id="session_123",
    limit=10,
)
Asset Loading
from engine.utils import load_yaml_assets

# Load all worlds
for data in load_yaml_assets("assets/worlds/*.yaml"):
    world = World(**data)
    world_manager.add_world(world)

# Load all characters
for data in load_yaml_assets("assets/characters/*.yaml"):
    character = Character(**data)
    char_manager.add_character(character)