HiveMQ Increases MQTT Per-Core Throughput With the New Client Queue
With the latest HiveMQ Platform 4.28 LTS release, HiveMQ Broker increases the efficiency and consistency of storing messages in client queues by introducing a new schema for addressing PUBLISH messages in client queues and by migrating client queues to the RocksDB storage engine. HiveMQ Broker achieved 3,900 messages per second per core of total MQTT throughput and reduced CPU usage by nearly 20% in the cluster merge scenario with 30 million queued messages. In this blog post, we dive into the motivation behind this major move and its performance implications for users of HiveMQ Platform.
Data Durability and Ordering as MQTT Must-Haves
The requirements for data transmission differ substantially across industries and even between use cases within an industry. When collecting telemetry from workstations on the shop floor for offline analysis, one can cope with missing a few updates, and even a somewhat scrambled order might not be an issue. It is a whole different story, though, if car lock and unlock events are swapped or one of these events is lost.
Given the diversity of the use cases relying on MQTT, HiveMQ Broker was designed with data durability and consistent ordering of messages in mind. The latter is a notoriously challenging engineering task given that HiveMQ Broker natively supports data replication. Data durability, in contrast, is a concern for each server on its own, yet it contributes greatly to the overall reliability of the broker.
Data durability in computer systems is achieved by storing data on non-volatile storage media, that is, storage that keeps its data even in case of a power outage. Common examples of non-volatile storage are hard disk drives (HDDs) and solid-state drives (SSDs). Even though application data is only temporarily stored in an MQTT broker (in contrast to database systems), persisting it to non-volatile storage is the only known way to ensure the true quality-of-service guarantees of the MQTT protocol despite the infrastructure failures that inevitably happen.
In contrast to non-volatile storage, the main memory of the server (RAM) loses its content upon even the shortest power disruption. So, if an MQTT broker relies only on main memory for storing data, it cannot ensure the true quality of service when, for example, exactly-once delivery is required. In case of a power failure, such a broker starts with a blank slate, effectively invalidating the acknowledgements it has already provided to clients.
Therefore, when choosing between quick but unreliable messaging for QoS 1 and QoS 2 and slightly slower (but still in the ballpark of milliseconds during normal operation) reliable QoS 1 and QoS 2 messaging, HiveMQ selected the latter. This choice came with a catch: the way data can be efficiently laid out in memory differs from how it can be efficiently laid out on non-volatile storage. Hence, HiveMQ engineers have to pay special attention to the way the data, specifically the messages, is stored in the broker. The way the broker stores messages is called a schema.
When devising a schema, one needs to be extremely clear about the operations that are performed on the data and the volume of these operations. The broker acts as transient (temporary) storage for the data passed from publishers to subscribers. Based on the MQTT specification, applications expect that discrete pieces of data (messages) arrive in a specific order, or as close to this order as possible, regardless of the broker implementation. To guarantee the order, HiveMQ Broker stores messages in queues.
MQTT Client Queues
The notion of a queue can be grasped intuitively: the first message added to the queue will be the first message extracted from it. This is called a “first in, first out” (FIFO) discipline. The operations that a typical MQTT broker has to perform on the queue include adding a single message at the end of the queue, reading a batch of messages at the front of the queue, changing the status of a message in the queue (for example, whether it is in-flight or expired), replacing a message with an acknowledgement, and removing a single message or a batch of messages from the queue once they are delivered or expired. These operations on the client queue determine how the messages are best laid out on the storage medium. Some of these operations, like adding a message to the queue, are paced by the external load generated by the publishers[1]; other operations, like reading a batch of consecutive messages from the queue, can be paced by the internal algorithms, provided that subscribers stay connected to the broker.
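To make these operations concrete, here is a minimal sketch of what a client queue abstraction could look like. The names and signatures are illustrative assumptions, not HiveMQ Broker's internal API:

```java
import java.util.List;

// Illustrative sketch only: a minimal client queue abstraction covering the
// operations described above. Names and signatures are hypothetical and do
// not reflect HiveMQ Broker's internal API.
public interface ClientQueue {

    /** A queued PUBLISH message: unique id, topic, payload. */
    record QueuedPublish(long uniqueId, String topic, byte[] payload) { }

    /** Status of a queued message. */
    enum Status { IDLE, IN_FLIGHT, EXPIRED }

    /** Append a single incoming PUBLISH to the tail of the queue. */
    void add(QueuedPublish message);

    /** Read up to {@code limit} messages from the head of the queue for delivery. */
    List<QueuedPublish> peekBatch(int limit);

    /** Change the status of a message, e.g. mark it in-flight or expired. */
    void setStatus(long uniqueId, Status status);

    /** Remove a single message once it is delivered or expired. */
    void remove(long uniqueId);

    /** Remove a batch of delivered or expired messages in one call. */
    void removeAll(List<Long> uniqueIds);
}
```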
In HiveMQ Broker, every non-shared subscription of a client is represented as an isolated queue. This means that the client queue and the operations on it are functionally isolated from other client queues. The queues on the same server, however, still share its resources, most notably CPU and disk. A shared subscription is also represented by a single queue in HiveMQ Broker. In contrast to non-shared subscriptions, this queue can be read by many clients forming a group of shared subscribers. For scalability reasons, all shared queues are sharded in HiveMQ Broker. For simplicity, we will call the queues for both shared and non-shared subscriptions client queues.
From a higher-level perspective, we see two kinds of operations on the client queue: point access and iterative access.
Point access means accessing individual messages regardless of their position in the queue. Such operations are required, for example, when a specific message has been delivered to the subscriber and can be removed from the queue. Since the specific message is known, an index that provides fast access to individual messages based on their identifiers accelerates such operations. These accesses are usually served by the tree structures that most storage engines use under the hood. The message identifier then serves to traverse such a tree and find the exact message with the fewest possible hops. Hence, an ideal identifier for point accesses should keep the tree balanced and the search path from the root to the messages (also called height or depth) short.
Iterative access means going over messages one by one, in order. This kind of access is required when multiple messages can be read from the queue in one go and delivered to the subscriber. In that case, it is very convenient to have all these messages close together on disk so that disk time is not wasted on accessing multiple locations spread across the medium. A schema that realizes the benefits of data locality yields substantial performance improvements, especially for non-volatile storage, whose access times are among the highest in computer systems (next only to network latencies).
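Conceptually, both access patterns can be illustrated with any ordered key-value structure. The sketch below uses Java's TreeMap as a stand-in for a storage engine's ordered index; the identifiers and payloads are made up for illustration:

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustrative sketch: an ordered index (here a TreeMap standing in for the
// storage engine) supports both access patterns discussed above.
public class QueueAccessPatterns {
    public static void main(String[] args) {
        // Key: message identifier in queue order; value: payload (hypothetical).
        NavigableMap<Long, String> queueIndex = new TreeMap<>();
        for (long id = 1; id <= 10; id++) {
            queueIndex.put(id, "payload-" + id);
        }

        // Point access: jump straight to one message by its identifier,
        // e.g. to remove it after the subscriber acknowledged delivery.
        queueIndex.remove(7L);

        // Iterative access: read a batch of consecutive messages from the
        // front of the queue in one go; keys that are adjacent in the index
        // are also close together on disk in a schema with good data locality.
        for (Map.Entry<Long, String> entry : queueIndex.headMap(5L, true).entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }
}
```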
Improvements to the Client Queues Schema
Given the very dynamic nature of MQTT workloads (in the sense that client data usually does not stay in the broker for long), updates to the status of each message are the rule rather than the exception. A message in the queue can be idle (its delivery to the subscriber has not yet begun), in-flight (its delivery is in progress), or expired (it was not delivered in due time).
HiveMQ Broker treats in-flight and idle messages differently, according to its internal logic for, e.g., handling client reconnects. Splitting the queued messages by status therefore ensures data locality, which, in turn, increases broker performance by avoiding fragmentation in a queue that contains a mix of idle and in-flight messages. This is particularly important for shared subscription queues, which are more prone to fragmentation because multiple subscribers read from the same queue.
Preserving the order of client messages is not complicated when the broker runs on a single node. However, being a highly available system, HiveMQ Broker replicates client data, including the client queues[2]. Therefore, the order of messages in the client queues has to be globally consistent, that is, every HiveMQ instance has to see the messages appearing in the same order. This is possible if the message identifiers are unique enough. But how unique should they be?
We can first take a look at the packet identifiers that are part of the MQTT protocol itself. Could these identifiers provide sufficient uniqueness to avoid losing messages? Let’s consider an example to see why MQTT packet identifiers are a poor choice for ensuring uniqueness in a distributed MQTT broker.
As a highly available system, HiveMQ Broker has to ensure that data transmission continues regardless of network disruptions between HiveMQ instances. Thus, if a network disruption splits the cluster into groups of HiveMQ instances that cannot communicate with each other, each group will behave as a standalone cluster and continue its operations. Specifically, each group will independently accept messages for a client and will independently assign packet identifiers to these packets upon sending them out.
In case of such a split, it becomes possible for two PUBLISH packets with different payloads to end up with the same packet identifier. This may happen if a message is polled from the queue for a subscriber that then reconnects to a different partition and polls a different message, which, as a result, is assigned the same packet identifier. As soon as the network disruption heals and the broker instances merge back into a bigger cluster, they have to agree on a single source of truth for which messages the client queue should contain and in which order. If they do so based on the packet identifiers alone, one of the messages will be erroneously ‘deduplicated’, that is, deleted. Its unique payload will then be lost.
The obvious alternative of comparing messages payload by payload is highly inefficient. Such an approach strains resources and makes healing take very long, depending on how many messages have accumulated in the queues.
Being aware of these issues, HiveMQ Broker was designed from the outset to use cluster-wide unique message identifiers that are bound to the unique server id. Up until recently, though, these unique identifiers were part of the internal PUBLISH message representation and were not used for accessing the messages in the client queues, which resulted in inefficient resource usage during cluster healing after a network disruption.
With the HiveMQ 4.28 LTS release, both the status of the message (idle, in-flight, expired) and the cluster-wide unique identifier of the message become part of the client queue schema. This change allows the broker to both efficiently address individual messages in the client queue and read them in bulk. By making the status of the PUBLISH message part of the key that addresses messages in the queue, HiveMQ Broker avoids queue fragmentation.
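While the exact on-disk format is internal to HiveMQ, the idea can be sketched as a composite key that orders messages first by status and then by a cluster-wide unique identifier. The layout below is a simplified assumption for illustration, not the actual schema:

```java
import java.nio.ByteBuffer;

// Simplified, assumed layout of a composite queue key: status first, then a
// cluster-wide unique message identifier (here modeled as node id plus a
// monotonic counter). This is not HiveMQ's actual on-disk format.
public final class QueueKey {

    public static final byte STATUS_IDLE = 0;
    public static final byte STATUS_IN_FLIGHT = 1;
    public static final byte STATUS_EXPIRED = 2;

    /**
     * Encodes a key so that, assuming non-negative node ids and counters,
     * byte-wise ordering groups messages by status (avoiding fragmentation)
     * and keeps a consistent order within each status.
     */
    public static byte[] encode(byte status, long nodeId, long counter) {
        return ByteBuffer.allocate(1 + Long.BYTES + Long.BYTES)
                .put(status)
                .putLong(nodeId)
                .putLong(counter)
                .array();
    }
}
```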
Expanding the Use of RocksDB to the Client Queues
Another major enhancement to the performance of HiveMQ on the critical path of message delivery is the switch to RocksDB as the embedded storage engine for client queues.
RocksDB is a key-value store that can be embedded in applications as a library. RocksDB borrows some code from the well-known LevelDB project designed and implemented by Google. The core objective of RocksDB is to ensure high performance for storage and server workloads by providing efficient point lookups and range scans. RocksDB is an open-source library developed and maintained primarily by Meta (Facebook) software engineers. The Meta team maintaining RocksDB puts emphasis on backwards compatibility to avoid breaking the applications that rely on this library[3].
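To give a feel for how an application embeds RocksDB, here is a minimal, self-contained example using the RocksJava bindings. It only demonstrates the point lookups and range scans mentioned above and is unrelated to HiveMQ's internal code:

```java
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

import java.nio.charset.StandardCharsets;

// Minimal RocksJava example: open an embedded store, do a point lookup
// and a short range scan. The path and keys are arbitrary examples.
public class RocksDbExample {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/rocksdb-example")) {

            // Write a few ordered keys.
            for (int i = 0; i < 5; i++) {
                db.put(("queue/msg-" + i).getBytes(StandardCharsets.UTF_8),
                       ("payload-" + i).getBytes(StandardCharsets.UTF_8));
            }

            // Point lookup: fetch a single value by its exact key.
            byte[] value = db.get("queue/msg-3".getBytes(StandardCharsets.UTF_8));
            System.out.println(new String(value, StandardCharsets.UTF_8));

            // Range scan: iterate keys in sorted order starting from a prefix.
            try (RocksIterator it = db.newIterator()) {
                for (it.seek("queue/".getBytes(StandardCharsets.UTF_8)); it.isValid(); it.next()) {
                    System.out.println(new String(it.key(), StandardCharsets.UTF_8));
                }
            }
        }
    }
}
```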
For multiple years, HiveMQ Broker has used RocksDB to efficiently store message payloads. Message payloads in the HiveMQ Broker are accessed via point lookups. In contrast, the message metadata used for ordering and delivery in the client queue was stored in Xodus, which, until recently, showed superior performance on iterative access patterns such as range scans.
The switch to RocksDB as a unified storage engine for both the message metadata in the queues and the message payloads has a number of reasons, including better maintenance of the project (backed by Meta), unification of the broker's code base for future performance enhancements, and the continuous performance and functionality improvements added to the library. In addition, HiveMQ introduces in-memory iterator bounds for the new client queue schema to ensure high performance of iterative accesses to the client queues.
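The exact mechanism behind these bounds is not detailed here, but bounded iteration is commonly expressed in RocksDB through read options. The sketch below shows the general technique with RocksJava's iterate-upper-bound option; whether HiveMQ uses exactly this mechanism is an assumption:

```java
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import org.rocksdb.Slice;

import java.nio.charset.StandardCharsets;

// Sketch of bounded iteration with RocksJava read options. Whether HiveMQ's
// "in-memory iterator bounds" map onto exactly this mechanism is an
// assumption; the snippet only illustrates the general technique.
public class BoundedScan {
    static void scanOneQueue(RocksDB db, String queueId) {
        byte[] lower = (queueId + "/").getBytes(StandardCharsets.UTF_8);
        byte[] upper = (queueId + "0").getBytes(StandardCharsets.UTF_8); // '0' sorts right after '/'

        // The upper bound stops the iterator before it leaves this queue's key range.
        try (Slice upperBound = new Slice(upper);
             ReadOptions readOptions = new ReadOptions().setIterateUpperBound(upperBound);
             RocksIterator it = db.newIterator(readOptions)) {
            for (it.seek(lower); it.isValid(); it.next()) {
                System.out.println(new String(it.key(), StandardCharsets.UTF_8));
            }
        }
    }
}
```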
One of the key design choices in the RocksDB implementation is that it keeps so-called memtables in the main memory of the server. This in-memory data structure contains the latest updates to the store since the last flush to disk. Application crash-consistency is guaranteed by the on-disk write-ahead log (WAL), which is enabled by default in RocksDB. RocksDB improves read efficiency by caching both the data blocks and the auxiliary blocks (index, filter, and compression dictionary) in main memory[4]. HiveMQ Broker uses this RocksDB functionality to achieve higher throughput.
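For illustration, the snippet below shows how an application could configure these knobs in RocksJava: the memtable size, a block cache that also holds index and filter blocks, and the write-ahead log. The values are arbitrary examples, not HiveMQ Broker's configuration:

```java
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;

import java.nio.charset.StandardCharsets;

// Illustration of the RocksDB knobs mentioned above: memtable size, block
// cache (data plus index/filter blocks), and the write-ahead log. The values
// are arbitrary examples, not HiveMQ Broker's configuration.
public class RocksDbTuning {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();

        // Cache data blocks as well as index and filter blocks in main memory.
        LRUCache blockCache = new LRUCache(256 * 1024 * 1024L);
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig()
                .setBlockCache(blockCache)
                .setCacheIndexAndFilterBlocks(true);

        try (Options options = new Options()
                     .setCreateIfMissing(true)
                     .setWriteBufferSize(64 * 1024 * 1024L) // memtable size before a flush
                     .setTableFormatConfig(tableConfig);
             RocksDB db = RocksDB.open(options, "/tmp/rocksdb-tuning");
             // Keep the WAL enabled (the default) so writes survive a crash.
             WriteOptions writeOptions = new WriteOptions().setDisableWAL(false)) {

            db.put(writeOptions,
                   "key".getBytes(StandardCharsets.UTF_8),
                   "value".getBytes(StandardCharsets.UTF_8));
        }
    }
}
```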
Squeezing the Most out of the CPUs
Normal Operation
Combining the new client queue schema with the switch to RocksDB improved the throughput that HiveMQ Broker provides per processor core. Such efficiency metrics are extremely relevant for use cases where the cost efficiency of the data infrastructure, e.g. in the cloud, is a concern.
With the recent changes, HiveMQ Broker achieved 3,900 messages per second per core of total MQTT throughput (inbound and outbound) for QoS 1 messages persisted on disk, which is 8% higher than the throughput per core supported by the previous version of the broker. This improvement makes it possible to cut the cost of the deployment or to grow the use case on the same infrastructure. More than that, the changes that led to this improvement pave the way for more substantial performance gains in the upcoming releases of HiveMQ Platform.
For our benchmark, we used four c7a.8xlarge AWS instances with 32 virtual cores each. The scenario is a typical 1-publisher-to-1-subscriber setup with 100,000 publishers and 100,000 subscribers connected to the broker. Every publisher sent messages as fast as the broker could accept them, totaling 15 million messages per minute of incoming MQTT traffic, with each message 16 bytes in size. The load was generated by the enterprise-grade HiveMQ Swarm distributed load-testing tool. The Grafana graphs below show the HiveMQ cluster consistently running all CPUs at full utilization while maintaining 30 million messages per minute of total MQTT throughput.
Figure 1. Resource usage and the throughput graphs highlighting the improved performance of the HiveMQ Broker with new client queue implementation.
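The headline figure follows directly from these numbers: 30 million messages per minute of total MQTT traffic is 500,000 messages per second, spread across 4 × 32 = 128 cores, which works out to roughly 3,900 messages per second per core.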
Cluster Merge
Network disruptions, which occasionally happen even in hyperscalers, may split the HiveMQ Broker cluster into multiple partitions whose HiveMQ instances cannot reach each other. Once the disruption heals, the cluster restores itself and merges the data, including the client queues. In shared-nothing stateful systems that ensure high availability, such as HiveMQ Broker, merging cannot happen instantaneously because queued messages have to be deduplicated, which means iterating the queues and comparing message identifiers.
The schema improvements and the migration of the client queue to RocksDB reduced CPU usage by approximately 15-20% in the scenario with 30 million queued messages (see Figure 2 below). This leaves additional cluster resources available to maintain high throughput even during such disruptive events as cluster healing. In addition, the improvements cut the time required to merge a cluster of 4 nodes with 30 million queued messages by nearly 10%, down to 21 seconds.
Figure 2. Left: CPU usage and queue size on cluster merge prior to client queue improvements. Right: CPU usage and queue size on cluster merge after client queue improvements.
Note on Using Xodus with the New Schema
Staying true to supporting the business continuity of the use cases that rely on the HiveMQ Broker, the HiveMQ team went the extra mile by adding an Xodus fallback for customers that cannot use RocksDB in their deployments (for example, because RocksDB's dependency on glibc cannot be satisfied). These customers can still benefit from the new client queue schema, since the broker supports the new schema on top of Xodus. This fallback does not deliver the same performance benefits, but it retains the improved consistency of client queue merges.
The full potential of the improvements highlighted in this blog can, however, only be achieved through the synergy of the new schema with RocksDB. Therefore, we highly recommend that our customers opt for the combination of the new schema and RocksDB, both of which are now part of HiveMQ's long-term support.
The switch to the new schema and RocksDB for the client queues is only the first step toward substantial HiveMQ Broker performance and efficiency improvements. Stay tuned to learn firsthand about the new heights HiveMQ Broker reaches to extract the most from the infrastructure it runs on!
References
[1] This statement is true only to some extent: the clients may wait for acknowledgements or the broker may implement some form of overload protection that stops accepting new messages or reduces the pace of accepting the messages.
[2] To understand how replication helps in ensuring high availability of HiveMQ, read the blogpost: https://www.hivemq.com/blog/hivemq-high-availability-through-replication-failover/
[3] Read more about the high-level design of RocksDB: https://github.com/facebook/rocksdb/wiki/RocksDB-Overview
Dr. Vladimir Podolskiy
Dr. Vladimir Podolskiy is a Senior Distributed Systems Specialist at HiveMQ. He is a software professional who possesses practical experience in designing dependable distributed software systems requiring high availability and applying machine intelligence to make software autonomous. He enjoys writing about MQTT, automation, data analytics, distributed systems, and software engineering.