Boosting MQTT Broker Efficiency With Improved Threading
HiveMQ: Relentless Focus on Efficiency
HiveMQ MQTT broker is the core of the HiveMQ platform. The broker is a highly available and scalable piece of software capable of processing millions of messages per second while ensuring their durability. The HiveMQ platform serves a diverse set of applications: from home automation to supporting fleets of connected cars and geo-distributed networks of smart factories.
The particular challenge of the HiveMQ broker as a horizontal solution is that it has to satisfy diverse and sometimes even contradictory requirements across various industries. Satisfying these requirements frequently means making engineering tradeoffs between performance-related characteristics such as throughput, latency, and resource usage. There is, however, one requirement that HiveMQ never compromises on – the reliability of the platform.
Reliability is the bottom line of the HiveMQ platform offering. Every user of the HiveMQ platform assumes it is a given, and rightfully so. Having enterprise customers such as Audi, BMW, and Siemens that have continued to power their applications with the HiveMQ platform for as long as the company has been in business (almost 12 years) is the most credible indicator of success on this front. No long-running test could prove it better.
As customers’ applications continue to grow, reliability alone is not enough. Alongside maintaining the highest standards of reliable MQTT communication, HiveMQ continuously invests in improving platform characteristics that matter a lot for various categories of use cases, including but not limited to message throughput, end-to-end latency, and security.
For a solution in the data movement space, it matters a lot that it can move as much data as possible, as fast as possible. The emphasis on raising throughput to support growing use cases continuously drives technical innovation at the core of the HiveMQ platform: the HiveMQ MQTT broker. Supporting customers’ business continuity requires maintaining high throughput even in fairly infrequent operational scenarios such as network connectivity disruptions and platform upgrades. Even then, this is only one part of the far bigger technical challenge of increasing the HiveMQ broker’s efficiency.
In recent years, it became crucial for businesses all around the globe to increase efficiency of operations. What this boils down to in practice is producing more with less. Being a critical link in the value chains across various industries, such as manufacturing and logistics, HiveMQ is dedicated to improving the overall efficiency of the platform, starting with the core MQTT broker and spanning out to the edges (literally, all the way to the HiveMQ Edge). We aim to make operating the HiveMQ platform as cheap as possible while providing a reliable high level of performance for data-intensive applications.
The latest milestone on this thorny path was achieved in HiveMQ 4.24.0 and 4.9.15 (LTS). By adding another processing ‘station’ for sending intra-cluster traffic within the clustered broker, HiveMQ pushed the number of QoS 1 messages per core per second from around 1,560 to more than 2,340 (a ~50% increase) for a deployment of four 32-core AWS EC2 instances.
In what follows, we will dive into the details of this improvement, adopting the metaphor of a production line. The engineering team behind the HiveMQ broker uses this metaphor as a mental framework for continuously improving the broker’s performance.
Broker as a Production Line
The HiveMQ broker is a complex distributed system rich with functionality to manage the limited resources of every server it is deployed on. Throughout this blog we will use the metaphor of a production line to shed some light on the optimizations introduced in HiveMQ 4.24.0 and 4.9.15 (LTS).
So, what is a production line?
Production lines are housed inside factories. Every line has various workstations (like a drilling or bending CNC machine) standing along it, each performing its operation on the items that pass from station to station. These workstations, and the workers who operate them, transform raw materials and semi-finished pieces into the final product.
To visualize how a production line works, imagine a steel sheet being laid at the start of the line. The sheet is carried forward through the workstations via the line's rollers, and every workstation the sheet passes through performs a single operation on it.
In an ideal world, the processing speed of the workstations would match the speed at which parts proceed through the factory; that is, a part would be processed instantaneously upon leaving the previous workstation. However, the operations performed by different machines may differ wildly in complexity and, as a result, in processing time.
To ensure a continuous flow of parts through the production line, one would have to dynamically adapt the number of workstations performing the same operation. However, even that may not be enough if the speed at which materials enter the system (the inflow speed) changes. To account for that variation, one would have to introduce buffers that hold parts already processed by the previous workstation in the chain but not yet picked up by the next one.
At an overly simplified level, every workstation performs a specific operation that transforms the station’s input into some modified output (e.g. a solid steel sheet into a steel sheet with a couple of holes in it) and puts the result into a buffer. The HiveMQ broker performs its work in a similar way. In the broker, every operation (e.g. putting a message into a queue) corresponds to an execution service. A workstation is analogous to a thread in an execution service[1]. Every execution service has a fixed or dynamically adjustable number of threads (also called a thread pool) that perform specific tasks, for example, writing data to the disk, replicating it, or sending it over the network.
Once a task has been processed, its result lands in an in-memory buffer[2] to be picked up later by another executing part. The only substantial difference from the production line is that the broker employs a first-in-first-out (FIFO) policy when taking the next task to process; effectively, the buffers between operations in the broker become queues. In manufacturing, queueing behavior is not strictly required since all items produced on the same production line are interchangeable. This is different for the data passing through the broker: every message is unique since it carries unique information (its payload).
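To make the analogy concrete, here is a minimal Java sketch of the general pattern described above, not HiveMQ’s actual code: a fixed thread pool plays the role of a set of ‘workstations’, and a FIFO in-memory buffer holds their results until the next operation picks them up. All names and sizes are purely illustrative.

```java
import java.util.concurrent.*;

public class ProductionLineSketch {

    public static void main(String[] args) throws InterruptedException {
        // "Workstations": a fixed pool of two threads performing the persistence operation.
        ExecutorService persistencePool = Executors.newFixedThreadPool(2);

        // "Buffer" between operations: a FIFO queue holding results of the
        // persistence operation until the next operation picks them up.
        BlockingQueue<String> replicationBuffer = new LinkedBlockingQueue<>();

        // Submit a few tasks; each one "persists" a message and hands the result on.
        for (int i = 0; i < 5; i++) {
            final int messageId = i;
            persistencePool.submit(() -> {
                // ... write the message to disk (omitted) ...
                replicationBuffer.add("persisted-message-" + messageId);
            });
        }

        persistencePool.shutdown();
        persistencePool.awaitTermination(5, TimeUnit.SECONDS);

        // A downstream "workstation" drains the buffer in FIFO order.
        String result;
        while ((result = replicationBuffer.poll()) != null) {
            System.out.println("next operation picks up: " + result);
        }
    }
}
```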
The picture below represents the HiveMQ broker as a system of queues and intermediate processing steps. Sending messages between nodes in the cluster (intra-cluster communication) might be viewed as yet another queue that the sender puts its message into; the receiver node then performs a retrieval operation from this queue. A system like this is called a queueing system.
The expectations we have of a production line hold, to some extent, for the MQTT broker as well. Specifically, one would want the flow of work through the broker to be continuous and to avoid tasks piling up in queues. Tasks piling up would mean that there is not enough processing capacity, so message delivery slows down and throughput falls. Although the room for increasing throughput has a natural limit because of the requirement to maintain message ordering, the whole system of executing parts and queues can be engineered so that tasks flow through it as swiftly as possible given limited hardware resources.
The classic management-oriented novel “The Goal” by E. Goldratt gives a nice summary of how such queueing systems can be optimized. The key idea is to identify the bottlenecks (workstations or operations where work piles up) and to relieve them by adding more workstations or by prioritizing different kinds of work. A similar stance is taken by the more rigorous methodology of production line balancing.
Minimizing work-in-progress (which is what having tasks sitting in a queue means) is a valid goal for optimizing computer systems as well. In contrast to factories, computer software and hardware allow resources (computing power, memory) to be added and removed relatively swiftly, so bottlenecks can be resolved on a far smaller timescale than on production lines. If work accumulates in some part of the system while the load remains limited, other available resources may end up underutilized. So, having a straggler in the system reduces its total throughput.
Using the metaphor of the production line, let’s dive into how HiveMQ engineers optimized the broker’s task flow to enable more efficient use of resources on a single server, resulting in an increase in the broker’s total throughput on high-end servers.
Optimizing the Broker’s Task Flow
Decoupling Broker Operations for Higher Throughput
To optimize the throughput of a production line, one would try to match the output of an operation to the input of the next operation. This avoids idle time for workstations. There are two basic ways to achieve that: changing the number of workstations per operation or slicing the operation into finer operations.
Conceptually, the flow of tasks in the HiveMQ broker is similar to the flow of parts on a production line. A noteworthy difference between processing parts on a manufacturing line and processing tasks in the HiveMQ broker is that all the ‘workstations’ on a single HiveMQ server are backed by the same shared set of limited resources, e.g. CPU time. Sharing limited resources introduces the need to acquire these resources and free them once done, and both actions take time. Therefore, slicing tasks too finely in a computer system is generally a bad idea, since the overhead of acquiring and freeing resources will likely waste CPU time[3].
Using the production line metaphor, let’s see how a task flow within the broker can be optimized to increase the message throughput.
Let us imagine that the cluster contains three nodes: A, B, and C. The work happens on node C, and its results (persisted messages) are replicated either to node A or to node B. In this example, the broker needs to perform the following tasks: write the message to storage, prepare everything required to replicate the data to another node, and then actually send the message. Every task corresponds to an operation in production line terms. The sending part of the broker is optimized to bundle together multiple messages if they are sent within a short timeframe to the same destination server, which helps reduce the network footprint.
In the picture below, the broker performs two operations: one that writes the data to disk for durability and prepares the results for replication, and another that does the bundling. In our example, the persistence-and-replication operation (execution service) is performed by two workstations (threads), allowing two tasks to be processed simultaneously. In contrast, the bundling operation has only one workstation, so it can perform only one bundling and sending task at a time.
Once a PUBLISH packet arrives in the broker, multiple tasks have to be performed to process it; it is not as simple as moving the message from point A to point B. The PUBLISH packet has to be persisted (stored to disk for durability) and prepared for replication (for fault tolerance). After the packet is written to disk and its replication is prepared by one of the workstations, the result proceeds to the next workstation, which performs the bundling and sending operation. The path of the task through the broker’s ‘production line’ is shown in the figure below.
Let us apply the principles used in optimization of production lines to the broker.
First of all, it is not necessary to lump writing to the disk together with preparing the message for replication. These could be two separate operations. However, the two operations must be performed in strict order to ensure that, when the replication is finished, the PUBLISH message is present on the origin node as well as on the replicas. Even with this strict ordering requirement, separating the operations results in finer-grained tasks.
The diagram below depicts the workflow after decoupling the first operation in the flow. Now the persist tasks are handled by workstations performing the persistence operation. As a result, a new kind of task, the replication preparation task, is produced and handled by yet another workstation performing the replication preparation operation. Since the persist task shrank once the replication preparation work was carved out of it, the workstations performing the persistence operation finish their work faster and can thus pick up new tasks at a higher pace.
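As a rough illustration of this decoupling, here is a hedged Java sketch (not HiveMQ’s actual code, and with purely hypothetical names): the formerly combined task is split into two operations running on separate executors, while chaining preserves the strict per-message order, since replication preparation starts only after persistence has completed.

```java
import java.util.concurrent.*;

public class DecoupledOperationsSketch {

    // Hypothetical executors; names and pool sizes are illustrative only.
    static ExecutorService persistenceExecutor = Executors.newFixedThreadPool(2);
    static ExecutorService replicationPrepExecutor = Executors.newSingleThreadExecutor();

    static CompletableFuture<String> handlePublish(String message) {
        // Step 1: persist the message on the persistence executor.
        return CompletableFuture
                .supplyAsync(() -> persist(message), persistenceExecutor)
                // Step 2: only after persistence completes, prepare the replication
                // on a separate executor. Per-message order is preserved because
                // step 2 is chained after step 1.
                .thenApplyAsync(DecoupledOperationsSketch::prepareReplication,
                        replicationPrepExecutor);
    }

    static String persist(String message) {
        return message + " [persisted]";
    }

    static String prepareReplication(String persisted) {
        return persisted + " [replication prepared]";
    }

    public static void main(String[] args) {
        handlePublish("PUBLISH-42").thenAccept(System.out::println).join();
        persistenceExecutor.shutdown();
        replicationPrepExecutor.shutdown();
    }
}
```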
But would this accelerated handling of persist tasks mean that the whole flow for every given task will be shorter and thus the total message throughput will be higher? Not necessarily.
The replication preparation operation has only one workstation, which can process only one task at a time. Assuming that persistence and replication preparation take a similar amount of time, increasing the number of persistence workstations (threads) won’t make end-to-end processing faster; it will only result in a queue of tasks building up in front of the single replication preparation workstation.
Can we then apply the other optimization of adding more workstations to help tasks pass through the replication preparation operation faster? Yes, but only to some extent.
Reducing Work-in-Progress by Adding More Workstations to the Operation
As briefly discussed before, ordering matters in information processing systems, so we have to ensure that the order in which messages are propagated is not violated. But to what extent do we have to ensure that? One may argue that ordering only needs to hold per client session, since sessions are independent of each other. However, if we go this route, we risk blowing up the number of workstations for the replication preparation operation, since it is quite common to have thousands to millions of connections depending on the use case. This would result in significant overhead from (re)allocating the limited resource (CPU time) from one workstation (thread) to another. So, this granularity is not an option.
We could go a bit higher up the abstraction ladder and use the server as the granularity level. After all, each server is an independent destination for a message in the MQTT broker, so it is acceptable to have one workstation per server as long as the number of servers is not huge. In practice, the number of servers in customer deployments rarely reaches even 10, so this is a valid assumption.
But what about ordering? Since we need to ensure it only at the level of clients, having one thread per server (a workstation matching a server) gives us an even stronger guarantee: all messages destined for the same server will be sent in order.
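A minimal Java sketch of this idea, with hypothetical names and illustrating the concept rather than HiveMQ’s implementation: one single-threaded executor per destination server guarantees that all tasks for that server run in submission order, while different servers are handled in parallel.

```java
import java.util.Map;
import java.util.concurrent.*;

public class PerServerOrderingSketch {

    // One single-threaded executor ("workstation") per destination server.
    // With only a handful of servers per cluster, the thread count stays small.
    private final Map<String, ExecutorService> executorPerServer = new ConcurrentHashMap<>();

    void prepareReplication(String destinationServer, String message) {
        executorPerServer
                .computeIfAbsent(destinationServer, s -> Executors.newSingleThreadExecutor())
                .submit(() -> {
                    // All tasks for the same destination run on the same single
                    // thread, so they are processed in submission order.
                    System.out.println("preparing " + message + " for " + destinationServer);
                });
    }

    public static void main(String[] args) {
        PerServerOrderingSketch sketch = new PerServerOrderingSketch();
        sketch.prepareReplication("node-A", "message-1");
        sketch.prepareReplication("node-B", "message-2");
        sketch.prepareReplication("node-A", "message-3"); // ordered after message-1
        sketch.executorPerServer.values().forEach(ExecutorService::shutdown);
    }
}
```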
Let’s see how our modified task flow looks in the figure below. For clarity, the newly added workstation (thread) is highlighted in blue.
Since the messages in our example are directed to different servers (one to A, another to B), they can take advantage of both workstations performing the replication preparation operation in the modified workflow. Given that the work in the persistence workstations may, in practice, be much heavier than in the replication preparation workstations (due to time-consuming disk writes), we could also increase the number of workstations performing the persistence operation, e.g. to 4. With this, we would likely avoid a reduction in end-to-end processing speed, since the faster replication preparation workstations will not be starved of tasks as long as tasks arrive at a rate they can handle.
Implementing Workflow Optimizations in the HiveMQ Broker
The improvements introduced in HiveMQ 4.24.0 and 4.9.15 (LTS), which deliver a substantial boost to broker efficiency, are rooted in the optimizations discussed above.
Specifically, the HiveMQ broker now has a new task executor that performs the preparatory tasks for sending intra-cluster traffic. Although the new executor has multiple threads, their count is capped by what the bundling executor is capable of processing, to reduce contention on the underlying message queues.
On the one hand, this design allows the broker to offload some of the work from the busy persistence executors to the new executor, which in turn enables the persistence executors to handle more incoming load if the processing capacity of the hardware is sufficient. On the other hand, limiting access to the shared queues downstream allows the broker to send bundles faster.
To maximize the throughput of intra-cluster traffic, the HiveMQ broker places an emphasis on bundling messages prior to sending. Larger bundles of messages have a smaller network footprint because they share the service headers of the underlying transport protocols. The flip side of bundling is that larger bundles take more time to accumulate, which has a pronounced negative effect on the end-to-end latency that is crucial for quite a few consumer-facing applications.
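The general mechanics can be illustrated with a simplified Java sketch: outgoing messages accumulate in a bundle that is flushed either when it reaches a size limit (good for throughput and network footprint) or when a time limit expires (bounding the latency penalty). The thresholds and names below are assumptions for illustration, not HiveMQ’s configuration or implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class BundlerSketch {

    private static final int MAX_BUNDLE_SIZE = 100; // illustrative size threshold
    private static final long MAX_WAIT_MS = 5;      // illustrative latency bound

    private final List<String> bundle = new ArrayList<>();
    private final ScheduledExecutorService flusher = Executors.newSingleThreadScheduledExecutor();

    BundlerSketch() {
        // Time-based flush: bounds how long a message can wait inside a bundle.
        flusher.scheduleAtFixedRate(this::flush, MAX_WAIT_MS, MAX_WAIT_MS, TimeUnit.MILLISECONDS);
    }

    synchronized void enqueue(String message) {
        bundle.add(message);
        // Size-based flush: larger bundles share transport headers,
        // reducing the network footprint per message.
        if (bundle.size() >= MAX_BUNDLE_SIZE) {
            flush();
        }
    }

    synchronized void flush() {
        if (bundle.isEmpty()) {
            return;
        }
        System.out.println("sending bundle of " + bundle.size() + " messages");
        bundle.clear();
    }

    public static void main(String[] args) throws InterruptedException {
        BundlerSketch bundler = new BundlerSketch();
        for (int i = 0; i < 250; i++) {
            bundler.enqueue("message-" + i); // triggers two size-based flushes
        }
        Thread.sleep(20); // the timed flush picks up the remaining 50 messages
        bundler.flusher.shutdown();
    }
}
```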
Navigating the tradeoff between maximizing throughput and minimizing latency is a prime example of an engineering challenge. Due to the diversity and sheer number of applications relying on the HiveMQ platform for reliable, high-performance data distribution and movement, the bundling behavior has to adapt to the load as accurately and swiftly as possible. The HiveMQ broker achieves this naturally, without any additional parameters, by limiting the processing capacity of the bundling executor. Our internal benchmarks show that this increases platform throughput while minimizing the network footprint of intra-cluster traffic. Natural and adaptive as this design is, we discovered that it leads to a very peculiar edge case.
The engineers discovered that limiting the processing capacity of the bundler involved in transmitting messages over the network could backfire in the rare case of a lasting network disruption. To maintain reliability even in this case while still offering high throughput, the engineering team introduced dynamic threading into the bundling and sending functions of the HiveMQ broker.
Once stalling traffic is detected, it signals to the broker that a new thread needs to be spawned to continue sending messages. Scaling threads up allows the HiveMQ broker to keep the bundling behavior that minimizes the network footprint even during a network disruption. That way, the HiveMQ platform keeps the best of both worlds: reliable data delivery even in the edge case of cloud provider network disruptions, and high throughput with an optimized network footprint in high-load scenarios.
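Conceptually, the mechanism resembles the following Java sketch, in which a watchdog notices that sending has made no progress for a while and lets the sender pool grow by one thread. The thresholds, names, and detection logic are illustrative assumptions, not the broker’s actual implementation.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

public class DynamicSenderSketch {

    // Start with a single sender thread; allow growth when sending stalls.
    private final ThreadPoolExecutor senderPool = new ThreadPoolExecutor(
            1, 4, 30, TimeUnit.SECONDS, new SynchronousQueue<>());

    // Timestamp of the last observed sending progress.
    private final AtomicLong lastSendProgress = new AtomicLong(System.nanoTime());

    // Called periodically, e.g. by a watchdog scheduler (omitted here).
    void checkForStall() {
        long stalledMillis = (System.nanoTime() - lastSendProgress.get()) / 1_000_000;
        if (stalledMillis > 1_000 && senderPool.getPoolSize() < senderPool.getMaximumPoolSize()) {
            // Sending appears stalled: submit another send task. Because the
            // SynchronousQueue does not buffer tasks, the pool spawns an
            // additional thread when the existing senders are busy.
            senderPool.execute(this::sendPendingBundles);
        }
    }

    void sendPendingBundles() {
        // ... transmit the accumulated bundles over the network (omitted) ...
        lastSendProgress.set(System.nanoTime());
    }

    public static void main(String[] args) {
        DynamicSenderSketch sketch = new DynamicSenderSketch();
        sketch.checkForStall(); // no stall yet, so no extra thread is spawned
        sketch.senderPool.shutdown();
    }
}
```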
Realizing the Potential of Powerful AWS Instances
The introduction of a new executor allows the HiveMQ broker to scale vertically[4] more efficiently by utilizing available CPU cores better.
Taking advantage of AWS c7a EC2 instances powered by 4th-generation AMD EPYC processors, a HiveMQ broker deployment composed of 4 c7a.8xlarge instances with 32 vCPUs each can now sustain around 600,000 QoS 1 messages per second of total message traffic (inbound and outbound) while utilizing around 90% CPU on average, with a few thousand clients connected to the broker, each sending and receiving messages as fast as possible.
Compared to the previous version of the broker (performance graphs shown below), we observed close to a 50% improvement in total throughput, which previously stood at 400,000 messages per second.
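For reference, these totals are consistent with the per-core figures quoted at the beginning of this post, assuming the per-core count refers to inbound PUBLISH messages (half of the combined inbound-and-outbound traffic) spread across the 4 × 32 = 128 cores of the deployment:

```latex
\[
  \frac{400{,}000 / 2}{128} \approx 1{,}562
  \qquad\text{and}\qquad
  \frac{600{,}000 / 2}{128} \approx 2{,}344
  \quad\text{QoS\,1 messages per core per second.}
\]
```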
Optimizing the broker’s task processing pipelines is one of the key contributors to the continuous improvement that keeps the HiveMQ broker scaling ahead of our customers’ applications.
Prioritizing Efficiency
The improvements highlighted in this blog can be viewed from two standpoints.
First of all, one can get more throughput from existing high-end hardware. This performance improvement might be useful for companies with actively growing businesses.
On the other hand, the same level of throughput can be achieved with fewer high-end nodes. Say the load on your MQTT broker stays at 200,000 incoming PUBLISH messages per second: you would then need only 3 c7a.8xlarge instances compared to the 4 required prior to the improvements in HiveMQ 4.24.0 and 4.9.15 (LTS). Having one node fewer, even at this scale, is equivalent to saving 14,386 USD per year on AWS on-demand instances and, for some deployments, an additional sum on cross-availability-zone traffic. Thus, we can claim that the latest optimizations boost the HiveMQ broker’s efficiency by pushing down the cost per message passed through the broker.
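As a back-of-the-envelope check using only the figure above, the quoted annual saving corresponds to an hourly rate in line with the on-demand price of a single c7a.8xlarge instance running year-round:

```latex
\[
  \frac{14{,}386\ \text{USD/year}}{365 \times 24\ \text{h/year}} \approx 1.64\ \text{USD per hour}
\]
```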
Continuous improvement of the broker is vital for the lasting success of HiveMQ. The engineering team maintains a relentless focus on improving the subsystems along the main path of data through the broker, such as resource management, persistence, queueing, and replication. Forthcoming improvements are expected to raise the bar of what MQTT brokers can achieve in terms of throughput even higher. Stay tuned.
References
[1] A similar analogy holds for coroutines.
[2] We omit durable storage to disk here since we assume it is performed as part of the task.
[3] This overhead comes from context switching: when execution switches from one thread to another, some state needs to be saved and later restored.
[4] Vertical scalability is the capability of the software to effectively utilize the resources on the given server to perform its tasks.
Dr. Vladimir Podolskiy
Dr. Vladimir Podolskiy is a Senior Distributed Systems Specialist at HiveMQ. He is a software professional who possesses practical experience in designing dependable distributed software systems requiring high availability and applying machine intelligence to make software autonomous. He enjoys writing about MQTT, automation, data analytics, distributed systems, and software engineering.