The Kinematrix MQTT Manager provides a full-featured MQTT client wrapper around PubSubClient with automatic WiFi and broker reconnection, subscription persistence, and convenience methods for common patterns.
Overview
MQTTManager simplifies MQTT communication for IoT projects with:
Auto-Reconnection: Automatic WiFi and MQTT broker reconnection with configurable intervals
Subscription Persistence: Automatically resubscribes to topics after reconnection
JSON Support: Built-in JSON serialization using ArduinoJson
Status Messages: Last-Will-and-Testament (LWT) for device status tracking
Dependencies
Add to your platformio.ini:
lib_deps =
knolleary/PubSubClient@^2.8
bblanchon/ArduinoJson@^6.21.3
Enable Module
#define ENABLE_MODULE_MQTT_MANAGER
#include "Kinematrix.h"
Basic Usage
Simple Publish/Subscribe
#include <WiFi.h>
#define ENABLE_MODULE_MQTT_MANAGER
#include "Kinematrix.h"
MQTTManager mqtt;
// MQTT message callback: prints every incoming message and switches the LED
// when "ON" / "OFF" arrives on device/command.
//   topic   - topic the message was published on (used as a C string)
//   payload - raw message bytes; copied and NUL-terminated below before being
//             treated as a C string
//   length  - number of bytes in payload
// LED_PIN is not defined in this snippet; the sketch must define it.
void onMessage(char* topic, byte* payload, unsigned int length) {
    // Convert payload to string.
    // (Variable-length array: a compiler extension, kept from the original example.)
    char message[length + 1];
    memcpy(message, payload, length);
    message[length] = '\0';  // terminator is required by strcmp()/println() below

    Serial.print("Message on ");
    Serial.print(topic);
    Serial.print(": ");
    Serial.println(message);

    // Handle commands
    if (strcmp(topic, "device/command") == 0) {
        if (strcmp(message, "ON") == 0) {
            digitalWrite(LED_PIN, HIGH);
        } else if (strcmp(message, "OFF") == 0) {
            digitalWrite(LED_PIN, LOW);
        }
    }
}
// One-time initialisation: serial port, LED pin, WiFi, MQTT configuration,
// then a single connection attempt followed by the topic subscriptions.
void setup() {
    Serial.begin(115200);
    pinMode(LED_PIN, OUTPUT);

    // Bring up WiFi first; the MQTT session runs on top of it.
    mqtt.beginWiFi("YourSSID", "YourPassword");

    // Broker address, client identity and message handler.
    mqtt.beginMQTT("broker.hivemq.com", 1883);
    mqtt.setClientId("ESP32_Device_001");
    mqtt.setCallback(onMessage);

    // Nothing to subscribe to unless the broker accepted the connection.
    if (!mqtt.connect()) {
        return;
    }

    Serial.println("MQTT Connected!");
    mqtt.subscribe("device/command");
    mqtt.subscribe("device/config");
}
void loop () {
mqtt . loop (); // Process MQTT messages
// Publish sensor data every 5 seconds
static unsigned long lastPublish = 0 ;
if ( millis () - lastPublish >= 5000 ) {
lastPublish = millis ();
float temperature = readTemperature ();
String payload = String (temperature, 2 );
mqtt . publish ( "device/temperature" , payload . c_str ());
}
}
WiFi Management
Basic WiFi Connection
// Connect with 20-second timeout
if ( mqtt . beginWiFi ( "SSID" , "password" , 20000 )) {
Serial . println ( "WiFi connected" );
Serial . println ( mqtt . getLocalIP ());
}
WiFi Status Checking
void loop () {
if ( ! mqtt . isWiFiConnected ()) {
Serial . println ( "WiFi disconnected!" );
// loop() will automatically try to reconnect
}
mqtt . loop ();
}
External WiFi Management
If you manage WiFi separately:
// Don't use mqtt.beginWiFi()
// Just configure MQTT
mqtt . beginMQTT ( "broker.hivemq.com" , 1883 );
// MQTTManager will still check WiFi status
MQTT Configuration
Broker Setup
// Public broker (no authentication)
mqtt . beginMQTT ( "broker.hivemq.com" , 1883 );
// Private broker with authentication
mqtt . beginMQTT (
"mqtt.example.com" ,
1883 ,
"username" ,
"password"
);
Client ID
// Set custom client ID
mqtt.setClientId("MyDevice_001");

