Skip to content

Implementing the Worker Pattern / Competing Consumers Pattern with MQTT shared subscriptions – MQTT Series #3

Hi and welcome to the 3rd article of the MQTT Series. In this article we’ll start looking into more advanced, yet very commonly found scenarios in IoT workloads. We’ll discuss the scalability and availability of consumers and we’ll see different ways to implement a load balanced consumption layer, namely by implementing Worker Pattern / Competing Consumers Pattern with MQTT shared subscriptions.

Scalability beyond the broker

As we saw in previous articles, a single MQTT broker instance can handle thousands of messages in parallel. But what happens to a system’s ability to deal with such an influx of data if processing a single message takes some time? What happens if consumption speed does not match publishing speed?

Most likely, messages will queue, and in the best case there will be processing delay, and in the worst case, data loss.

There are different strategies to deal with this, while minimizing or fully eradicating data loss.

Topic Path as a partitioning scheme

MQTT provides multiple ways to deal with this, namely using different topic patterns to distribute the load among multiple consumers.

An example of such a scenario would be processing of temperatures sent by thousands of devices. These devices could be segregated into groups or partitions, and would publish to a topic path using the group or “partition”.

Although not technically a partition, for the sake of this example, we’ll refer to this part of the topic path as partition.

For instance, using 6 partitions, each device would be configured to publish their temperature to one of the following topics

  • /device/<device_id>/A/temperature
  • /device/<device_id>/B/temperature
  • /device/<device_id>/C/temperature
  • /device/<device_id>/D/temperature
  • /device/<device_id>/E/temperature
  • /device/<device_id>/F/temperature

Then, using subscription wildcards, each of the 6 consumers would subscribe to one or more “partitions”.

If a partition (or any topic really) is subscribed by more than one consumer, only one consumer will be able to receive data, and other subscribers will “starve”.

This approach, although simple enough, has a few caveats:

  • The number of parallel subscribers is limited to the number of partitions.
  • The configuration of each publishing device, in terms of their partition affinity may not be simple or possible at all
  • Changing the number of partitions requires configuration changes across the entire device family.
  • If one of the subscriber instances stops working, messages from that partition will no longer be processed.

Implementing the worker pattern using MQTT Shared subscriptions

A cleaner way to achieve the distribution of the workload, by having several subscriber instances, would be to implement what is commonly called as the Worker Pattern, or Competing Consumers Pattern.

In this pattern, multiple consumers/subscribers are created to receive messages from a single channel/topic. When a message is delivered, any of the consumers could potentially receive it, in a round-robin fashion. This allows for concurrent processing of multiple messages, optimizing throughput, improving scalability and availability, and balancing the workload.

When a new subscriber is added to the pool, it starts receiving messages and helping its siblings processing the message influx.

In this pattern, each message is delivered to one subscriber only, ensuring it’s not processed in parallel by multiple consumers.

It is, nonetheless, a good practice to make the processing logic idempotent, as a message can be delivered more than once due to specific network split scenarios.

MQTT shared subscriptions is a feature introduced in MQTT 5.0 that enable the implementation of such scenario with MQTT.

In normal subscriptions, every time a message is published, all matching subscribers will receive a copy. This is explained in more detail as we explore the fan-out pattern, later in this article.

With shared subscriptions, however, the MQTT server evenly distributes the message load among clients using a specific subscription. This means that when we have two clients sharing a subscription, each message that matches the subscription will only have one copy delivered to one of the clients.

Shared subscriptions bring not only excellent horizontal scalability to consumers, enabling us to handle higher throughput, but also high availability. Even if one client disconnects or fails, other clients sharing the same subscription can continue to process messages without further intervention or data loss (depending on the QoS level)

The shared subscriptions work with topics that follow a specific naming convention when subscribing: $share/<ShareName>/<TopicFilter>.

Here, $share is a reserved prefix, so the server knows this is a shared subscription topic.

