Documentation Index
Fetch the complete documentation index at: https://mintlify.com/ritz541/flower-engine/llms.txt
Use this file to discover all available pages before exploring further.
Overview
Flower Engine currently uses a simple test client for WebSocket functionality testing. There is no formal pytest setup - testing focuses on end-to-end validation of the WebSocket protocol and LLM streaming.
Test Client
The test client (engine/test_client.py) is a WebSocket client that validates the complete message flow.
Test Client Location
What It Tests
The test client verifies:
- Connection handshake - Initial connection to WebSocket endpoint
- State synchronization -
sync_state message reception
- Prompt submission - Sending user messages
- Thinking indicator - Backend processing acknowledgment
- Streaming response -
chat_chunk messages with content
- Stream completion -
chat_end message with token counts
Running the Test Client
Prerequisites
The backend must be running before executing the test:
# Terminal 1: Start the backend
python -m uvicorn engine.main:app --host 0.0.0.0 --port 8000 --reload
Execute Test
# Terminal 2: Run the test client
python engine/test_client.py
Expected Output
Successful test output looks like:
Connected to ws://localhost:8000/ws/rpc
Handshake: {'event': 'system_update', 'payload': {'content': '✓ Engine ready.', 'metadata': {'status': 'ok'}}}
Sync State: {'event': 'sync_state', 'payload': {'content': '', 'metadata': {...}}}
Sent prompt
Thinking: {'event': 'system_update', 'payload': {'content': '...', 'metadata': {}}}
I am an AI assistant created by Anthropic...
[Finished Streams. Total tokens: 245]
Test Client Implementation
From engine/test_client.py:
import asyncio
import websockets
import json
async def test_chat():
uri = "ws://localhost:8000/ws/rpc"
async with websockets.connect(uri) as websocket:
print(f"Connected to {uri}")
# 1. Expect Handshake
handshake = await websocket.recv()
print(f"Handshake: {json.loads(handshake)}")
# 2. Expect Sync State
sync_state = await websocket.recv()
print(f"Sync State: {json.loads(sync_state)}")
# 3. Send Prompt
prompt = {"prompt": "Who are you?"}
await websocket.send(json.dumps(prompt))
print("Sent prompt")
# 4. Expect Thinking
thinking = await websocket.recv()
print(f"Thinking: {json.loads(thinking)}")
# 5. Expect stream and chat_end
full_msg = ""
while True:
chunk = await websocket.recv()
data = json.loads(chunk)
if data["event"] == "chat_end":
print(f"\n[Finished Streams. Total tokens: {data['payload']['metadata'].get('total_tokens')}]")
break
elif data["event"] == "chat_chunk":
content = data["payload"]["content"]
metadata = data["payload"]["metadata"]
full_msg += content
print(content, end="", flush=True)
# optionally print tps
# print(f" (TPS: {metadata.get('tokens_per_second')})", end="", flush=True)
if __name__ == "__main__":
asyncio.run(test_chat())
Message Flow
The test validates this exact sequence:
┌─────────┐ ┌─────────┐
│ Client │ │ Backend │
└────┬────┘ └────┬────┘
│ │
│ ─────────── Connect ──────────────────────> │
│ │
│ <────────── Handshake (system_update) ───── │
│ {"event": "system_update", │
│ "payload": {"content": "✓..."}} │
│ │
│ <────────── Sync State ─────────────────── │
│ {"event": "sync_state", ...} │
│ │
│ ─────────── Prompt ──────────────────────> │
│ {"prompt": "Who are you?"} │
│ │
│ <────────── Thinking ───────────────────── │
│ {"event": "system_update"} │
│ │
│ <────────── chat_chunk ─────────────────── │
│ {"event": "chat_chunk", │
│ "payload": {"content": "I..."}} │
│ │
│ <────────── chat_chunk ─────────────────── │
│ (repeated for each token) │
│ │
│ <────────── chat_end ───────────────────── │
│ {"event": "chat_end", │
│ "payload": {"metadata": │
│ {"total_tokens": 245}}} │
│ │
Creating Custom Tests
Testing Specific Commands
You can modify the test client to test specific commands:
import asyncio
import websockets
import json
async def test_world_selection():
uri = "ws://localhost:8000/ws/rpc"
async with websockets.connect(uri) as websocket:
# Skip handshake messages
await websocket.recv() # handshake
await websocket.recv() # sync_state
# Test world selection
await websocket.send(json.dumps({"prompt": "/world select darkwood"}))
# Expect confirmation
response = await websocket.recv()
data = json.loads(response)
assert data["event"] == "system_update"
assert "darkwood" in data["payload"]["content"]
print(f"✓ World selection test passed")
if __name__ == "__main__":
asyncio.run(test_world_selection())
Testing Error Handling
async def test_error_handling():
uri = "ws://localhost:8000/ws/rpc"
async with websockets.connect(uri) as websocket:
await websocket.recv() # handshake
await websocket.recv() # sync_state
# Send prompt without world/character setup
await websocket.send(json.dumps({"prompt": "Hello"}))
# Expect error message
response = await websocket.recv()
data = json.loads(response)
assert "Prepare the stage" in data["payload"]["content"]
print(f"✓ Error handling test passed")
if __name__ == "__main__":
asyncio.run(test_error_handling())
Testing Stream Cancellation
async def test_cancel_stream():
uri = "ws://localhost:8000/ws/rpc"
async with websockets.connect(uri) as websocket:
await websocket.recv() # handshake
await websocket.recv() # sync_state
# Set up world and character
await websocket.send(json.dumps({"prompt": "/world select darkwood"}))
await websocket.recv()
await websocket.send(json.dumps({"prompt": "/character select ranger"}))
await websocket.recv()
await websocket.send(json.dumps({"prompt": "/session new"}))
await websocket.recv()
# Start generation
await websocket.send(json.dumps({"prompt": "Tell me a long story"}))
await websocket.recv() # thinking
# Wait for first chunk
await websocket.recv() # first chat_chunk
# Cancel
await websocket.send(json.dumps({"prompt": "/cancel"}))
# Expect cancellation confirmation
response = await websocket.recv()
data = json.loads(response)
assert "cancelled" in data["payload"]["content"].lower()
print(f"✓ Stream cancellation test passed")
if __name__ == "__main__":
asyncio.run(test_cancel_stream())
Manual Testing
Testing the Full System
The most comprehensive test is running the full system:
# Start the system
./start.sh
Then perform these manual tests:
-
Initial State
- Verify header shows “Connecting…”
- Verify connection completes and shows “Synced”
-
World Selection
- Type
/ to open commands popup
- Select “world select”
- Choose a world
- Verify header updates with world name
-
Character Selection
- Type
/character select
- Choose a character
- Verify header updates
-
Session Management
- Type
/session new
- Verify session ID appears
- Type
/session continue to test session switching
-
Chat Functionality
- Type a message and press Enter
- Verify spinner appears
- Verify streaming response
- Verify response completes
-
Model Switching
- Type
/model to see available models
- Select a different model
- Verify header updates
- Send a test message
-
Cancellation
- Start a long generation
- Press ESC during generation
- Verify stream stops
-
Rules System
- Type
/rules add and select a rule
- Type
/rules clear to remove rules
Testing with Different LLM Providers
Test each provider configured in config.yaml:
# In the TUI
/model google/gemini-2.0-pro-exp-02-05:free
Hello, test message
/model anthropic/claude-3-haiku
Hello, test message
/model deepseek-chat
Hello, test message
Verify:
- Connection works
- Streaming is smooth
- Token counts are accurate
- Pricing information updates
Debugging Test Failures
Backend Logs
Check backend logs for errors:
# Run backend with verbose logging
python -m uvicorn engine.main:app --log-level debug
WebSocket Connection Issues
If the test client can’t connect:
# Check if backend is running
curl http://localhost:8000/
# Check if port is in use
lsof -i :8000
# Test WebSocket with wscat
npm install -g wscat
wscat -c ws://localhost:8000/ws/rpc
If messages aren’t parsing correctly:
# Add debug output to test client
import json
data = await websocket.recv()
print(f"Raw message: {data}")
parsed = json.loads(data)
print(f"Parsed: {json.dumps(parsed, indent=2)}")
Stream Not Completing
If chat_end never arrives:
- Check LLM API key is valid
- Check network connectivity
- Check for API rate limits
- Review backend logs for exceptions
Measuring Tokens Per Second
Modify the test client to measure performance:
import time
async def test_streaming_performance():
uri = "ws://localhost:8000/ws/rpc"
async with websockets.connect(uri) as websocket:
await websocket.recv() # handshake
await websocket.recv() # sync_state
# Set up session
# ...
# Start generation
start_time = time.time()
await websocket.send(json.dumps({"prompt": "Count to 100"}))
await websocket.recv() # thinking
total_tokens = 0
chunk_count = 0
while True:
chunk = await websocket.recv()
data = json.loads(chunk)
if data["event"] == "chat_end":
total_tokens = data["payload"]["metadata"].get("total_tokens", 0)
break
elif data["event"] == "chat_chunk":
chunk_count += 1
elapsed = time.time() - start_time
tps = total_tokens / elapsed if elapsed > 0 else 0
print(f"Total tokens: {total_tokens}")
print(f"Total time: {elapsed:.2f}s")
print(f"Tokens/second: {tps:.2f}")
print(f"Chunks: {chunk_count}")
if __name__ == "__main__":
asyncio.run(test_streaming_performance())
Load Testing
Test multiple concurrent connections:
import asyncio
import websockets
import json
async def client_task(client_id: int):
uri = "ws://localhost:8000/ws/rpc"
async with websockets.connect(uri) as websocket:
await websocket.recv() # handshake
await websocket.recv() # sync_state
# Set up and send prompt
# ...
print(f"Client {client_id} completed")
async def load_test(num_clients: int):
tasks = [client_task(i) for i in range(num_clients)]
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(load_test(10)) # Test with 10 concurrent clients
Future Testing Plans
Planned improvements to the testing infrastructure:
- Unit tests with pytest for individual components
- Integration tests for database operations
- Mock LLM responses for faster testing
- Automated CI/CD pipeline with GitHub Actions
- Code coverage reporting
- TUI automated testing with terminal replay
Next Steps