
Dealing with offline consumers without data loss using MQTT persistent sessions – MQTT Series #4

Hi and welcome to the 4th article in the MQTT series. Today we’ll see how to deal with fragile connections and how consumers can survive transient connectivity losses without data loss.

We’ll use the clean_session flag to tell the broker that we’re not asking for a clean session (I know, it’s a bit misleading). We’ll also see how the session expiry interval defines how long the broker shall keep messages for an offline consumer.

The clean session flag

Before getting into the details of how this works, it’s important to understand what the clean session flag does.

All clients have an identifier (client ID), which uniquely identifies them to the broker. When a client does not explicitly define its client identifier, the broker will assign one, so the client can be identified.

For a client to receive messages that were sent to the broker while it was offline, it needs a fixed (deterministic) client identifier. The broker stores the session under that identifier, so a client that connects with a different, randomly assigned ID each time would start a brand-new session on every connection. MQTT 3.1.1 even requires a clean session when the client leaves its ID empty.

This, combined with setting clean_session to false, tells the broker to keep messages sent to any topic this client has subscribed to until they are successfully delivered to that client.

Note that client libraries such as paho-mqtt set this flag to true by default. The broker then treats every connection from this client as a new session and discards any messages left undelivered from previous ones.

As such, when the desired behaviour is to receive messages sent while the consumer was offline, the clean session flag must be explicitly set to false.
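To make the interplay between the client ID and clean_session concrete, here is a toy, in-memory model of the bookkeeping a broker does. It is a simplified sketch for illustration only. ToyBroker and its methods are hypothetical names, and the model has no networking, QoS handling or session expiry. It is not how Mosquitto or any other broker is actually implemented.

```python
class ToyBroker:
    """Toy model: sessions keyed by client ID, queued while the client is offline."""

    def __init__(self):
        self.sessions = {}   # client_id -> {"clean": bool, "subs": set, "queue": list}
        self.online = set()  # client IDs currently connected

    def connect(self, client_id, clean_session=True):
        """Connect a client and return the messages queued for it while offline."""
        if clean_session or client_id not in self.sessions:
            # Clean session (or first connection): start with empty state.
            self.sessions[client_id] = {"subs": set(), "queue": []}
        session = self.sessions[client_id]
        session["clean"] = clean_session
        self.online.add(client_id)
        pending, session["queue"] = session["queue"], []
        return pending

    def subscribe(self, client_id, topic):
        self.sessions[client_id]["subs"].add(topic)

    def disconnect(self, client_id):
        self.online.discard(client_id)
        if self.sessions[client_id]["clean"]:
            # A clean session only lasts as long as the connection.
            del self.sessions[client_id]

    def publish(self, topic, payload):
        """Deliver to online subscribers; queue for offline persistent ones."""
        delivered = {}
        for client_id, session in self.sessions.items():
            if topic not in session["subs"]:
                continue
            if client_id in self.online:
                delivered.setdefault(client_id, []).append(payload)
            else:
                session["queue"].append(payload)
        return delivered


broker = ToyBroker()
broker.connect("persistent-session-consumer", clean_session=False)
broker.subscribe("persistent-session-consumer", "my_topic")
broker.disconnect("persistent-session-consumer")
broker.publish("my_topic", "1")
broker.publish("my_topic", "2")
print(broker.connect("persistent-session-consumer", clean_session=False))  # ['1', '2']
```

Connecting with clean_session=True instead, or with a different client ID, would find no stored session and return an empty list.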

The importance of QoS

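By default, messages sent with QoS 0 are not stored in the broker, even if one or more consumers have set clean_session to false. The MQTT specification makes queuing them optional, and Mosquitto, for instance, only does so when queue_qos0_messages is enabled.

Messages need to be sent with QoS 1 or QoS 2 to leverage the power of persistent sessions. The same applies to the subscription. The broker delivers each message at the lower of the publish QoS and the subscription QoS, so a consumer that subscribed with QoS 0 gets nothing queued either.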
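The rule above can be written down as a small function. This is a sketch: delivery_qos and queued_while_offline are hypothetical helpers, and the queue_qos0 parameter stands in for an optional broker setting rather than any specific API.

```python
VALID_QOS = (0, 1, 2)


def delivery_qos(publish_qos: int, subscription_qos: int) -> int:
    """QoS used towards a subscriber: the lower of publish and subscription QoS."""
    if publish_qos not in VALID_QOS or subscription_qos not in VALID_QOS:
        raise ValueError("QoS must be 0, 1 or 2")
    return min(publish_qos, subscription_qos)


def queued_while_offline(publish_qos: int, subscription_qos: int,
                         queue_qos0: bool = False) -> bool:
    """Would a message be kept for a disconnected client with a persistent session?

    queue_qos0 models an optional broker setting (Mosquitto calls it
    queue_qos0_messages); the MQTT spec leaves storing QoS 0 messages optional.
    """
    return delivery_qos(publish_qos, subscription_qos) > 0 or queue_qos0


print(queued_while_offline(1, 1))  # True: publish and subscription both use QoS 1
print(queued_while_offline(0, 1))  # False: QoS 0 publish
print(queued_while_offline(1, 0))  # False: QoS 0 subscription downgrades delivery
```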

For more information on the different QoS levels, check the first article of this series: MQTT – the defacto IoT protocol

The session expiry configuration

Different brokers have different configurations or behaviours, namely whether messages are just stored in memory or persisted to disk while waiting for delivery.

For instance, Mosquitto will, by default, only hold the messages in memory, but disk persistence can be enabled if needed (persistence true in mosquitto.conf). This makes the broker more reliable, as undelivered messages can survive a broker restart, at the expense of some performance due to disk I/O.

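Something common to all brokers implementing MQTT 5, because it is part of that version of the protocol, is the definition of how long the broker needs to keep undelivered messages for a client.

Or, in other words, how long the broker shall remember that a client with a specific identifier has subscribed to a set of topics for which messages need to be stored until delivery.

The session expiry interval exists for this exact purpose. An MQTT 5 client sends it, in seconds, in its CONNECT packet. A value of 0, which is also what applies when the field is absent, means the session ends when the connection closes. The value 0xFFFFFFFF means the session never expires. MQTT 3.1.1 has no such field, so there the expiry is up to the broker. In Mosquitto it is set with persistent_client_expiration, and persistent sessions never expire if that option is not set.

This prevents situations where a client never connects again and the broker keeps its undelivered messages forever.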
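The expiry rule can be sketched as a periodic purge over the stored sessions. The helper names and the (interval, disconnected_at) tuple layout are assumptions made for illustration. The sketch follows the MQTT 5 Session Expiry Interval semantics, while a real broker tracks all of this internally.

```python
from typing import Dict, List, Optional, Tuple

NEVER_EXPIRE = 0xFFFFFFFF  # MQTT 5: this interval means the session never expires


def session_expired(expiry_interval: int, disconnected_at: Optional[float],
                    now: float) -> bool:
    """Decide whether a stored session should be discarded at time `now` (seconds).

    disconnected_at is None while the client is connected; live sessions never expire.
    """
    if disconnected_at is None or expiry_interval == NEVER_EXPIRE:
        return False
    # An interval of 0 means the session ends as soon as the connection closes.
    return now - disconnected_at >= expiry_interval


def purge_expired(sessions: Dict[str, Tuple[int, Optional[float]]],
                  now: float) -> List[str]:
    """Remove expired sessions in place and return the removed client IDs, sorted."""
    expired = sorted(cid for cid, (interval, gone_at) in sessions.items()
                     if session_expired(interval, gone_at, now))
    for cid in expired:
        del sessions[cid]  # its subscriptions and queued messages go with it
    return expired


print(purge_expired({"persistent-session-consumer": (3600, 0.0)}, now=4000.0))
# ['persistent-session-consumer']
```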

Putting it all together

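To demonstrate how everything fits together, we’ll use a simple Python producer and consumer built with the paho-mqtt library, pointing to a local Mosquitto instance. The examples use the paho-mqtt 1.x callback API. paho-mqtt 2.0 added a callback_api_version parameter to mqtt.Client, so on 2.x you can pass mqtt.CallbackAPIVersion.VERSION1 as its first argument to keep these callback signatures.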

The producer:

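import time

import paho.mqtt.client as mqtt

# MQTT broker details
broker_address = "localhost"
broker_port = 1883
topic = "my_topic"

# Connect to the MQTT broker
client = mqtt.Client()
client.connect(broker_address, broker_port)

# Run the network loop in a background thread so PUBACKs from the broker are
# processed; without it, paho stops sending QoS 1 messages once its in-flight window is full
client.loop_start()

# Initialize the value
value = 0

# Send a message every second until Ctrl+C
try:
    while True:
        # Increment the value
        value += 1

        # Publish the message with QoS 1 (at least once delivery)
        client.publish(topic, str(value), qos=1)
        print("Sent: " + str(value))

        # Wait for 1 second
        time.sleep(1)
except KeyboardInterrupt:
    # Disconnect cleanly, then stop the background network loop
    client.disconnect()
    client.loop_stop()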

As the code shows, this producer connects to the local broker and produces a message every second, incrementing the message value.

It’s worth noting that the messages are published with QoS 1 (the qos=1 argument to client.publish).

The consumer:

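import paho.mqtt.client as mqtt

# Define the MQTT broker details
broker_address = "localhost"
broker_port = 1883

# A fixed client ID lets the broker find this client's session again after a reconnect
client_id = "persistent-session-consumer"

# Create a new MQTT client instance with a persistent (non-clean) session
client = mqtt.Client(client_id=client_id, clean_session=False)

# Define the callback functions for MQTT events
def on_connect(client, userdata, flags, rc):
    print("Connected to MQTT broker with result code: " + str(rc))
    # "session present" is 1 when the broker resumed an existing session
    print("Session present: " + str(flags.get("session present")))
    # Subscribe with QoS 1 so messages are queued while this client is offline
    client.subscribe("my_topic", qos=1)

def on_message(client, userdata, msg):
    print("Received message: " + msg.payload.decode())


# Set the callback functions before connecting, so no event is missed
client.on_connect = on_connect
client.on_message = on_message

# Connect to the broker. keepalive (default 60 s) is the ping interval, not the
# session expiry: MQTT 3.1.1 has no client-side session expiry setting
client.connect(broker_address, broker_port)

# Start the MQTT client loop
print("Starting the consumer... Press Ctrl+C to exit.")
try:
    client.loop_forever()
except KeyboardInterrupt:
    # Disconnecting does not end a persistent session; the broker keeps it
    client.disconnect()

The consumer code, although a bit longer, is also quite simple.

It’s worth noting the fixed client ID, which is passed to mqtt.Client together with clean_session=False. This way the client does not use a clean session, and the broker keeps its subscriptions and undelivered messages between connections.

The keepalive argument of connect() is left at its default on purpose. It sets how often the client pings the broker, not how long the session is kept. MQTT 3.1.1, which paho-mqtt uses by default, gives the client no way to choose the session expiry, so that is decided by the broker’s configuration.

After connecting to the broker, we subscribe to the topic in on_connect, also using QoS 1. The session present flag printed there tells us whether the broker resumed an existing session.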

Then, for each message received, we’ll simply print it.
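The consumer above speaks MQTT 3.1.1, where the client cannot request a session expiry at all. MQTT 5 renames clean_session to Clean Start and adds a Session Expiry Interval property to the CONNECT packet. What follows is a sketch of an MQTT 5 variant of the consumer, not the article’s tested setup. It assumes paho-mqtt 1.6 or later (it also handles the 2.x constructor), a broker with MQTT 5 support (Mosquitto has had it since 1.6), and the same client ID and topic as before. The run_consumer and session_expiry_seconds names are hypothetical. paho is imported inside run_consumer() so the validation helper can be used on its own.

```python
# Sketch: MQTT 5 consumer asking the broker to keep its session for a while.
# Assumes paho-mqtt >= 1.6 (MQTT 5 support); also handles the 2.x constructor.

MAX_SESSION_EXPIRY = 0xFFFFFFFF  # MQTT 5: this value means "never expire"


def session_expiry_seconds(seconds: int) -> int:
    """Validate a Session Expiry Interval (a four-byte unsigned integer in MQTT 5)."""
    if not isinstance(seconds, int) or not 0 <= seconds <= MAX_SESSION_EXPIRY:
        raise ValueError(f"session expiry must be 0..{MAX_SESSION_EXPIRY}, got {seconds!r}")
    return seconds


def run_consumer(host="localhost", port=1883,
                 client_id="persistent-session-consumer",
                 topic="my_topic", expiry=3600):
    # Imported here so the helper above does not require paho-mqtt.
    import paho.mqtt.client as mqtt
    from paho.mqtt.packettypes import PacketTypes
    from paho.mqtt.properties import Properties

    if hasattr(mqtt, "CallbackAPIVersion"):  # paho-mqtt 2.x
        client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2,
                             client_id=client_id, protocol=mqtt.MQTTv5)
    else:  # paho-mqtt 1.x
        client = mqtt.Client(client_id=client_id, protocol=mqtt.MQTTv5)

    # With MQTT 5, on_connect receives the reason code and CONNACK properties
    def on_connect(client, userdata, flags, reason_code, properties):
        print(f"Connected: {reason_code}")
        client.subscribe(topic, qos=1)

    def on_message(client, userdata, msg):
        print("Received message: " + msg.payload.decode())

    client.on_connect = on_connect
    client.on_message = on_message

    # The Session Expiry Interval travels as a CONNECT property in MQTT 5
    connect_properties = Properties(PacketTypes.CONNECT)
    connect_properties.SessionExpiryInterval = session_expiry_seconds(expiry)

    # clean_start=False resumes an existing session instead of discarding it
    client.connect(host, port, clean_start=False, properties=connect_properties)
    client.loop_forever()
```

Calling run_consumer() starts it against the local broker. Stop it with Ctrl+C, let the producer publish a few messages and call it again within the hour to receive the queued ones.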

Demo

In the following demo we’ll start the producer. We’ll let it send a few messages and then we’ll start the consumer.

Notice how the first messages were not picked up by the consumer. They were sent while it was offline, and the broker had no session or subscription for such a consumer yet.

Then we’ll start seeing published messages appearing in the consumer. After a while we’ll stop it. Note the last received message.

Let the producer publish a few more and restart the consumer. Notice how the queued messages are delivered instantly. The first received message is the one right after the last message consumed before we shut the consumer down.

Wrap-up

This article focused on a very common use case when employing MQTT: dealing with brittle consumer connections without data loss.

This doesn’t solve the case of offline producers with no connectivity to the broker. For these, local caching or MQTT bridges would be required.

I hope this has shed some light on clean and persistent MQTT sessions and how to implement them.

Cheers

Published in IoT