// Generate unique ID from MAC address / chip ID
String clientId = "ESP32_";
#ifdef ESP32
// getEfuseMac() returns the MAC with its first byte in the lowest bits, so the
// low 24 bits are the vendor prefix shared by many boards. Use the upper
// 24 bits (the device-specific part) so IDs do not collide.
clientId += String((uint32_t)((ESP.getEfuseMac() >> 24) & 0xFFFFFF), HEX);
#elif defined(ESP8266)
clientId += String(ESP.getChipId(), HEX);
#endif
mqtt.setClientId(clientId.c_str());
Connection Options
// Set keep-alive interval (default 15s)
mqtt . setKeepAlive ( 30 ); // 30 seconds
// Set socket timeout (default 15s)
mqtt . setSocketTimeout ( 20 ); // 20 seconds
// Increase buffer size for larger messages (default 256 bytes)
mqtt . setBufferSize ( 1024 ); // 1KB buffer
// Enable/disable auto-reconnection
mqtt . enableAutoReconnect ( true ); // Default: true
Publishing
Basic Publishing
// Publish string
mqtt.publish("sensor/temperature", "25.5");

// Publish with retained flag
mqtt.publish("device/status", "online", true);

// Publish binary data (raw bytes plus an explicit length)
uint8_t data[] = {0x01, 0x02, 0x03};
mqtt.publish("device/data", data, sizeof(data));
Topic Helpers
// Publish to hierarchical topic
mqtt . publishToTopic (
"devices/esp32" , // Base topic
"temperature" , // Key
"25.5" // Value
);
// Publishes to: devices/esp32/temperature
JSON Publishing
#include <ArduinoJson.h>
StaticJsonDocument < 200 > doc;
doc [ "temperature" ] = 25.5 ;
doc [ "humidity" ] = 60 ;
doc [ "timestamp" ] = millis ();
// Publish JSON document
mqtt . publishJson ( "sensor/data" , doc);
// Publishes: {"temperature":25.5,"humidity":60,"timestamp":12345}
// With retained flag
mqtt . publishJson ( "sensor/data" , doc, true );
Streamed Publishing (Large Messages)
// Streamed publish: the payload is written in chunks.
// The length announced to beginPublish() must equal the total number of bytes
// written before endPublish(), so it is computed from the chunks themselves.
const char* part1 = "Long message part 1...";
const char* part2 = "Long message part 2...";
const size_t part1Len = strlen(part1);  // excludes the trailing NUL
const size_t part2Len = strlen(part2);

