Documentation Index Fetch the complete documentation index at: https://mintlify.com/nicolasleiva/LatentGEO/llms.txt
Use this file to discover all available pages before exploring further.
Overview
LatentGEO uses Celery for asynchronous task processing:
- Audit pipeline: Long-running SEO/GEO analysis
- PDF generation: Heavy report rendering
- GEO tools: Keywords, rankings, backlinks, LLM visibility
- PageSpeed analysis: Performance metrics
- Article generation: Batch content creation
Architecture
FastAPI → Celery Task Queue → Worker Processes → Database/Storage
                 ↓
            Redis Broker
Redis acts as both the message broker and result backend.
Celery Configuration
Celery App
File: backend/app/workers/celery_app.py
from celery import Celery
from app.core.config import settings
# Celery application instance; broker and result backend URLs come from settings.
celery_app = Celery(
"latentgeo" ,
broker = settings. CELERY_BROKER ,
backend = settings. CELERY_BACKEND ,
# Modules the worker imports at startup so their tasks get registered.
include = [ "app.workers.tasks" ]
)
celery_app.conf.update(
# JSON-only serialization: task arguments and results must be JSON-serializable.
task_serializer = "json" ,
accept_content = [ "json" ],
result_serializer = "json" ,
timezone = "UTC" ,
enable_utc = True ,
# Report a STARTED state once a worker begins executing a task.
task_track_started = True ,
task_time_limit = 4000 , # Hard limit in seconds (4000 s ≈ 66.7 minutes)
worker_max_tasks_per_child = 50 , # Restart worker after 50 tasks
)
Environment Variables
# Redis URLs
REDIS_URL=redis://redis:6379/0
CELERY_BROKER=redis://redis:6379/0
CELERY_BACKEND=redis://redis:6379/1
# Worker concurrency
CELERYD_CONCURRENCY=1
Use separate Redis databases for broker (0) and backend (1) to avoid conflicts.
Task Definitions
Main Audit Task
File: backend/app/workers/tasks.py
from app.workers.celery_app import celery_app
from app.services.audit_service import AuditService
from app.services.pipeline_service import run_initial_audit
@celery_app.task(
    name="run_audit_task",
    bind=True,
    autoretry_for=(Exception,),
    retry_kwargs={"max_retries": 5, "countdown": 60},
    soft_time_limit=3600,  # 60 minutes soft limit
    time_limit=4000,  # Hard limit in seconds (4000 s ≈ 66.7 minutes)
)
def run_audit_task(self, audit_id: int):
    """
    Main audit pipeline task.

    Runs target audit, competitor analysis, and generates fix plan.

    Args:
        audit_id: Primary key of the audit to process.

    Raises:
        Exception: Any pipeline error is re-raised after the audit has been
            marked FAILED. Because ``autoretry_for`` covers ``Exception``,
            the re-raise schedules a retry (up to 5, 60 s apart), so the
            FAILED status is provisional until retries are exhausted; the
            next attempt sets the audit back to RUNNING.
    """
    logger.info(f"Starting audit task for audit_id={audit_id}")
    try:
        with get_db_session() as db:
            # Mark as RUNNING
            AuditService.update_audit_progress(
                db=db,
                audit_id=audit_id,
                progress=5,
                status=AuditStatus.RUNNING,
            )
            audit = AuditService.get_audit(db, audit_id)
            # Copy the URL out so the session can close before the long run.
            audit_url = str(audit.url)

        # Run pipeline (outside DB transaction)
        # `update_progress` is not defined in this excerpt; it is expected to
        # come from the surrounding module.
        result = asyncio.run(
            run_initial_audit(
                url=audit_url,
                audit_id=audit_id,
                llm_function=get_llm_function(),
                progress_callback=update_progress,
                generate_report=False,  # Fast mode
                enable_llm_external_intel=True,
            )
        )

        # Save results
        with get_db_session() as db:
            asyncio.run(
                AuditService.set_audit_results(
                    db=db,
                    audit_id=audit_id,
                    target_audit=result.get("target_audit"),
                    external_intelligence=result.get("external_intelligence"),
                    competitor_audits=result.get("competitor_audits"),
                    fix_plan=result.get("fix_plan"),
                )
            )
            AuditService.update_audit_progress(
                db=db,
                audit_id=audit_id,
                progress=100,
                status=AuditStatus.COMPLETED,
            )
        logger.info(f"Audit {audit_id} completed successfully")
    except Exception as e:
        logger.error(f"Error in audit task: {e}", exc_info=True)
        # Recording the failure is best-effort: if the database itself is the
        # problem, log that and still propagate the ORIGINAL error below.
        try:
            with get_db_session() as db:
                AuditService.update_audit_progress(
                    db=db,
                    audit_id=audit_id,
                    progress=0,
                    status=AuditStatus.FAILED,
                    error_message=str(e),
                )
        except Exception:
            logger.error(
                f"Could not record FAILED status for audit_id={audit_id}",
                exc_info=True,
            )
        raise
