Overview
The VoicePact ETL (Extract, Transform, Load) pipeline is implemented in Python using pandas for data transformation and SQLAlchemy for database operations. The pipeline extracts contract data from the operational database, transforms it into analytics-ready summaries, and loads it into a separate SQLite analytics database.
Architecture
Pipeline Location
The ETL pipeline is located at:
server/etl/run_analytics.py
Database Configuration
- Source Database: The operational VoicePact database (configured via DATABASE_URL)
- Destination Database: server/etl/analytics.db (created automatically)
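As a rough illustration of this configuration, the sketch below resolves both locations. The default source URL is taken from the sample log output in this guide, and the function names are hypothetical rather than copied from run_analytics.py.

```python
import os
from pathlib import Path

# Assumption: this default mirrors the source URL shown in the sample log output.
DEFAULT_SOURCE_URL = "sqlite+aiosqlite:///./voicepact.db"


def resolve_source_url(env=None):
    """Return the source database URL, preferring the DATABASE_URL variable."""
    env = os.environ if env is None else env
    return env.get("DATABASE_URL", DEFAULT_SOURCE_URL)


def resolve_destination_url(etl_dir="server/etl"):
    """Build a SQLite URL for analytics.db inside the ETL directory."""
    db_path = Path(etl_dir) / "analytics.db"
    return f"sqlite:///{db_path.as_posix()}"
```

Passing a plain dictionary as env makes the lookup easy to exercise without touching the real environment.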
Prerequisites
Required Dependencies
The ETL pipeline requires the following Python packages:
pandas>=2.0.0
sqlalchemy>=2.0.0
These should already be installed if you’ve set up the VoicePact server.
Source Data Requirements
The pipeline expects the following tables to exist in the source database:
- contracts: Contract records with terms, amounts, and status
- contract_parties: Party information (buyers, sellers, mediators)
- payments: Payment transactions and escrow releases
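If you want to confirm these tables exist before running the pipeline, a small pre-flight check can help. The sketch below assumes the source is a SQLite file and uses the standard library only; missing_tables is a hypothetical helper, not part of run_analytics.py. For other database engines, SQLAlchemy's inspect(engine).get_table_names() serves a similar purpose.

```python
import sqlite3

REQUIRED_TABLES = ("contracts", "contract_parties", "payments")


def missing_tables(conn, required=REQUIRED_TABLES):
    """Return the required table names that are absent from a SQLite database."""
    rows = conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table'"
    ).fetchall()
    existing = {name for (name,) in rows}
    # Preserve the order of `required` so the report is stable.
    return [table for table in required if table not in existing]
```

An empty list means all three tables are present.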
Running the ETL Pipeline
Basic Execution
From the repository root, change into the server directory and run the script:
cd server
python etl/run_analytics.py
Expected Output
You should see log output similar to:
2026-03-06 10:30:15 - [INFO] - Starting VoicePact Analytics ETL pipeline...
2026-03-06 10:30:15 - [INFO] - Connecting to SOURCE database: sqlite+aiosqlite:///./voicepact.db
2026-03-06 10:30:15 - [INFO] - EXTRACT: Running query for table 'contracts'...
2026-03-06 10:30:15 - [INFO] - EXTRACT: Read 142 rows from 'contracts'.
2026-03-06 10:30:15 - [INFO] - EXTRACT: Running query for table 'parties'...
2026-03-06 10:30:15 - [INFO] - EXTRACT: Read 284 rows from 'parties'.
2026-03-06 10:30:15 - [INFO] - EXTRACT: Running query for table 'payments'...
2026-03-06 10:30:15 - [INFO] - EXTRACT: Read 89 rows from 'payments'.
2026-03-06 10:30:15 - [INFO] - TRANSFORM: Starting data transformation...
2026-03-06 10:30:15 - [INFO] - TRANSFORM: Processing 'contracts' data...
2026-03-06 10:30:15 - [INFO] - TRANSFORM: Pivoting 'parties' data...
2026-03-06 10:30:15 - [INFO] - TRANSFORM: Aggregating 'payments' data...
2026-03-06 10:30:15 - [INFO] - TRANSFORM: Merging all data sources...
2026-03-06 10:30:15 - [INFO] - TRANSFORM: Transformation complete. 142 rows processed.
2026-03-06 10:30:15 - [INFO] - Connecting to DESTINATION database: sqlite:///server/etl/analytics.db
2026-03-06 10:30:16 - [INFO] - LOAD: Successfully loaded 142 rows into 'fct_contracts_summary' in server/etl/analytics.db
2026-03-06 10:30:16 - [INFO] - VoicePact Analytics ETL pipeline finished successfully.
Scheduling
For production use, schedule the ETL pipeline to run periodically using cron:
# Run daily at 2 AM
0 2 * * * cd /path/to/voicepact/server && /path/to/python etl/run_analytics.py >> /var/log/voicepact-etl.log 2>&1
Pipeline Implementation
Stage 1: Extract
The extract stage queries three tables from the source database:
queries = {
    "contracts": "SELECT * FROM contracts",
    "parties": "SELECT * FROM contract_parties",
    "payments": "SELECT * FROM payments",
}
Key implementation details from run_analytics.py:42-87:
- Uses an async SQLAlchemy engine for database connections
- Converts query results to pandas DataFrames
- Handles empty tables gracefully with warnings
- Includes comprehensive error handling for database issues
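The behaviour described above can be sketched as follows. This is a simplified, synchronous version that reads through a plain sqlite3 connection instead of the async SQLAlchemy engine the real script uses; extract_tables and the logger name are assumptions made for illustration.

```python
import logging
import sqlite3

import pandas as pd

logger = logging.getLogger("etl_sketch")

QUERIES = {
    "contracts": "SELECT * FROM contracts",
    "parties": "SELECT * FROM contract_parties",
    "payments": "SELECT * FROM payments",
}


def extract_tables(conn, queries=QUERIES):
    """Run each query and return a dict mapping its key to a DataFrame."""
    frames = {}
    for name, sql in queries.items():
        df = pd.read_sql_query(sql, conn)
        if df.empty:
            # An empty table is reported as a warning but does not stop the run.
            logger.warning("No data found for table '%s'.", name)
        else:
            logger.info("EXTRACT: Read %d rows from '%s'.", len(df), name)
        frames[name] = df
    return frames
```

Note that the dictionary keys (for example parties) name the resulting DataFrames, while the SQL text names the actual source tables (contract_parties).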
Stage 2: Transform
The transform stage performs several data operations:
Date Conversions
df_contracts["created_at"] = pd.to_datetime(df_contracts["created_at"])
df_contracts["completed_at"] = pd.to_datetime(df_contracts["completed_at"])
Feature Engineering
Calculates time to completion:
df_contracts["time_to_completion_days"] = (
    df_contracts["completed_at"] - df_contracts["created_at"]
).dt.days
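To see what this calculation produces, here is a small worked example on made-up data. It suggests two things worth knowing: .dt.days keeps only whole days, and contracts that have no completed_at value end up with a missing (NaN) duration rather than zero.

```python
import pandas as pd

# Hypothetical sample rows; the second contract is not completed yet.
df_contracts = pd.DataFrame(
    {
        "id": ["C-1", "C-2"],
        "created_at": ["2026-03-01 09:00:00", "2026-03-02 12:00:00"],
        "completed_at": ["2026-03-04 18:30:00", None],
    }
)

df_contracts["created_at"] = pd.to_datetime(df_contracts["created_at"])
df_contracts["completed_at"] = pd.to_datetime(df_contracts["completed_at"])

# 3 days and 9.5 hours elapsed for C-1; only the whole days are kept.
df_contracts["time_to_completion_days"] = (
    df_contracts["completed_at"] - df_contracts["created_at"]
).dt.days
```

When querying the analytics table, remember to exclude or handle these missing durations when averaging completion times.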
Extracts product from JSON terms field:
def get_product(terms):
    if isinstance(terms, str):
        try:
            terms = json.loads(terms)
        except json.JSONDecodeError:
            return None
    return terms.get("product") if isinstance(terms, dict) else None

df_contracts["product"] = df_contracts["terms"].apply(get_product)
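The following self-contained copy of the helper shows how it behaves on a few representative inputs. The sample values (such as "maize") are invented for illustration.

```python
import json


def get_product(terms):
    """Return the "product" entry from a terms value, or None if unavailable."""
    if isinstance(terms, str):
        try:
            terms = json.loads(terms)
        except json.JSONDecodeError:
            return None
    return terms.get("product") if isinstance(terms, dict) else None


# Hypothetical inputs covering the main cases.
from_json_string = get_product('{"product": "maize", "quantity": 100}')
from_dict = get_product({"product": "beans"})
from_bad_json = get_product("not json")
from_missing = get_product(None)
from_json_list = get_product('["maize"]')
```

Only the first two calls return a product name; malformed JSON, missing values, and JSON that is not an object all yield None.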
Party Data Pivoting
Transforms party data from long format (multiple rows per contract) to wide format (one row per contract):
# Separate buyers and sellers
df_buyer = (
    df_parties[df_parties["role"] == "buyer"]
    .loc[:, ["contract_id", "phone_number"]]
    .rename(columns={"phone_number": "buyer_phone"})
)
df_seller = (
    df_parties[df_parties["role"] == "seller"]
    .loc[:, ["contract_id", "phone_number"]]
    .rename(columns={"phone_number": "seller_phone"})
)
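One thing to watch for with this approach: if a contract ever has more than one party with the same role, the later left joins will produce multiple rows for that contract. The sketch below is one possible guard, assuming it is acceptable to keep the first phone number per role; first_phone_per_contract is a hypothetical helper and the phone numbers are placeholders.

```python
import pandas as pd

# Hypothetical party rows; contract C-1 has two buyers.
df_parties = pd.DataFrame(
    {
        "contract_id": ["C-1", "C-1", "C-1", "C-2"],
        "role": ["buyer", "seller", "buyer", "seller"],
        "phone_number": [
            "+254700000001",
            "+254700000002",
            "+254700000003",
            "+254700000004",
        ],
    }
)


def first_phone_per_contract(parties, role, column_name):
    """Keep one phone number per contract for the given role."""
    return (
        parties[parties["role"] == role]
        .drop_duplicates(subset="contract_id", keep="first")
        .loc[:, ["contract_id", "phone_number"]]
        .rename(columns={"phone_number": column_name})
        .reset_index(drop=True)
    )


df_buyer = first_phone_per_contract(df_parties, "buyer", "buyer_phone")
df_seller = first_phone_per_contract(df_parties, "seller", "seller_phone")
```

Whether keeping the first party is the right rule depends on your data; the point is that each pivoted frame should contain at most one row per contract_id.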
Payment Aggregation
Sums released payments per contract:
df_payments["amount"] = pd.to_numeric(df_payments["amount"], errors="coerce").fillna(0)
df_payments_agg = (
    df_payments[df_payments["status"] == "released"]
    .groupby("contract_id")["amount"]
    .sum()
    .reset_index()
    .rename(columns={"amount": "total_paid_released"})
)
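A short worked example on invented payment rows may make the aggregation easier to follow. It illustrates that unparseable amounts are counted as 0 and that contracts with no released payments do not appear in the aggregated frame at all.

```python
import pandas as pd

# Hypothetical payments: one bad amount for C-1, and only a pending payment for C-2.
df_payments = pd.DataFrame(
    {
        "contract_id": ["C-1", "C-1", "C-1", "C-2"],
        "amount": ["250.50", "149.50", "oops", "900"],
        "status": ["released", "released", "released", "pending"],
    }
)

# "oops" cannot be parsed, becomes NaN, and is then replaced with 0.
df_payments["amount"] = pd.to_numeric(df_payments["amount"], errors="coerce").fillna(0)

df_payments_agg = (
    df_payments[df_payments["status"] == "released"]
    .groupby("contract_id")["amount"]
    .sum()
    .reset_index()
    .rename(columns={"amount": "total_paid_released"})
)
```

Here C-1 totals 400.0 and C-2 is absent, so downstream consumers see a missing value for C-2 after the left join.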
Data Merging
Joins all data sources using left joins, so every contract is kept even when it has no matching party or payment rows:
df_summary = pd.merge(df_contracts, df_buyer, on="contract_id", how="left")
df_summary = pd.merge(df_summary, df_seller, on="contract_id", how="left")
df_summary = pd.merge(df_summary, df_payments_agg, on="contract_id", how="left")
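The sketch below runs the same kind of left joins on tiny, invented frames. It assumes the contracts frame already exposes its key under the name contract_id, and it adds two optional safeguards that are not confirmed to be in run_analytics.py: the validate argument of pd.merge, which raises if a join would duplicate contracts, and a fillna(0) for contracts without released payments.

```python
import pandas as pd

# Assumption: the contracts frame already has a "contract_id" key column.
df_contracts = pd.DataFrame(
    {"contract_id": ["C-1", "C-2"], "total_amount": [1000.0, 2500.0]}
)
df_buyer = pd.DataFrame({"contract_id": ["C-1"], "buyer_phone": ["+254700000001"]})
df_payments_agg = pd.DataFrame(
    {"contract_id": ["C-1"], "total_paid_released": [400.0]}
)

# validate="one_to_one" raises MergeError if either side repeats a contract_id.
df_summary = pd.merge(
    df_contracts, df_buyer, on="contract_id", how="left", validate="one_to_one"
)
df_summary = pd.merge(
    df_summary, df_payments_agg, on="contract_id", how="left", validate="one_to_one"
)

# Contracts with no released payments come back as NaN; treat them as zero paid.
df_summary["total_paid_released"] = df_summary["total_paid_released"].fillna(0)
```

Both contracts survive the joins; C-2 simply has a missing buyer phone and a released total of 0.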
Column Selection and Cleanup
Selects and renames columns for the final analytics table:
final_columns = {
    "id": "contract_id",
    "contract_type": "contract_type",
    "status": "contract_status",
    "total_amount": "contract_total_amount",
    "currency": "currency",
    "product": "product",
    "created_at": "created_at",
    "completed_at": "completed_at",
    "time_to_completion_days": "time_to_completion_days",
    "buyer_phone": "buyer_phone",
    "seller_phone": "seller_phone",
    "total_paid_released": "total_paid_released",
}
df_final = df_summary.rename(columns=final_columns)
df_final = df_final[list(final_columns.values())]
See implementation at run_analytics.py:91-189.
Stage 3: Load
The load stage writes the transformed DataFrame to the analytics database:
df.to_sql(
    "fct_contracts_summary",
    engine,
    if_exists="replace",
    index=False,
    chunksize=1000,
)
Key implementation details from run_analytics.py:192-220:
- Uses if_exists="replace" to fully refresh the table on each run
- Processes data in chunks of 1000 rows for memory efficiency
- Creates the analytics database file if it doesn’t exist
- Includes error handling for database connection issues
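To illustrate the full-refresh behaviour, here is a minimal sketch that writes through a plain sqlite3 connection (which pandas accepts for SQLite) and reports the stored row count. load_summary is a hypothetical helper; the real script passes a SQLAlchemy engine.

```python
import sqlite3

import pandas as pd


def load_summary(df, conn, table="fct_contracts_summary"):
    """Replace the analytics table with df and return the stored row count."""
    df.to_sql(table, conn, if_exists="replace", index=False, chunksize=1000)
    # The table name is a trusted constant here, not user input.
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
```

Calling it twice with different frames leaves only the rows from the second call, which is what "replace" means in practice: the table never accumulates rows across runs.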
Accessing the Analytics Database
Using SQLite CLI
sqlite3 server/etl/analytics.db
Once connected, you can run queries:
-- View table schema
.schema fct_contracts_summary
-- Count total rows
SELECT COUNT(*) FROM fct_contracts_summary;
-- Sample data
SELECT * FROM fct_contracts_summary LIMIT 5;
Using Python
import pandas as pd
import sqlite3
# Connect to analytics database
conn = sqlite3.connect('server/etl/analytics.db')
# Run query
df = pd.read_sql_query(
    "SELECT contract_type, COUNT(*) as count FROM fct_contracts_summary GROUP BY contract_type",
    conn
)
print(df)
conn.close()
You can also connect business intelligence tools to the analytics database, for example:
- Metabase: Use the SQLite connector with the path server/etl/analytics.db
- Apache Superset: Configure a SQLite database connection
- Tableau: Connect via a SQLite ODBC driver
- Power BI: Connect via the ODBC connector with a SQLite ODBC driver
Monitoring and Troubleshooting
Common Issues
Source Database Not Found
Database error during EXTRACT: unable to open database file
Solution: Ensure the VoicePact application database exists at ./voicepact.db or update the DATABASE_URL environment variable.
Empty Tables
No data found for table 'contracts'.
Solution: This is a warning, not an error. The pipeline will continue but produce an empty analytics table. Add contracts to the system first.
Transform Failures
ETL pipeline failed during TRANSFORM step.
Solution: Check that the source tables have the expected schema. The pipeline expects specific column names and data types.
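A quick way to narrow down a schema problem is to compare the columns you actually have against the ones the transformations use. The expected column sets below are inferred from the snippets in this guide, so treat them as an assumption and adjust them to match your version of run_analytics.py; missing_columns is a hypothetical helper.

```python
# Assumption: column names inferred from the transformation snippets in this guide.
EXPECTED_COLUMNS = {
    "contracts": {
        "id", "contract_type", "status", "total_amount",
        "currency", "terms", "created_at", "completed_at",
    },
    "parties": {"contract_id", "role", "phone_number"},
    "payments": {"contract_id", "amount", "status"},
}


def missing_columns(table, actual_columns):
    """Return the expected columns that are absent, sorted for stable output."""
    return sorted(EXPECTED_COLUMNS[table] - set(actual_columns))
```

You can pass a DataFrame's columns attribute directly, for example missing_columns("payments", df_payments.columns).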
Performance Considerations
- Execution Time: For 10,000 contracts, expect ~5-10 seconds
- Memory Usage: The pipeline loads all data into memory; for very large datasets (>100k contracts), consider implementing incremental loads
- Database Locking: The source database uses WAL mode to allow concurrent reads during ETL
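If you are unsure whether a SQLite source file is actually in WAL mode, you can check or enable it with a PRAGMA. The sketch below is a standalone helper (ensure_wal is a hypothetical name) and assumes the source is a SQLite file on a local disk.

```python
import sqlite3


def ensure_wal(db_path):
    """Switch a SQLite database file to WAL journaling and return the active mode."""
    conn = sqlite3.connect(db_path)
    try:
        # The PRAGMA returns the journal mode that is in effect afterwards.
        (mode,) = conn.execute("PRAGMA journal_mode=WAL").fetchone()
        return mode
    finally:
        conn.close()
```

The setting is stored in the database file, so it only needs to be applied once. In-memory databases cannot use WAL and report a different mode.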
Extending the Pipeline
Adding New Metrics
To add calculated fields to the analytics table:
- Add the calculation in the transform_data() function
- Include the new column in the final_columns dictionary
- Run the pipeline to refresh the analytics table
Example - adding a “high value” flag:
# In transform_data() function, after other transformations:
df_summary["is_high_value"] = df_summary["total_amount"] > 50000
# Add to final_columns:
final_columns["is_high_value"] = "is_high_value"
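A tiny standalone example, using invented amounts, shows how the comparison behaves at the boundary: a contract worth exactly 50000 is not flagged, because the check is strictly greater than the threshold.

```python
import pandas as pd

HIGH_VALUE_THRESHOLD = 50000  # same threshold as in the snippet above

# Hypothetical contracts below, at, and above the threshold.
df_summary = pd.DataFrame(
    {"id": ["C-1", "C-2", "C-3"], "total_amount": [12000, 50000, 75000]}
)
df_summary["is_high_value"] = df_summary["total_amount"] > HIGH_VALUE_THRESHOLD
```

Use >= instead if contracts at the threshold should count as high value.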
Adding New Source Tables
To incorporate additional source data:
- Add a query to the queries dictionary in extract_data()
- Process the new DataFrame in transform_data()
- Merge it with the summary DataFrame using appropriate join keys
Incremental Loading
For large production systems, consider implementing incremental updates:
# Track last ETL run time
last_run = get_last_run_timestamp()

# Modify extract queries
queries = {
    "contracts": f"SELECT * FROM contracts WHERE created_at > '{last_run}'",
    # ...
}

# Use if_exists='append' instead of 'replace'
df.to_sql(
    "fct_contracts_summary",
    engine,
    if_exists="append",
    index=False,
)
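The outline above leaves get_last_run_timestamp() undefined. One possible way to fill it in is to store a watermark in a small state table, as sketched below with the standard library only. The etl_state table, the helper names, and the assumption that created_at is stored as sortable ISO-8601 text are all illustrative, not taken from run_analytics.py. The query also uses a bound parameter rather than interpolating the timestamp into the SQL string.

```python
import sqlite3

DEFAULT_WATERMARK = "1970-01-01 00:00:00"  # used before the first recorded run
STATE_DDL = "CREATE TABLE IF NOT EXISTS etl_state (key TEXT PRIMARY KEY, value TEXT)"


def get_last_run_timestamp(conn):
    """Read the stored watermark, or a default if no run has been recorded."""
    conn.execute(STATE_DDL)
    row = conn.execute("SELECT value FROM etl_state WHERE key = 'last_run'").fetchone()
    return row[0] if row else DEFAULT_WATERMARK


def set_last_run_timestamp(conn, timestamp):
    """Store the watermark, overwriting any previous value."""
    conn.execute(STATE_DDL)
    conn.execute(
        "INSERT OR REPLACE INTO etl_state (key, value) VALUES ('last_run', ?)",
        (timestamp,),
    )
    conn.commit()


def fetch_new_contracts(source_conn, last_run):
    """Return contracts created after the watermark, using a bound parameter."""
    return source_conn.execute(
        "SELECT * FROM contracts WHERE created_at > ? ORDER BY created_at",
        (last_run,),
    ).fetchall()
```

Keep in mind that filtering on created_at only picks up new contracts. Contracts that change status or complete after they were first loaded would not be refreshed by an append-only load, so an incremental design likely also needs an update timestamp or a periodic full refresh.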
Best Practices
- Schedule Regularly: Run the ETL pipeline at off-peak hours (e.g., 2 AM daily)
- Monitor Logs: Capture pipeline output to log files for troubleshooting
- Version Control: Track changes to transformation logic in git
- Test Transformations: Validate data quality after pipeline runs
- Back Up the Analytics DB: Periodically back up the analytics database to keep historical snapshots, since each run replaces the table
- Document Changes: Update this documentation when modifying the pipeline
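For the backup recommendation above, one option is Python's built-in sqlite3 backup support, which copies a database safely even while it is open elsewhere. The sketch below writes a dated copy; backup_analytics_db, the backup directory, and the file naming scheme are assumptions chosen for illustration.

```python
import sqlite3
from datetime import date
from pathlib import Path


def backup_analytics_db(db_path, backup_dir, today=None):
    """Copy the analytics database into backup_dir with a dated file name."""
    today = today or date.today()
    backup_dir = Path(backup_dir)
    backup_dir.mkdir(parents=True, exist_ok=True)
    target_path = backup_dir / f"analytics-{today.isoformat()}.db"

    source = sqlite3.connect(db_path)
    target = sqlite3.connect(target_path)
    try:
        # Connection.backup uses SQLite's online backup API (Python 3.7+).
        source.backup(target)
    finally:
        target.close()
        source.close()
    return target_path
```

Running this just before each scheduled ETL run would preserve the previous day's fct_contracts_summary before it is replaced.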
Next Steps