Skip to main content
The Kinematrix MQTT Manager provides a full-featured MQTT client wrapper around PubSubClient with automatic WiFi and broker reconnection, subscription persistence, and convenience methods for common patterns.

Overview

MQTTManager simplifies MQTT communication for IoT projects with:

Auto-Reconnection

Automatic WiFi and MQTT broker reconnection with configurable intervals

Subscription Persistence

Automatically resubscribe to topics after reconnection

JSON Support

Built-in JSON serialization using ArduinoJson

Status Messages

Last-Will-and-Testament (LWT) for device status tracking

Dependencies

Add to your platformio.ini:
lib_deps = 
  knolleary/PubSubClient@^2.8
  bblanchon/ArduinoJson@^6.21.3

Enable Module

#define ENABLE_MODULE_MQTT_MANAGER
#include "Kinematrix.h"

Basic Usage

Simple Publish/Subscribe

#include <WiFi.h>
#define ENABLE_MODULE_MQTT_MANAGER
#include "Kinematrix.h"

MQTTManager mqtt;

void onMessage(char* topic, byte* payload, unsigned int length) {
  // Convert payload to string
  char message[length + 1];
  memcpy(message, payload, length);
  message[length] = '\0';
  
  Serial.print("Message on ");
  Serial.print(topic);
  Serial.print(": ");
  Serial.println(message);
  
  // Handle commands
  if (strcmp(topic, "device/command") == 0) {
    if (strcmp(message, "ON") == 0) {
      digitalWrite(LED_PIN, HIGH);
    } else if (strcmp(message, "OFF") == 0) {
      digitalWrite(LED_PIN, LOW);
    }
  }
}

void setup() {
  Serial.begin(115200);
  pinMode(LED_PIN, OUTPUT);
  
  // Connect to WiFi
  mqtt.beginWiFi("YourSSID", "YourPassword");
  
  // Configure MQTT
  mqtt.beginMQTT("broker.hivemq.com", 1883);
  mqtt.setClientId("ESP32_Device_001");
  mqtt.setCallback(onMessage);
  
  // Connect to broker
  if (mqtt.connect()) {
    Serial.println("MQTT Connected!");
    
    // Subscribe to topics
    mqtt.subscribe("device/command");
    mqtt.subscribe("device/config");
  }
}

void loop() {
  mqtt.loop();  // Process MQTT messages
  
  // Publish sensor data every 5 seconds
  static unsigned long lastPublish = 0;
  if (millis() - lastPublish >= 5000) {
    lastPublish = millis();
    
    float temperature = readTemperature();
    String payload = String(temperature, 2);
    mqtt.publish("device/temperature", payload.c_str());
  }
}

WiFi Management

Basic WiFi Connection

// Connect with 20-second timeout
if (mqtt.beginWiFi("SSID", "password", 20000)) {
  Serial.println("WiFi connected");
  Serial.println(mqtt.getLocalIP());
}

WiFi Status Checking

void loop() {
  if (!mqtt.isWiFiConnected()) {
    Serial.println("WiFi disconnected!");
    // loop() will automatically try to reconnect
  }
  
  mqtt.loop();
}

External WiFi Management

If you manage WiFi separately:
// Don't use mqtt.beginWiFi()
// Just configure MQTT
mqtt.beginMQTT("broker.hivemq.com", 1883);

// MQTTManager will still check WiFi status

MQTT Configuration

Broker Setup

// Public broker (no authentication)
mqtt.beginMQTT("broker.hivemq.com", 1883);

// Private broker with authentication
mqtt.beginMQTT(
  "mqtt.example.com",
  1883,
  "username",
  "password"
);

Client ID

// Set custom client ID
mqtt.setClientId("MyDevice_001");

// Generate unique ID from MAC address
String clientId = "ESP32_";
#ifdef ESP32
  clientId += String((uint32_t)(ESP.getEfuseMac() & 0xffffff), HEX);
#elif defined(ESP8266)
  clientId += String(ESP.getChipId(), HEX);
#endif
mqtt.setClientId(clientId.c_str());

Connection Options

// Set keep-alive interval (default 15s)
mqtt.setKeepAlive(30);  // 30 seconds

// Set socket timeout (default 15s)
mqtt.setSocketTimeout(20);  // 20 seconds

// Increase buffer size for larger messages (default 256 bytes)
mqtt.setBufferSize(1024);  // 1KB buffer

// Enable/disable auto-reconnection
mqtt.enableAutoReconnect(true);  // Default: true