Key Features:
Automatic retries on failure (up to 5 retries, 60 seconds apart)
Soft/hard time limits
Progress tracking with callbacks
Database session management
Error handling and logging
PDF Generation Task
@celery_app.task(name="generate_pdf_task")
def generate_pdf_task(audit_id: int, report_markdown: str):
    """
    Generate PDF report from markdown content.

    Args:
        audit_id: Primary key of the audit the report belongs to.
        report_markdown: Markdown source rendered into the PDF.

    Raises:
        Exception: Any rendering or persistence error is logged and re-raised.
    """
    logger.info(f"Starting PDF generation for audit_id={audit_id}")
    # The session stays open for the whole task because the report record is
    # written with the same `db` after rendering.
    with get_db_session() as db:
        audit = AuditService.get_audit(db, audit_id)
        try:
            pdf_file_path = PDFService.create_from_audit(
                audit=audit,
                markdown_content=report_markdown,
            )
            ReportService.create_report(
                db=db,
                audit_id=audit_id,
                report_type="PDF",
                file_path=pdf_file_path,
            )
            logger.info(f"PDF generated: {pdf_file_path}")
        except Exception as e:
            logger.error(f"PDF generation failed: {e}", exc_info=True)
            raise
Full Report Generation Task
@celery_app.task(name="generate_full_report_task")
def generate_full_report_task(audit_id: int):
    """
    Orchestrator task for complete PDF report:
    1. Run PageSpeed if missing
    2. Run GEO Tools if missing
    3. Regenerate report with LLM
    4. Generate PDF

    Each step opens its own database session and re-reads the audit, so a
    step always sees what the previous step committed.

    Args:
        audit_id: Primary key of the audit to build the report for.
    """
    logger.info(f"Starting full report generation for audit_id={audit_id}")

    # NOTE(review): steps 1-3 make network/LLM calls while a session is open,
    # which is at odds with the "open/close sessions as needed" guidance on
    # this page — consider narrowing the session scope.

    # Step 1: PageSpeed
    with get_db_session() as db:
        audit = AuditService.get_audit(db, audit_id)
        if not audit.pagespeed_data:
            logger.info("Running PageSpeed analysis...")
            pagespeed_data = asyncio.run(
                PageSpeedService.analyze_both_strategies(
                    url=str(audit.url),
                    api_key=settings.GOOGLE_PAGESPEED_API_KEY,
                )
            )
            audit.pagespeed_data = pagespeed_data
            db.commit()

    # Step 2: GEO Tools
    with get_db_session() as db:
        audit = AuditService.get_audit(db, audit_id)
        if not has_geo_data(audit):
            logger.info("Running GEO tools...")
            asyncio.run(run_geo_tools(db, audit_id))

    # Step 3: Regenerate Report
    with get_db_session() as db:
        audit = AuditService.get_audit(db, audit_id)
        complete_context = AuditService.get_complete_audit_context(db, audit_id)
        new_report_markdown, new_fix_plan = asyncio.run(
            PipelineService.generate_report(
                target_audit=audit.target_audit,
                external_intelligence=audit.external_intelligence,
                pagespeed_data=audit.pagespeed_data,
                keywords_data=complete_context["keywords"],
                backlinks_data=complete_context["backlinks"],
                rank_tracking_data=complete_context["rank_tracking"],
                llm_visibility_data=complete_context["llm_visibility"],
                llm_function=get_llm_function(),
            )
        )
        audit.report_markdown = new_report_markdown
        audit.fix_plan = new_fix_plan
        db.commit()

    # Step 4: Generate PDF
    with get_db_session() as db:
        audit = AuditService.get_audit(db, audit_id)
        pdf_file_path = PDFService.create_from_audit(
            audit=audit,
            markdown_content=audit.report_markdown,
        )
        ReportService.create_report(
            db=db,
            audit_id=audit_id,
            report_type="PDF",
            file_path=pdf_file_path,
        )
    logger.info(f"Full report generated for audit {audit_id}")