<TopicFilter> is the actual topic (or pattern) we want to subscribe to. The middle <ShareName> part is a string specified by the client, representing the share name used by the current shared subscription. Usually, the <ShareName> field is also referred to as Group Name or Group ID.

It is recommended that the same QoS level is used across subscribers that are part of the same group, for consistency and easiness of troubleshooting.

Let’s see this in action. We’ll use a simple Mosquitto broker with no SSL/TLS and no security, for the sake of this demo’s simplicity. The docker-compose file can be found here.

We’ll then use the mosquitto_sub and mosquitto_pub tools we saw earlier in this article. We’ll create one publisher, where we’ll send sequential messages, every second, using a bash script.

After, we’ll create 2 subscribers, part of the same group, subscribing to the same topic, one at a time.

While there is only one subscriber, all messages will be received by this subscriber, but when the second one starts, messages will be distributed.

We’ll finally stop the first subscriber, leaving the second subscriber as the only active subscriber, and it will start handling all messages.

The publishing script:

#! /bin/bash
MESSAGE=0
TOPIC="temperature"

for (( ; ; ))
do
   MESSAGE=$((MESSAGE+1))
   mosquitto_pub -t $TOPIC -m $MESSAGE -q 2 -V mqttv5

   echo "Published message $MESSAGE to topic $TOPIC. Waiting 1 second..."

   sleep 2
done

Subscriber one:

mosquitto_sub -t '$share/my_group/temperature' -q 2 -V mqttv5 -d --id subscriber1

Subscriber two:

mosquitto_sub -t '$share/my_group/temperature' -q 2 -V mqttv5 -d --id subscriber2

Notice the different client ID. Using the same client id on the second subscriber would close the first subscriber connection.

Seeing it in action (click to expand):

On the left side we have the producer starting. We then enabled the first subscriber on the top right side. All messages are handled by this subscriber.

Then we enabled the second subscriber at the right bottom side and we start seeing the workload being balanced between the two subscribers.

Lastly, we shut down the first subscriber, and we now see how the second subscriber taking the full workload on its own.

Fan-out approach

The fan-out approach, in messaging systems, is a pattern used to broadcast messages in a one-to-many arrangement.

The delivery (or spreading) of a message is done to one or multiple destinations, possibly in parallel and in a non blocking way (one subscriber shall not impact other subscribers).

By default, each consumer in MQTT gets a copy of the same message. This means that, by default, MQTT implements the fan-out approach.

This is useful for scenarios where the same message needs to be consumed/processed by multiple systems. Each subscriber will just need to declare a different client ID and, by default, each one will get a copy of each message sent to the subscribed topics.

Combining the fan-out approach with the worker pattern

Well, what if, in a scenario where the same message needs to be received by different consumers, we need to increase message consumption performance?

The good news is that we can leverage all the goodness of the shared subscriptions, we saw earlier, in a fan-out way. To do that, we’ll need to create more than one subscription group.

For instance, if we need two subscriber instances processing temperatures for data archival and two other instances processing temperatures for the purpose of alerting users, with data coming from the same topic, the following arrangement can be used:

Consumer IDRoleSubscription filter
Archival-1Archival$share/archival/topic
Archival-2Archival$share/archival/topic
Alerting-1Alerting$share/alerting/topic
Alerting-2Alerting$share/alerting/topic

In this example, a copy of each message will be sent to both subscription groups (archival and alerting), and within each group, only one subscriber will receive each message.

Wrap-up

Today we looked at some interesting and more advanced IoT/messaging use cases and how MQTT can help solving them. Such use cases are frequent in the real world, and can be more complex than the typical hello-world examples of messaging.

We saw how to distribute the consumption load using a “poor-man partitioning strategy”, with its pitfalls. We also saw how to implement the Worker Pattern / Competing Consumers Pattern with MQTT shared subscriptions

Lastly we saw how to combine the fan-out approach with the worker pattern for more complex scenarios.

Hope this has been useful.

Published inBackendIoT

Be First to Comment

Leave a Reply