Publishing

Basic Publishing

// Publish string
mqtt.publish("sensor/temperature", "25.5");

// Publish with retained flag
mqtt.publish("device/status", "online", true);

// Publish binary data
uint8_t data[] = {0x01, 0x02, 0x03};
mqtt.publish("device/data", data, sizeof(data));

Topic Helpers

// Publish to hierarchical topic
mqtt.publishToTopic(
  "devices/esp32",  // Base topic
  "temperature",    // Key
  "25.5"            // Value
);
// Publishes to: devices/esp32/temperature

JSON Publishing

#include <ArduinoJson.h>

StaticJsonDocument<200> doc;
doc["temperature"] = 25.5;
doc["humidity"] = 60;
doc["timestamp"] = millis();

// Publish JSON document
mqtt.publishJson("sensor/data", doc);
// Publishes: {"temperature":25.5,"humidity":60,"timestamp":12345}

// With retained flag
mqtt.publishJson("sensor/data", doc, true);

Streamed Publishing (Large Messages)

// Begin publish stream
if (mqtt.beginPublish("device/log", 1024, false)) {
  // Write data in chunks
  mqtt.write((uint8_t*)"Long message part 1...", 23);
  mqtt.write((uint8_t*)"Long message part 2...", 23);
  
  // Complete publish
  if (mqtt.endPublish()) {
    Serial.println("Streamed publish successful");
  }
}

Clear Retained Messages

// Remove retained message from topic
mqtt.clearRetained("device/status");

Subscribing

Basic Subscription

// Subscribe with QoS 0 (default)
mqtt.subscribe("device/command");

// Subscribe with QoS 1 (at least once delivery)
mqtt.subscribe("device/config", 1);

Unsubscribe

mqtt.unsubscribe("device/command");

Wildcard Subscriptions

// Subscribe to all sensors
mqtt.subscribe("sensors/+");  // Single level

// Subscribe to entire tree
mqtt.subscribe("devices/#");  // Multi level

Message Callback

void messageCallback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Topic: ");
  Serial.println(topic);
  
  // Parse JSON payload
  StaticJsonDocument<200> doc;
  DeserializationError error = deserializeJson(doc, payload, length);
  
  if (!error) {
    float temp = doc["temperature"];
    int humidity = doc["humidity"];
    Serial.println("Temp: " + String(temp) + "C");
    Serial.println("Humidity: " + String(humidity) + "%");
  }
}

void setup() {
  mqtt.setCallback(messageCallback);
}

Last-Will-and-Testament (LWT)

Basic LWT

// Set will message before connecting
mqtt.setWill(
  "device/status",  // Topic
  "offline",        // Message
  true,             // Retained
  1                 // QoS
);

mqtt.connect();

Connect with Status

// Set LWT and publish online status in one call
mqtt.connectWithStatus(
  "device/status",  // Status topic
  "online",         // Online message
  "offline"         // Offline message (LWT)
);

// When device disconnects unexpectedly,
// broker publishes "offline" automatically

Automatic Status Messages

// Enable status message management
mqtt.enableStatusMessages(
  "device/esp32/status",
  "online",
  "offline"
);

// MQTTManager will:
// 1. Set LWT to "offline"
// 2. Publish "online" on connection
// 3. Monitor connection and update status

void loop() {
  mqtt.loop();  // Automatically manages status
}

// Disable status messages
mqtt.disableStatusMessages();

Connection Management

Manual Connection

if (mqtt.connect()) {
  Serial.println("Connected to MQTT broker");
} else {
  Serial.print("Failed: ");
  Serial.println(mqtt.getStateString());
}

Auto-Reconnection

void loop() {
  // Automatically reconnects every 5 seconds if disconnected
  mqtt.loop();
}

// Change reconnection interval
void setup() {
  // Reconnect attempt every 10 seconds
  mqtt.enableAutoReconnect(true);
}

void loop() {
  // Pass custom interval to reconnect()
  if (!mqtt.isConnected()) {
    mqtt.reconnect(10000);  // 10 second interval
  }
  mqtt.loop();
}

Manual Reconnection

if (!mqtt.isConnected()) {
  Serial.println("Attempting reconnection...");
  if (mqtt.reconnect(5000)) {  // Try every 5 seconds
    Serial.println("Reconnected!");
  }
}

Disconnect

// Gracefully disconnect from broker
mqtt.disconnect();

Connection Status

if (mqtt.isConnected()) {
  Serial.println("MQTT is connected");
}