// Begin publish stream
if (mqtt.beginPublish("device/log", part1Len + part2Len, false)) {
    // Write data in chunks
    mqtt.write((uint8_t*)part1, part1Len);
    mqtt.write((uint8_t*)part2, part2Len);

    // Complete publish
    if (mqtt.endPublish()) {
        Serial.println("Streamed publish successful");
    }
}
Clear Retained Messages
// Remove retained message from topic
mqtt . clearRetained ( "device/status" );
Subscribing
Basic Subscription
// Subscribe with QoS 0 (default)
mqtt . subscribe ( "device/command" );
// Subscribe with QoS 1 (at least once delivery)
mqtt . subscribe ( "device/config" , 1 );
Unsubscribe
mqtt . unsubscribe ( "device/command" );
Wildcard Subscriptions
// Subscribe to all sensors
mqtt . subscribe ( "sensors/+" ); // Single level
// Subscribe to entire tree
mqtt . subscribe ( "devices/#" ); // Multi level
Message Callback
void messageCallback ( char* topic , byte * payload , unsigned int length ) {
Serial . print ( "Topic: " );
Serial . println (topic);
// Parse JSON payload
StaticJsonDocument < 200 > doc;
DeserializationError error = deserializeJson (doc, payload, length);
if ( ! error) {
float temp = doc [ "temperature" ];
int humidity = doc [ "humidity" ];
Serial . println ( "Temp: " + String (temp) + "C" );
Serial . println ( "Humidity: " + String (humidity) + "%" );
}
}
void setup () {
mqtt . setCallback (messageCallback);
}
Last-Will-and-Testament (LWT)
Basic LWT
// Set will message before connecting
mqtt . setWill (
"device/status" , // Topic
"offline" , // Message
true , // Retained
1 // QoS
);
mqtt . connect ();
Connect with Status
// Set LWT and publish online status in one call
mqtt . connectWithStatus (
"device/status" , // Status topic
"online" , // Online message
"offline" // Offline message (LWT)
);
// When device disconnects unexpectedly,
// broker publishes "offline" automatically
Automatic Status Messages
// Enable status message management
mqtt . enableStatusMessages (
"device/esp32/status" ,
"online" ,
"offline"
);
// MQTTManager will:
// 1. Set LWT to "offline"
// 2. Publish "online" on connection
// 3. Monitor connection and update status
void loop () {
mqtt . loop (); // Automatically manages status
}
// Disable status messages
mqtt . disableStatusMessages ();
Connection Management
Manual Connection
if ( mqtt . connect ()) {
Serial . println ( "Connected to MQTT broker" );
} else {
Serial . print ( "Failed: " );
Serial . println ( mqtt . getStateString ());
}
Auto-Reconnection
void loop () {
// Automatically reconnects every 5 seconds if disconnected
mqtt . loop ();
}
// Change reconnection interval
void setup () {
// Keep auto-reconnection enabled. This call only switches the feature on;
// the 10-second interval itself is passed to reconnect() in loop() below.
mqtt . enableAutoReconnect ( true );
}
void loop () {
// Pass custom interval to reconnect()
if ( ! mqtt . isConnected ()) {
mqtt . reconnect ( 10000 ); // 10 second interval
}
mqtt . loop ();
}
Manual Reconnection
if ( ! mqtt . isConnected ()) {
Serial . println ( "Attempting reconnection..." );
if ( mqtt . reconnect ( 5000 )) { // Try every 5 seconds
Serial . println ( "Reconnected!" );
}
}
Disconnect
// Gracefully disconnect from broker
mqtt . disconnect ();
Connection Status
if ( mqtt . isConnected ()) {
Serial . println ( "MQTT is connected" );
}
// Get state code
int state = mqtt . state ();
// MQTT_CONNECTED = 0
// MQTT_CONNECTION_TIMEOUT = -4
// MQTT_CONNECTION_LOST = -3
// MQTT_CONNECT_FAILED = -2
// MQTT_DISCONNECTED = -1
// etc.
// Get human-readable state
Serial . println ( mqtt . getStateString ());
// Prints: "Connected", "Disconnected", "Connection timeout", etc.
Advanced Features
Subscription Persistence
MQTTManager automatically resubscribes to topics after reconnection:
void setup () {
mqtt . beginMQTT ( "broker.hivemq.com" , 1883 );
mqtt . connect ();
// Subscribe to topics
mqtt . subscribe ( "device/command" );
mqtt . subscribe ( "sensor/config" );
mqtt . subscribe ( "alerts/#" );
}
void loop () {
mqtt . loop ();
// If connection is lost and restored,
// all 3 subscriptions are automatically restored
}
Access Underlying Client
For advanced PubSubClient features:
PubSubClient & client = mqtt . getClient ();
// Use PubSubClient methods directly
client . setServer ( "broker.hivemq.com" , 1883 );
client . publish ( "topic" , "message" );
Buffer Size Management
// Check current buffer size
Serial . println ( mqtt . getBufferSize ());
// Increase for large messages
if ( mqtt . setBufferSize ( 2048 )) {
Serial . println ( "Buffer increased to 2KB" );
} else {
Serial . println ( "Failed to allocate buffer" );
}
Practical Examples
Sensor Data Publisher
#include <WiFi.h>
#include <ArduinoJson.h>
#define ENABLE_MODULE_MQTT_MANAGER
#include "Kinematrix.h"
MQTTManager mqtt;
const char * SENSOR_TOPIC = "home/sensors/living_room" ;
void setup () {
Serial . begin ( 115200 );
mqtt . beginWiFi ( "SSID" , "password" );
mqtt . beginMQTT ( "broker.hivemq.com" , 1883 );
mqtt . setClientId ( "LivingRoomSensor" );
mqtt . connectWithStatus (
"home/sensors/living_room/status" ,
"online" ,
"offline"
);
}
void loop () {
mqtt . loop ();
static unsigned long lastPublish = 0 ;
if ( millis () - lastPublish >= 30000 ) { // Every 30 seconds
lastPublish = millis ();
// Read sensors
float temp = readTemperature ();
float humidity = readHumidity ();
// Create JSON
StaticJsonDocument < 200 > doc;
doc [ "temperature" ] = temp;
doc [ "humidity" ] = humidity;
doc [ "timestamp" ] = millis () / 1000 ;
// Publish
mqtt . publishJson (SENSOR_TOPIC, doc);
Serial . println ( "Data published" );
}
}
Command Receiver
MQTTManager mqtt;
bool ledState = false ;
void onCommand ( char* topic , byte * payload , unsigned int length ) {
StaticJsonDocument < 200 > doc;
deserializeJson (doc, payload, length);
const char * cmd = doc [ "command" ];
if ( strcmp (cmd, "LED_ON" ) == 0 ) {
ledState = true ;
digitalWrite (LED_PIN, HIGH);
mqtt . publish ( "device/led/state" , "ON" , true );
}
else if ( strcmp (cmd, "LED_OFF" ) == 0 ) {
ledState = false ;
digitalWrite (LED_PIN, LOW);
mqtt . publish ( "device/led/state" , "OFF" , true );
}
else if ( strcmp (cmd, "STATUS" ) == 0 ) {
// Report status
StaticJsonDocument < 200 > status;
status [ "led" ] = ledState;
status [ "uptime" ] = millis () / 1000 ;
mqtt . publishJson ( "device/status" , status);
}
}
void setup () {
pinMode (LED_PIN, OUTPUT);
mqtt . beginWiFi ( "SSID" , "password" );
mqtt . beginMQTT ( "broker.hivemq.com" , 1883 );
mqtt . setCallback (onCommand);
if ( mqtt . connect ()) {
mqtt . subscribe ( "device/command" );
}
}
void loop () {
mqtt . loop ();
}
Multi-Sensor Hub
MQTTManager mqtt;
const char * BASE_TOPIC = "home/sensors" ;
void setup () {
Serial . begin ( 115200 );
mqtt . beginWiFi ( "SSID" , "password" );
mqtt . beginMQTT ( "mqtt.example.com" , 1883 , "user" , "pass" );
mqtt . setClientId ( "SensorHub_001" );
mqtt . setBufferSize ( 512 ); // Larger buffer for multiple sensors
mqtt . connectWithStatus (
"home/sensors/hub/status" ,
"online" ,
"offline"
);
}
void loop () {
mqtt . loop ();
static unsigned long lastRead = 0 ;
if ( millis () - lastRead >= 10000 ) {
lastRead = millis ();
// Publish multiple sensor readings
mqtt . publishToTopic (BASE_TOPIC, "temperature" , String ( readTemp ()). c_str ());
mqtt . publishToTopic (BASE_TOPIC, "humidity" , String ( readHumidity ()). c_str ());
mqtt . publishToTopic (BASE_TOPIC, "pressure" , String ( readPressure ()). c_str ());
mqtt . publishToTopic (BASE_TOPIC, "light" , String ( readLight ()). c_str ());
// Or publish as single JSON
StaticJsonDocument < 300 > doc;
doc [ "temp" ] = readTemp ();
doc [ "humidity" ] = readHumidity ();
doc [ "pressure" ] = readPressure ();
doc [ "light" ] = readLight ();
mqtt . publishJson ( "home/sensors/all" , doc);
}
}
Best Practices
Keep loop() fast. MQTT requires regular processing. Avoid long delays in loop().
Use QoS wisely:
QoS 0: Fire and forget (fast, unreliable)
QoS 1: At least once (reliable, possible duplicates)
QoS 2: Exactly once (slowest, guaranteed)
Topic naming conventions:
Use hierarchical structure: building/floor/room/device/sensor
Keep topics short but descriptive
Use lowercase
Avoid spaces and special characters
Troubleshooting
Connection Fails
// Diagnose a failed connection attempt: print the readable state, then a hint
// for the state codes with a known remedy. Any other code is printed
// numerically so it is never silently dropped.
if (!mqtt.connect()) {
    int state = mqtt.state();
    Serial.print("Connection failed: ");
    Serial.println(mqtt.getStateString());

    switch (state) {
        case -4:  // MQTT_CONNECTION_TIMEOUT
            Serial.println("Check broker address and port");
            break;
        case -3:  // MQTT_CONNECTION_LOST
            Serial.println("Connection lost, will retry");
            break;
        case -2:  // MQTT_CONNECT_FAILED
            Serial.println("Network issue");
            break;
        case 2:  // MQTT_CONNECT_BAD_CLIENT_ID
            Serial.println("Change client ID");
            break;
        case 4:  // MQTT_CONNECT_BAD_CREDENTIALS
            Serial.println("Check username/password");
            break;
        case 5:  // MQTT_CONNECT_UNAUTHORIZED
            Serial.println("Check permissions");
            break;
        default:  // any state without a specific hint above
            Serial.print("Unhandled MQTT state code: ");
            Serial.println(state);
            break;
    }
}
Messages Not Received
// Ensure callback is set BEFORE connecting
mqtt . setCallback (messageCallback);
mqtt . connect ();
mqtt . subscribe ( "topic" );
// Ensure loop() is called regularly
void loop () {
mqtt . loop (); // REQUIRED
}
Buffer Overflow
// Increase buffer size for large messages
mqtt.setBufferSize(1024); // Default is 256

