Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/AngelAmoSanchez/TFG-RaspberryPi-BLE/llms.txt

Use this file to discover all available pages before exploring further.

Each Raspberry Pi agent can deliver detection data to the cloud backend in one of two ways: publishing to an MQTT broker that the backend subscribes to, or posting directly to a REST endpoint over HTTP. The mode is selected on the Pi side by setting COMMUNICATION_MODE in the agent’s environment. Both modes send the same payload structure; the difference is the transport and reliability model.

Choosing a mode

MQTT

Best for unreliable or intermittent network connections. The Pi buffers up to 100 messages locally when offline and flushes them automatically on reconnect. The broker decouples the Pi from the backend — the backend can restart without the Pi losing data.

HTTP

Simpler to set up on a reliable LAN where the Pi can always reach the backend directly. No broker infrastructure needed. Each scan cycle results in a single POST request. If the request fails the batch is not retried automatically.

MQTT mode

How the subscriber works

When MQTT_ENABLED=true, the backend starts an MQTTSubscriber task during application startup (in the FastAPI lifespan handler). It connects to the configured broker using paho-mqtt, subscribes to tfg/detections/raw with QoS 1, and listens for detection payloads from any registered Pi agent.
async def start_mqtt_subscriber():
    """Ejecuta MQTT subscriber si está habilitado"""
    if not settings.mqtt_enabled:
        logger.info("MQTT subscriber deshabilitado")
        return None

    if not settings.mqtt_broker:
        logger.warning("MQTT broker no configurado")
        return None

    subscriber = MQTTSubscriber(
        broker=settings.mqtt_broker,
        port=settings.mqtt_port,
        topic=settings.mqtt_topic,
        username=settings.mqtt_username,
        password=settings.mqtt_password,
    )

    asyncio.create_task(subscriber.run())
    return subscriber
When a message arrives, on_message parses the JSON payload and schedules process_message on the asyncio event loop using run_coroutine_threadsafe. A semaphore limits concurrent processing to three messages at a time:
self.semaphore = asyncio.Semaphore(3)  # máximo 3 mensajes a la vez
The processor upserts the device record, saves all detections to the database, and then broadcasts a detection_event over WebSocket to any connected dashboard clients.

Topic format

All Pi agents publish to the same topic by default:
tfg/detections/raw
The topic is configurable via MQTT_TOPIC on both the Pi agent and the backend. If you run multiple deployments, use different topics to keep their data streams separate.

Message format

{
  "device_id": "rpi-entrance-01",
  "name": "Entrance Pi",
  "location": "Main entrance",
  "timestamp": "2024-01-15T10:30:00+01:00",
  "detections": [
    {
      "device_hash": "a3f2b1c4d5e6...",
      "rssi": -58,
      "zone": "near",
      "timestamp": "2024-01-15T10:29:55+01:00"
    }
  ]
}

Enabling MQTT on the backend

Set these variables in the backend .env:
MQTT_ENABLED=true
MQTT_BROKER=your-broker.eu-central-1.emqxsl.com
MQTT_PORT=8883
MQTT_TOPIC=tfg/detections/raw
MQTT_USERNAME=your-username
MQTT_PASSWORD=your-password
The backend subscriber automatically enables TLS when connecting. EMQX Cloud clusters require port 8883. For a local broker without TLS, use port 1883.

HTTP mode

How the Pi posts to the backend

In HTTP mode the Pi agent sends a POST request to /api/v1/detections/bulk at the end of each scan cycle. The HTTPClient builds the payload, adds the X-API-Key header if configured, and posts it synchronously:
self.detections_endpoint = f"{self.base_url}/api/v1/detections/bulk"

payload = {
    "device_id": self.device_id,
    "detections": [
        {
            "device_hash": det.device_hash,
            "rssi": det.rssi,
            "zone": det.zone.value,
            "timestamp": det.timestamp.isoformat(),
        }
        for det in detections
    ],
}

headers = {"Content-Type": "application/json"}
if self.api_key:
    headers["X-API-Key"] = self.api_key

response = requests.post(
    self.detections_endpoint,
    json=payload,
    headers=headers,
    timeout=self.timeout,
)
The backend returns 200 or 201 on success. Any other status code is logged as an error on the Pi. Before each scan cycle the Pi calls GET /health to verify connectivity:
health_url = f"{self.base_url}/health"
response = requests.get(health_url, timeout=self.timeout)

Offline buffering (MQTT only)

The MQTT client on the Pi maintains an in-memory circular buffer for detection batches that could not be delivered because the broker was unreachable:
max_buffer_size: int = 100  # Tamaño máximo del buffer offline
When the buffer is full, the oldest entry is dropped to make room for the newest one:
if len(self._buffer) >= self.max_buffer_size:
    logger.warning(
        f"Buffer lleno ({self.max_buffer_size}), descartando el mensaje más antiguo"
    )
    self._buffer.pop(0)
When the Pi reconnects to the broker, _flush_buffer publishes all queued messages before processing new ones:
async def _flush_buffer(self):
    """Envía todos los mensajes del buffer al reconectar con el backend"""
    if not self._buffer:
        return

    logger.info(f"Publicando mensajes del buffer ({len(self._buffer)})...")
    messages_to_send = self._buffer.copy()
    self._buffer.clear()

    for message in messages_to_send:
        if not self._publish_message(message):
            logger.warning("ERROR - Error al publicar mensaje del buffer, reintentando más tarde")
            break
The buffer is held in RAM and is lost if the Pi agent process restarts. For deployments where the Pi may reboot frequently, consider enabling persistent storage or increasing MQTT_BUFFER_SIZE.
HTTP mode has no buffer. HTTPClient.get_buffer_size() always returns 0. If a POST fails, the batch is discarded.

Switching modes

Set COMMUNICATION_MODE in the Pi agent’s .env file. The agent validates the value on startup and raises a ValueError if it is not mqtt or http.
.env (Pi agent)
COMMUNICATION_MODE=mqtt
DEVICE_ID=rpi-entrance-01
DEVICE_NAME=Entrance Pi
DEVICE_LOCATION=Main entrance

MQTT_BROKER=your-broker.eu-central-1.emqxsl.com
MQTT_PORT=8883
MQTT_TOPIC=tfg/detections/raw
MQTT_USERNAME=your-username
MQTT_PASSWORD=your-password
MQTT_BUFFER_SIZE=100
.env (backend)
MQTT_ENABLED=true
MQTT_BROKER=your-broker.eu-central-1.emqxsl.com
MQTT_PORT=8883
MQTT_TOPIC=tfg/detections/raw
MQTT_USERNAME=your-username
MQTT_PASSWORD=your-password
In HTTP mode the Pi checks GET /health on the backend before each cycle. If HTTP_BASE_URL is wrong or the backend is down, the Pi logs a connectivity error and skips that scan batch.

Build docs developers (and LLMs) love