GEO Analysis Task
def _domain_from_url(url: str) -> str:
    """Return the host of *url* without port, credentials, or a leading ``www.``."""
    # `.hostname` (unlike `.netloc`) drops "user:pass@" and ":port" and is
    # lower-cased; it is None when the URL has no network location.
    host = urlparse(url).hostname or ""
    # Strip only a leading "www." — str.replace would also mangle hosts that
    # merely contain it (e.g. "awww.example.com").
    if host.startswith("www."):
        host = host[len("www."):]
    return host


@celery_app.task(
    name="run_geo_analysis_task",
    bind=True,
    autoretry_for=(Exception,),
    retry_kwargs={"max_retries": 3, "countdown": 60},
    soft_time_limit=900,  # 15 minutes
    time_limit=1000,
)
def run_geo_analysis_task(self, audit_id: int):
    """
    Run GEO tools: Keywords, Rankings, Backlinks, LLM Visibility.

    The keyword list starts with the brand name (first label of the audit's
    domain) and adds the ``category`` from external intelligence when present.

    Args:
        audit_id: Primary key of the audit to analyse.
    """
    logger.info(f"Starting GEO analysis for audit_id={audit_id}")
    # The session stays open for the whole run: all three services are
    # constructed with `db`.
    with get_db_session() as db:
        audit = AuditService.get_audit(db, audit_id)
        domain = _domain_from_url(str(audit.url))
        brand_name = domain.split(".")[0]
        keywords = [brand_name]
        if audit.external_intelligence:
            category = audit.external_intelligence.get("category")
            if category:
                keywords.append(category)

        async def run_tools():
            rank_service = RankTrackerService(db)
            backlink_service = BacklinkService(db)
            visibility_service = LLMVisibilityService(db)
            # Run sequentially, one tool after the other.
            await rank_service.track_rankings(audit_id, domain, keywords)
            await backlink_service.analyze_backlinks(audit_id, domain)
            await visibility_service.check_visibility(audit_id, brand_name, keywords)

        asyncio.run(run_tools())
    logger.info(f"GEO analysis completed for audit {audit_id}")
Running Workers
Local Development
The worker is automatically started by Docker Compose:
worker:
  build:
    context: .
    dockerfile: Dockerfile.backend.dev
  # Single worker process; matches CELERYD_CONCURRENCY=1 above.
  command: celery -A app.workers.tasks worker --loglevel=info --concurrency=1
  environment:
    DATABASE_URL: ${DATABASE_URL}
    REDIS_URL: redis://redis:6379/0
    CELERY_BROKER: redis://redis:6379/0
    CELERY_BACKEND: redis://redis:6379/1
Manual Worker Start
# Start worker
celery -A app.workers.tasks worker --loglevel=info --concurrency=2
# Start with autoreload (development)
# The pattern is quoted so the shell passes it to watchmedo instead of expanding it.
watchmedo auto-restart --directory=./backend/app --pattern="*.py" --recursive -- \
  celery -A app.workers.tasks worker --loglevel=info