// Or use streamed publishing. The length announced to beginPublish() must
// match the number of bytes written before endPublish(), so the same
// messageLength is used for both calls.
mqtt.beginPublish("topic", messageLength, false);
mqtt.write(data, messageLength);
mqtt.endPublish();
API Reference
beginWiFi
bool(ssid, password, timeout)
Connect to WiFi network. Returns true if successful. Default timeout 20s.
isWiFiConnected(): Check WiFi connection status.
beginMQTT
bool(server, port, username, password)
Configure MQTT broker. Username and password are optional.
setClientId(clientId): Set MQTT client identifier.
setCallback(callback): Set message callback function.
setWill
void(topic, message, retained, qos)
Configure Last-Will-and-Testament message.
connect(): Connect to MQTT broker. Returns true if successful.
reconnect(interval): Attempt reconnection with specified interval (default 5000ms).
isConnected(): Check MQTT connection status.
state(): Get connection state code.
getStateString(): Get human-readable state description.
publish
bool(topic, payload, retained)
Publish message. Returns true if successful.
publishJson
bool(topic, doc, retained)
Publish JSON document.
subscribe(topic, qos): Subscribe to topic with QoS (default 0).
loop(): Process MQTT messages. Must be called regularly.
setBufferSize(size): Set message buffer size in bytes. Returns false if allocation fails.
enableAutoReconnect(enable): Enable/disable automatic reconnection.
Next Steps
Cloud Integrations: Connect to Google Sheets, Telegram, and WhatsApp
Firebase Integration: Store MQTT data in Firebase for persistence