The backend is a Python FastAPI application (backend/main.py) that runs on port 8000. It manages several long-lived background services and exposes a REST API consumed by the Next.js frontend.

App structure

backend/
├── main.py                     # FastAPI app, middleware, API routes
├── ais_proxy.js                # Node.js WebSocket proxy for aisstream.io
├── carrier_cache.json          # Persisted carrier OSINT positions
├── cctv.db                     # SQLite CCTV camera database
├── config/
│   └── news_feeds.json         # User-customizable RSS feed list
└── services/
    ├── data_fetcher.py         # Scheduler + fetcher orchestration
    ├── ais_stream.py           # AIS WebSocket client
    ├── carrier_tracker.py      # OSINT carrier position tracker
    ├── cctv_pipeline.py        # Multi-source CCTV ingestion
    ├── geopolitics.py          # GDELT + Ukraine frontline
    ├── region_dossier.py       # Right-click country/city intelligence
    ├── sentinel_search.py      # Sentinel-2 STAC imagery search
    ├── radio_intercept.py      # Scanner radio feed integration
    ├── kiwisdr_fetcher.py      # KiwiSDR receiver scraper
    ├── schemas.py              # Pydantic response models
    ├── network_utils.py        # HTTP client with curl fallback
    ├── api_settings.py         # API key management
    └── news_feed_config.py     # RSS feed config manager

Lifespan and startup sequence

A lifespan handler, written as an async generator decorated with @asynccontextmanager (from Python's contextlib, not FastAPI itself), orchestrates startup and shutdown:
import threading
from contextlib import asynccontextmanager

from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    from services.env_check import validate_env
    validate_env(strict=True)

    # 1. Load AIS disk cache, open WebSocket
    start_ais_stream()

    # 2. Start OSINT carrier position tracker
    start_carrier_tracker()

    # 3. Register APScheduler jobs (fast=60s, slow=5min)
    start_scheduler()

    # 4. Full preload in background — server accepts requests immediately
    def _background_preload():
        update_all_data()

    threading.Thread(target=_background_preload, daemon=True).start()

    yield  # server runs here

    stop_ais_stream()
    stop_scheduler()
    stop_carrier_tracker()
The server binds to port 8000 immediately after step 3. The preload (step 4) populates data in the background while the frontend’s adaptive 3-second startup polling picks up results as each fetcher finishes.

Data fetcher and scheduler

services/data_fetcher.py is the orchestration hub. It defines two refresh tiers and registers them with APScheduler:
import concurrent.futures

def update_fast_data():
    """Fast-tier: moving entities — every 60s."""
    fast_funcs = [
        fetch_flights,
        fetch_military_flights,
        fetch_ships,
        fetch_satellites,
    ]
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(fast_funcs)) as executor:
        futures = [executor.submit(func) for func in fast_funcs]
        concurrent.futures.wait(futures)

def update_slow_data():
    """Slow-tier: contextual data — every 5 minutes."""
    slow_funcs = [
        fetch_news, fetch_earthquakes, fetch_firms_fires,
        fetch_defense_stocks, fetch_oil_prices, fetch_weather,
        fetch_space_weather, fetch_internet_outages,
        fetch_cctv, fetch_kiwisdr, fetch_frontlines,
        fetch_gdelt, fetch_datacenters, fetch_military_bases, fetch_power_plants,
    ]
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(slow_funcs)) as executor:
        ...
All fetchers within a tier execute in parallel. The scheduler itself runs as an APScheduler BackgroundScheduler with daemon=True.
| Job | Interval | max_instances |
| --- | --- | --- |
| fast_tier | 60s | 1 |
| slow_tier | 5 min | 1 |
| gdelt | 15 min | 1 |
| liveuamap | 15 min | 1 |
| CCTV ingestors (×4) | 10 min | 1 |
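
The excerpts above wait on the futures without inspecting them, and concurrent.futures.wait does not re-raise exceptions from worker threads. Whether the real fetchers catch their own errors is not shown here, so the following is only a sketch of one way a tier could report failures. The names run_tier, fetch_ok, and fetch_broken are hypothetical and do not come from data_fetcher.py.

```python
import concurrent.futures
import logging

logger = logging.getLogger("tier_runner")  # hypothetical logger name


def run_tier(funcs):
    """Run every fetcher in parallel and return the names of those that failed."""
    failed = []
    if not funcs:
        return failed
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(funcs)) as executor:
        # Map each future back to its fetcher so failures can be named.
        futures = {executor.submit(func): func.__name__ for func in funcs}
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()  # re-raises any exception from the worker thread
            except Exception:
                logger.exception("fetcher %s failed", futures[future])
                failed.append(futures[future])
    return failed


def fetch_ok():  # hypothetical stand-in for a healthy fetcher
    return None


def fetch_broken():  # hypothetical stand-in for a failing fetcher
    raise RuntimeError("upstream timeout")


failed_names = run_tier([fetch_ok, fetch_broken])
```

Because each future is checked on its own, one failing source does not prevent the others in the same tier from completing.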

Service modules

ais_stream.py

Manages a Node.js subprocess (ais_proxy.js) that holds the WebSocket connection to aisstream.io. Vessel positions stream in via stdout. Supports disk caching (ais_cache.json) and local AIS-catcher ingestion via POST /api/ais/feed. Vessels are pruned after 15 minutes of inactivity.
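
The 15-minute pruning rule can be illustrated with a small sketch. It assumes vessels are held in a dict keyed by MMSI and that each record carries a last_seen Unix timestamp. Both the structure and the field name are assumptions for illustration, not the layout used by ais_stream.py.

```python
import time

STALE_AFTER_SECONDS = 15 * 60  # 15 minutes of inactivity


def prune_stale(vessels, now=None, max_age=STALE_AFTER_SECONDS):
    """Return a new dict without vessels whose last update is older than max_age."""
    now = time.time() if now is None else now
    return {
        mmsi: vessel
        for mmsi, vessel in vessels.items()
        if now - vessel["last_seen"] <= max_age
    }


# Hypothetical records: the first was last seen 1000 s ago, the second 200 s ago.
vessels = {
    "211000001": {"lat": 54.3, "lon": 10.1, "last_seen": 1_000.0},
    "211000002": {"lat": 53.5, "lon": 9.9, "last_seen": 1_800.0},
}
fresh = prune_stale(vessels, now=2_000.0)
```

Building a new dict rather than deleting in place avoids mutating the mapping while another thread may be iterating over it.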

carrier_tracker.py

Tracks all 11 active US Navy aircraft carriers using GDELT news scraping and 50+ geographic region-to-coordinate mappings. Positions are disk-cached in carrier_cache.json and updated at 00:00 and 12:00 UTC.
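
The twice-daily schedule can be expressed as a small helper that finds the next 00:00 or 12:00 UTC boundary. This is a sketch only. The function name next_update_utc is hypothetical, and the tracker may schedule its updates differently (for example through APScheduler).

```python
from datetime import datetime, timedelta, timezone


def next_update_utc(now):
    """Return the next 00:00 or 12:00 UTC boundary strictly after `now`.

    `now` must be a timezone-aware datetime.
    """
    now = now.astimezone(timezone.utc)
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    noon = midnight + timedelta(hours=12)
    # Before noon the next boundary is today's noon, otherwise tomorrow's midnight.
    return noon if now < noon else midnight + timedelta(days=1)


morning = datetime(2024, 1, 1, 3, 0, tzinfo=timezone.utc)
at_noon = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
after_morning = next_update_utc(morning)
after_noon = next_update_utc(at_noon)
```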

cctv_pipeline.py

Ingests live traffic cameras from four sources: Transport for London JamCams, Austin TX TxDOT, NYC DOT, and Singapore LTA. Cameras are stored in a SQLite database (cctv.db) and refreshed every 10 minutes.
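
A periodic refresh into SQLite is commonly written as an upsert, so that re-ingesting a camera replaces its row instead of duplicating it. The sketch below uses an in-memory database. The table name, column names, and camera ids are assumptions for illustration and are not the actual schema of cctv.db.

```python
import sqlite3


def open_db(path=":memory:"):
    """Open the database and create a hypothetical cameras table if missing."""
    conn = sqlite3.connect(path)
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS cameras (
            id TEXT PRIMARY KEY,
            source TEXT NOT NULL,
            lat REAL NOT NULL,
            lon REAL NOT NULL,
            image_url TEXT NOT NULL
        )
        """
    )
    return conn


def upsert_cameras(conn, cameras):
    """Insert new cameras and overwrite rows whose id already exists."""
    with conn:  # commits on success, rolls back on error
        conn.executemany(
            "INSERT OR REPLACE INTO cameras (id, source, lat, lon, image_url) "
            "VALUES (:id, :source, :lat, :lon, :image_url)",
            cameras,
        )


conn = open_db()
camera = {"id": "tfl-1", "source": "tfl", "lat": 51.5, "lon": -0.12,
          "image_url": "https://example.invalid/a.jpg"}
upsert_cameras(conn, [camera])
# A later refresh for the same id replaces the stored row.
upsert_cameras(conn, [{**camera, "image_url": "https://example.invalid/b.jpg"}])
row_count = conn.execute("SELECT COUNT(*) FROM cameras").fetchone()[0]
stored_url = conn.execute("SELECT image_url FROM cameras WHERE id = 'tfl-1'").fetchone()[0]
```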

geopolitics.py

Fetches GDELT conflict events (last 8 hours) and the Ukraine frontline GeoJSON from DeepState Map. GDELT is updated every 15 minutes; the frontline runs on the slow 5-minute tier.

region_dossier.py

Handles right-click map requests. Reverse-geocodes a lat/lng, then queries RestCountries for a country profile, Wikidata SPARQL for the head of state, and Wikipedia for a local summary and thumbnail. Results are cached for 24 hours.
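
A 24-hour cache of this kind can be sketched as a small time-to-live store. The class TTLCache and the helper cache_key are hypothetical. Rounding coordinates so that nearby clicks share one entry is an assumption about how keys might be built, not a description of region_dossier.py.

```python
import time

CACHE_TTL_SECONDS = 24 * 60 * 60  # 24 hours


class TTLCache:
    """Minimal in-memory cache whose entries expire after a fixed time."""

    def __init__(self, ttl=CACHE_TTL_SECONDS, clock=time.monotonic):
        self._ttl = ttl
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._entries = {}

    def get(self, key):
        entry = self._entries.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        if self._clock() - stored_at >= self._ttl:
            del self._entries[key]  # drop the expired entry
            return None
        return value

    def set(self, key, value):
        self._entries[key] = (self._clock(), value)


def cache_key(lat, lng, precision=2):
    """Round coordinates so clicks that are close together share one key."""
    return (round(lat, precision), round(lng, precision))


dossiers = TTLCache()
dossiers.set(cache_key(48.8566, 2.3522), {"country": "France"})
hit = dossiers.get(cache_key(48.8571, 2.3519))  # same key after rounding
```

Caching matters here because each dossier fans out to several external services, so a repeated click on the same area can be answered without new upstream requests.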

sentinel_search.py

Queries the Microsoft Planetary Computer STAC API for the most recent Sentinel-2 L2A scene at a given point. Returns capture date, cloud cover, and a thumbnail URL for the right-click intel card.
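
A "most recent scene at a point" query maps naturally onto a STAC item search body. The sketch below only builds the request body and sends nothing. The endpoint URL, the sentinel-2-l2a collection id, and the use of the sortby and query extensions are assumptions based on the public STAC API conventions, and sentinel_search.py may build its request differently.

```python
# Assumed endpoint of the Planetary Computer STAC API (not taken from the source).
STAC_SEARCH_URL = "https://planetarycomputer.microsoft.com/api/stac/v1/search"


def build_search_body(lat, lng, max_cloud_cover=None):
    """Build a STAC search body for the newest Sentinel-2 L2A item at a point."""
    body = {
        "collections": ["sentinel-2-l2a"],
        # GeoJSON positions are ordered longitude, latitude.
        "intersects": {"type": "Point", "coordinates": [lng, lat]},
        # Newest first, so the single returned item is the latest scene.
        "sortby": [{"field": "properties.datetime", "direction": "desc"}],
        "limit": 1,
    }
    if max_cloud_cover is not None:
        # Optional filter using the query extension, if the server supports it.
        body["query"] = {"eo:cloud_cover": {"lt": max_cloud_cover}}
    return body


body = build_search_body(48.85, 2.35, max_cloud_cover=20)
```

The body would be sent as JSON in a POST to the search endpoint. The coordinate order is worth noting, since passing latitude first is a common source of empty results.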

radio_intercept.py

Integrates with Broadcastify (top scanner feeds) and OpenMHz (trunked radio call archives). Provides nearest-system lookups by lat/lng for the Radio Intercept Panel.
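
A nearest-system lookup by lat/lng is typically a great-circle distance comparison. The sketch below uses the haversine formula. The system records and their name, lat, and lng fields are hypothetical, and radio_intercept.py may rank candidates differently.

```python
import math

EARTH_RADIUS_KM = 6371.0  # mean Earth radius


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance between two points in kilometres."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlambda = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


def nearest_system(lat, lng, systems):
    """Return the system closest to (lat, lng), or None for an empty list."""
    if not systems:
        return None
    return min(systems, key=lambda s: haversine_km(lat, lng, s["lat"], s["lng"]))


# Hypothetical systems near New York and Los Angeles.
systems = [
    {"name": "east", "lat": 40.7, "lng": -74.0},
    {"name": "west", "lat": 34.05, "lng": -118.24},
]
closest = nearest_system(41.0, -73.5, systems)
```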

kiwisdr_fetcher.py

Scrapes the KiwiSDR public receiver list (~500+ nodes worldwide). Each node includes name, location, antenna type, frequency bands, and active user count.

Admin authentication

Sensitive endpoints (API key management, news feed config, system update) are protected by an X-Admin-Key header dependency:
_ADMIN_KEY = os.environ.get("ADMIN_KEY", "")

def require_admin(request: Request):
    if not _ADMIN_KEY:
        return  # No key set — allow all (local dev)
    if request.headers.get("X-Admin-Key") != _ADMIN_KEY:
        raise HTTPException(status_code=403, detail="Forbidden — invalid or missing admin key")
Set ADMIN_KEY in .env or via Docker Swarm secrets for production deployments. If the variable is unset, a warning is logged at startup and endpoints remain open — safe for local development, not for public exposure.
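
Secret comparisons are often written with a constant-time check so that response timing reveals less about the expected value. The following is a sketch of that variant as a plain function, not the project's implementation. The name admin_key_matches is hypothetical, and the open behaviour for an unset key mirrors the description above.

```python
import hmac


def admin_key_matches(provided, expected):
    """Return True when no key is configured or the provided key matches."""
    if not expected:
        return True  # no key set: endpoints stay open, as in local development
    if provided is None:
        return False  # header missing while a key is required
    # compare_digest takes time independent of where the inputs differ.
    return hmac.compare_digest(provided.encode(), expected.encode())


open_mode = admin_key_matches(None, "")
rejected = admin_key_matches("wrong", "s3cret")
accepted = admin_key_matches("s3cret", "s3cret")
```

Inside a FastAPI dependency, a False result would translate into the same 403 response that require_admin raises.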

Rate limiting

All endpoints use slowapi with get_remote_address as the key function:
limiter = Limiter(key_func=get_remote_address)

@app.get("/api/live-data/fast")
@limiter.limit("120/minute")
async def live_data_fast(request: Request, ...):
    ...
| Endpoint | Limit |
| --- | --- |
| /api/live-data/fast | 120/min |
| /api/live-data/slow | 60/min |
| /api/radio/* | 30–60/min |
| /api/region-dossier | 30/min |
| /api/sentinel2/search | 30/min |
| /api/refresh | 2/min |
| /api/system/update | 1/min |
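
To make a limit such as 2/min per client address concrete, here is a sketch of a fixed-window counter. It illustrates the general technique only and is not slowapi's code. The class FixedWindowLimiter is hypothetical, and slowapi may be configured with a different strategy.

```python
import time
from collections import defaultdict


class FixedWindowLimiter:
    """Allow at most `limit` hits per client within each fixed time window."""

    def __init__(self, limit, window_seconds, clock=time.monotonic):
        self.limit = limit
        self.window_seconds = window_seconds
        self._clock = clock  # injectable so windows can be tested without sleeping
        self._counts = defaultdict(int)  # (client, window index) -> hits

    def allow(self, client):
        window = int(self._clock() // self.window_seconds)
        key = (client, window)
        if self._counts[key] >= self.limit:
            return False  # the caller would answer with HTTP 429
        self._counts[key] += 1
        return True


now = [0.0]
limiter = FixedWindowLimiter(limit=2, window_seconds=60, clock=lambda: now[0])
results = [limiter.allow("203.0.113.7") for _ in range(3)]
now[0] = 60.0  # a new window begins
after_reset = limiter.allow("203.0.113.7")
```

This sketch never evicts old windows, so a long-running process would need a cleanup step or an external store.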

ETag caching

Both /api/live-data/fast and /api/live-data/slow use a shared _etag_response helper that serializes the payload once, computes an MD5 ETag, and returns 304 Not Modified if the client’s If-None-Match header matches:
def _etag_response(request: Request, payload: dict, prefix: str = ""):
    content = json.dumps(payload)
    etag = hashlib.md5(f"{prefix}{content}".encode()).hexdigest()[:16]
    if request.headers.get("if-none-match") == etag:
        return Response(status_code=304,
                        headers={"ETag": etag, "Cache-Control": "no-cache"})
    return Response(content=content, media_type="application/json",
                    headers={"ETag": etag, "Cache-Control": "no-cache"})
The ETag prefix includes the bbox tag (s,w,n,e) so viewport-filtered responses are cached separately from full-dataset responses.
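
The effect of the prefix can be checked in isolation by reusing the hashing line from _etag_response. The helper names compute_etag and is_not_modified, and the bbox string format, are hypothetical. Only the hashing expression is taken from the excerpt above.

```python
import hashlib
import json


def compute_etag(payload, prefix=""):
    """Hash the prefix plus the serialized payload, as in _etag_response."""
    content = json.dumps(payload)
    return hashlib.md5(f"{prefix}{content}".encode()).hexdigest()[:16]


def is_not_modified(if_none_match, payload, prefix=""):
    """True when the client's cached ETag still matches the current payload."""
    return if_none_match == compute_etag(payload, prefix)


payload = {"flights": [{"id": "abc123", "lat": 51.5, "lng": -0.12}]}
full_etag = compute_etag(payload)
bbox_etag = compute_etag(payload, prefix="50.0,-1.0,52.0,1.0")  # hypothetical bbox tag
```

Identical payloads therefore get different ETags when requested with different viewports, while any change in the payload invalidates the client's cached tag.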

Gzip compression

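GZipMiddleware is registered with minimum_size=1000, so responses smaller than about 1 KB are sent uncompressed and larger ones are gzip-compressed for clients that accept it: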
from fastapi.middleware.gzip import GZipMiddleware
app.add_middleware(GZipMiddleware, minimum_size=1000)
This compresses the large JSON payloads by approximately 92%, reducing a typical fast-endpoint response from ~11.6 MB to ~915 KB. See Performance for the full breakdown.
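
Repetitive JSON compresses well because keys and similar values recur in every record. The sketch below measures this on synthetic data with the standard gzip module. The sample records are invented for illustration, and the resulting ratio will differ from the production figures quoted above.

```python
import gzip
import json


def compressed_sizes(payload):
    """Return (raw_bytes, gzip_bytes) for the JSON-serialized payload."""
    raw = json.dumps(payload).encode()
    return len(raw), len(gzip.compress(raw))


# Synthetic flight-like records, not real data.
sample = {
    "flights": [
        {"icao24": f"{i:06x}", "lat": 51.5, "lng": -0.12, "alt": 11000, "heading": 90}
        for i in range(5000)
    ]
}
raw_size, gz_size = compressed_sizes(sample)
saving = 1 - gz_size / raw_size  # fraction of bytes saved by compression
print(f"raw={raw_size} B, gzip={gz_size} B, saving={saving:.0%}")
```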

Docker Swarm secrets

Before any service modules are imported, main.py checks for *_FILE environment variables and reads the corresponding file contents into the matching environment variable:
_SECRET_VARS = [
    "AIS_API_KEY", "OPENSKY_CLIENT_ID", "OPENSKY_CLIENT_SECRET",
    "LTA_ACCOUNT_KEY", "CORS_ORIGINS", "ADMIN_KEY",
]

for _var in _SECRET_VARS:
    _file_var = f"{_var}_FILE"
    _file_path = os.environ.get(_file_var)
    if _file_path:
        with open(_file_path, "r") as _f:
            os.environ[_var] = _f.read().strip()
This runs before service imports because modules read os.environ at import time. Mount your secrets at /run/secrets/<name> and set AIS_API_KEY_FILE=/run/secrets/AIS_API_KEY to use them.
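
The same loop can be exercised outside Docker by pointing a *_FILE variable at a temporary file. In this sketch the function load_file_secrets is hypothetical, and it works on a plain dict instead of os.environ so that it does not alter the real environment.

```python
import os
import tempfile


def load_file_secrets(names, environ):
    """Copy each <NAME>_FILE file's contents into <NAME>; return the names loaded."""
    loaded = []
    for name in names:
        path = environ.get(f"{name}_FILE")
        if not path:
            continue  # no file configured for this variable
        with open(path, "r") as handle:
            environ[name] = handle.read().strip()  # drop the trailing newline
        loaded.append(name)
    return loaded


with tempfile.TemporaryDirectory() as secrets_dir:
    # Stand-in for a secret mounted under /run/secrets.
    secret_path = os.path.join(secrets_dir, "AIS_API_KEY")
    with open(secret_path, "w") as handle:
        handle.write("example-key\n")
    env = {"AIS_API_KEY_FILE": secret_path}
    loaded = load_file_secrets(["AIS_API_KEY", "ADMIN_KEY"], env)
```

Variables without a matching *_FILE entry are left untouched, so values set directly in .env continue to work.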

Pydantic schemas

Response models live in services/schemas.py:
from typing import Dict, Optional

from pydantic import BaseModel

class HealthResponse(BaseModel):
    status: str
    last_updated: Optional[str]
    sources: Dict[str, int]   # feature counts per layer
    freshness: Dict[str, str] # ISO timestamps per source
    uptime_seconds: int

class RefreshResponse(BaseModel):
    status: str

class RouteResponse(BaseModel):
    orig_loc: Optional[list]
    dest_loc: Optional[list]
    origin_name: Optional[str]
    dest_name: Optional[str]