Production Worker
# Start with multiple workers and prefork pool
celery -A app.workers.tasks worker \
--loglevel=info \
--concurrency=4 \
--max-tasks-per-child=50 \
--time-limit=4000
Dispatching Tasks
From FastAPI routes:
Async Dispatch
Get Task Status
from app.workers.tasks import run_audit_task
@router.post("/audits/")
async def create_audit(audit_create: AuditCreate, db: Session = Depends(get_db)):
    """Create the audit record, enqueue its pipeline task, and return the record."""
    # Persist first: the task is dispatched with the new record's id.
    new_audit = AuditService.create_audit(db, audit_create)
    # Enqueue only — the pipeline itself runs in a Celery worker.
    run_audit_task.delay(new_audit.id)
    return new_audit
Monitoring Workers
View Logs
# Docker Compose logs
docker compose -f docker-compose.dev.yml logs -f worker
# Filter for specific audit
docker compose -f docker-compose.dev.yml logs -f worker | grep "audit_id=123"
Celery Flower (Web UI)
Optional monitoring dashboard:
# Install
pip install flower
# Start Flower
celery -A app.workers.tasks flower --port=5555
# Visit http://localhost:5555
Redis CLI
Inspect queue:
# Connect to Redis
docker compose exec redis redis-cli
# Check queue length
LLEN celery
# View pending tasks
LRANGE celery 0 -1
Error Handling
Automatic Retries
@celery_app.task(
    bind=True,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_kwargs={"max_retries": 5, "countdown": 60},
)
def flaky_task(self, audit_id: int):
    """Example task combining automatic retries with a manual, backed-off retry."""
    try:
        # Task logic
        pass
    except SomeException as exc:
        # Manual retry: the delay doubles with each retry already made
        # (1 s, 2 s, 4 s, ...).
        backoff_seconds = 2 ** self.request.retries
        raise self.retry(exc=exc, countdown=backoff_seconds)
Task Failure Callbacks
@celery_app.task(name="run_audit_task")
def run_audit_task(audit_id: int):
    """Example: record the failure on the audit row, then let the error propagate."""
    try:
        # Task logic
        pass
    except Exception as exc:
        failure_reason = str(exc)
        # Mark audit as failed
        with get_db_session() as db:
            AuditService.update_audit_progress(
                db=db,
                audit_id=audit_id,
                status=AuditStatus.FAILED,
                error_message=failure_reason,
            )
        # Re-raise so Celery still records the task as failed.
        raise
Best Practices
Use database sessions carefully
Don’t keep DB sessions open for the entire task. Open/close sessions as needed. # Good
def task():
    """Read, compute, and write using two short-lived sessions instead of one long one."""
    # Session 1: load the input, then release the connection.
    with get_db_session() as read_db:
        data = fetch_data(read_db)

    result = heavy_computation(data)  # Outside session

    # Session 2: persist the outcome.
    with get_db_session() as write_db:
        save_result(write_db, result)
Set appropriate time limits
Use soft_time_limit (graceful) and time_limit (hard kill). @celery_app.task (
soft_time_limit = 600 , # 10 minutes warning
time_limit = 660 , # 11 minutes hard kill
)
Route heavy tasks to dedicated workers. celery_app.conf.task_routes = {
'generate_pdf_task' : { 'queue' : 'heavy' },
'run_audit_task' : { 'queue' : 'default' },
}
Restart workers periodically to prevent memory leaks. celery_app.conf.worker_max_tasks_per_child = 50
Troubleshooting
Task Not Starting
Check worker is running:
docker compose ps worker
Check Redis connection:
docker compose logs redis
Verify task is in queue:
docker compose exec redis redis-cli LLEN celery
Task Stuck
Check worker logs:
docker compose logs -f worker
Inspect task state:
from celery.result import AsyncResult
# `task_id` is the id of the dispatched task (e.g. the `.id` of the object
# returned by `.delay()`); it is not defined in this snippet.
result = AsyncResult(task_id)
# `state` is the current task state; `info` holds the return value, or the
# raised exception if the task failed.
print (result.state, result.info)
Restart worker:
docker compose restart worker
High Memory Usage
Increase restart frequency:
celery_app.conf.worker_max_tasks_per_child = 25 # Lower value
Next Steps
Backend Services Learn about services called by workers
Local Setup Configure Redis and workers locally
Contributing Contribute to task definitions