// Get state code
int state = mqtt.state();
// MQTT_CONNECTED = 0
// MQTT_CONNECTION_TIMEOUT = -4
// MQTT_CONNECTION_LOST = -3
// MQTT_CONNECT_FAILED = -2
// MQTT_DISCONNECTED = -1
// etc.

// Get human-readable state
Serial.println(mqtt.getStateString());
// Prints: "Connected", "Disconnected", "Connection timeout", etc.

Advanced Features

Subscription Persistence

MQTTManager automatically resubscribes to topics after reconnection:
void setup() {
  mqtt.beginMQTT("broker.hivemq.com", 1883);
  mqtt.connect();
  
  // Subscribe to topics
  mqtt.subscribe("device/command");
  mqtt.subscribe("sensor/config");
  mqtt.subscribe("alerts/#");
}

void loop() {
  mqtt.loop();
  
  // If connection is lost and restored,
  // all 3 subscriptions are automatically restored
}

Access Underlying Client

For advanced PubSubClient features:
PubSubClient& client = mqtt.getClient();

// Use PubSubClient methods directly
client.setServer("broker.hivemq.com", 1883);
client.publish("topic", "message");

Buffer Size Management

// Check current buffer size
Serial.println(mqtt.getBufferSize());

// Increase for large messages
if (mqtt.setBufferSize(2048)) {
  Serial.println("Buffer increased to 2KB");
} else {
  Serial.println("Failed to allocate buffer");
}

Practical Examples

Sensor Data Publisher

#include <WiFi.h>
#include <ArduinoJson.h>
#define ENABLE_MODULE_MQTT_MANAGER
#include "Kinematrix.h"

MQTTManager mqtt;
const char* SENSOR_TOPIC = "home/sensors/living_room";

void setup() {
  Serial.begin(115200);
  
  mqtt.beginWiFi("SSID", "password");
  mqtt.beginMQTT("broker.hivemq.com", 1883);
  mqtt.setClientId("LivingRoomSensor");
  
  mqtt.connectWithStatus(
    "home/sensors/living_room/status",
    "online",
    "offline"
  );
}

void loop() {
  mqtt.loop();
  
  static unsigned long lastPublish = 0;
  if (millis() - lastPublish >= 30000) {  // Every 30 seconds
    lastPublish = millis();
    
    // Read sensors
    float temp = readTemperature();
    float humidity = readHumidity();
    
    // Create JSON
    StaticJsonDocument<200> doc;
    doc["temperature"] = temp;
    doc["humidity"] = humidity;
    doc["timestamp"] = millis() / 1000;
    
    // Publish
    mqtt.publishJson(SENSOR_TOPIC, doc);
    Serial.println("Data published");
  }
}

Command Receiver

MQTTManager mqtt;
bool ledState = false;

void onCommand(char* topic, byte* payload, unsigned int length) {
  StaticJsonDocument<200> doc;
  deserializeJson(doc, payload, length);
  
  const char* cmd = doc["command"];
  
  if (strcmp(cmd, "LED_ON") == 0) {
    ledState = true;
    digitalWrite(LED_PIN, HIGH);
    mqtt.publish("device/led/state", "ON", true);
  }
  else if (strcmp(cmd, "LED_OFF") == 0) {
    ledState = false;
    digitalWrite(LED_PIN, LOW);
    mqtt.publish("device/led/state", "OFF", true);
  }
  else if (strcmp(cmd, "STATUS") == 0) {
    // Report status
    StaticJsonDocument<200> status;
    status["led"] = ledState;
    status["uptime"] = millis() / 1000;
    mqtt.publishJson("device/status", status);
  }
}

void setup() {
  pinMode(LED_PIN, OUTPUT);
  
  mqtt.beginWiFi("SSID", "password");
  mqtt.beginMQTT("broker.hivemq.com", 1883);
  mqtt.setCallback(onCommand);
  
  if (mqtt.connect()) {
    mqtt.subscribe("device/command");
  }
}

void loop() {
  mqtt.loop();
}

Multi-Sensor Hub

MQTTManager mqtt;
const char* BASE_TOPIC = "home/sensors";

void setup() {
  Serial.begin(115200);
  
  mqtt.beginWiFi("SSID", "password");
  mqtt.beginMQTT("mqtt.example.com", 1883, "user", "pass");
  mqtt.setClientId("SensorHub_001");
  mqtt.setBufferSize(512);  // Larger buffer for multiple sensors
  
  mqtt.connectWithStatus(
    "home/sensors/hub/status",
    "online",
    "offline"
  );
}

void loop() {
  mqtt.loop();
  
  static unsigned long lastRead = 0;
  if (millis() - lastRead >= 10000) {
    lastRead = millis();
    
    // Publish multiple sensor readings
    mqtt.publishToTopic(BASE_TOPIC, "temperature", String(readTemp()).c_str());
    mqtt.publishToTopic(BASE_TOPIC, "humidity", String(readHumidity()).c_str());
    mqtt.publishToTopic(BASE_TOPIC, "pressure", String(readPressure()).c_str());
    mqtt.publishToTopic(BASE_TOPIC, "light", String(readLight()).c_str());
    
    // Or publish as single JSON
    StaticJsonDocument<300> doc;
    doc["temp"] = readTemp();
    doc["humidity"] = readHumidity();
    doc["pressure"] = readPressure();
    doc["light"] = readLight();
    mqtt.publishJson("home/sensors/all", doc);
  }
}

Best Practices

Keep loop() fast. MQTT requires regular processing. Avoid long delays in loop().
Use QoS wisely:
  • QoS 0: Fire and forget (fast, unreliable)
  • QoS 1: At least once (reliable, possible duplicates)
  • QoS 2: Exactly once (slowest, guaranteed)
Topic naming conventions:
  • Use hierarchical structure: building/floor/room/device/sensor
  • Keep topics short but descriptive
  • Use lowercase
  • Avoid spaces and special characters

Troubleshooting

Connection Fails

if (!mqtt.connect()) {
  int state = mqtt.state();
  Serial.print("Connection failed: ");
  Serial.println(mqtt.getStateString());
  
  switch(state) {
    case -4: // MQTT_CONNECTION_TIMEOUT
      Serial.println("Check broker address and port");
      break;
    case -3: // MQTT_CONNECTION_LOST
      Serial.println("Connection lost, will retry");
      break;
    case -2: // MQTT_CONNECT_FAILED
      Serial.println("Network issue");
      break;
    case 2: // MQTT_CONNECT_BAD_CLIENT_ID
      Serial.println("Change client ID");
      break;
    case 4: // MQTT_CONNECT_BAD_CREDENTIALS
      Serial.println("Check username/password");
      break;
    case 5: // MQTT_CONNECT_UNAUTHORIZED
      Serial.println("Check permissions");
      break;
  }
}

Messages Not Received

// Ensure callback is set BEFORE connecting
mqtt.setCallback(messageCallback);
mqtt.connect();
mqtt.subscribe("topic");

// Ensure loop() is called regularly
void loop() {
  mqtt.loop();  // REQUIRED
}

Buffer Overflow

// Increase buffer size for large messages
mqtt.setBufferSize(1024);  // Default is 256

// Or use streamed publishing
mqtt.beginPublish("topic", messageLength, false);
mqtt.write(data, length);
mqtt.endPublish();

API Reference

beginWiFi
bool(ssid, password, timeout)
Connect to WiFi network. Returns true if successful. Default timeout 20s.
isWiFiConnected
bool()
Check WiFi connection status.
getLocalIP
IPAddress()
Get device IP address.
beginMQTT
bool(server, port, username, password)
Configure MQTT broker. Username and password are optional.
setClientId
void(clientId)
Set MQTT client identifier.
setCallback
void(callback)
Set message callback function.
setWill
void(topic, message, retained, qos)
Configure Last-Will-and-Testament message.
connect
bool()
Connect to MQTT broker. Returns true if successful.
reconnect
bool(interval)
Attempt reconnection with specified interval (default 5000ms).
isConnected
bool()
Check MQTT connection status.
state
int()
Get connection state code.
getStateString
const char*()
Get human-readable state description.
publish
bool(topic, payload, retained)
Publish message. Returns true if successful.
publishJson
bool(topic, doc, retained)
Publish JSON document.
subscribe
bool(topic, qos)
Subscribe to topic with QoS (default 0).
unsubscribe
bool(topic)
Unsubscribe from topic.
loop
void()
Process MQTT messages. Must be called regularly.
setBufferSize
bool(size)
Set message buffer size in bytes. Returns false if allocation fails.
enableAutoReconnect
void(enable)
Enable/disable automatic reconnection.

Next Steps

Cloud Integrations

Connect to Google Sheets, Telegram, and WhatsApp

Firebase Integration

Store MQTT data in Firebase for persistence

Build docs developers (and LLMs) love