The HiveMQ MQTT Client Library for Java and Its Reactive API Flavor
In this third and final part of the series on the API flavors of the HiveMQ MQTT Client, we look at the reactive API flavor. In reactive programming, we think in terms of events and how to react to them. For example, an event can be an incoming PUBLISH message, and we define how to handle it.
Unlike the blocking API, which provides a synchronous programming model, the reactive and async APIs support asynchronous programming, which enables a more efficient threading model.
What sets the reactive API apart from the async API is the ability to harness all the advantages of your reactive library of choice. For example, RxJava offers advanced backpressure handling with the Flowable type. Backpressure support lets you manage situations where one component produces more events in a given timeframe than another component can process.
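To make the idea of backpressure concrete, here is a minimal sketch that uses only the JDK's java.util.concurrent.Flow interfaces rather than RxJava or the HiveMQ client. The class names RangePublisher and BatchSubscriber are our own illustrative assumptions. The publisher never emits more items than the subscriber has requested, so the slower side controls the pace.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class BackpressureSketch {

    /** Emits 0..count-1, but never more items than the subscriber has requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) { this.count = count; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 0;
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done || n <= 0) return;
                    demand += n;
                    if (emitting) return; // re-entrant call from onNext: the outer loop picks up the demand
                    emitting = true;
                    while (demand > 0 && next < count && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (next == count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Requests items in small batches and records the order of events. */
    static final class BatchSubscriber implements Flow.Subscriber<Integer> {
        final List<String> log = new ArrayList<>();
        private final int batchSize;
        private Flow.Subscription subscription;
        private int receivedInBatch = 0;

        BatchSubscriber(int batchSize) { this.batchSize = batchSize; }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            log.add("request " + batchSize);
            s.request(batchSize);
        }

        @Override
        public void onNext(Integer item) {
            log.add("item " + item);
            if (++receivedInBatch == batchSize) {
                // Only ask for more once the current batch has been handled.
                receivedInBatch = 0;
                log.add("request " + batchSize);
                subscription.request(batchSize);
            }
        }

        @Override
        public void onError(Throwable t) { log.add("error " + t); }

        @Override
        public void onComplete() { log.add("complete"); }
    }

    static List<String> run() {
        BatchSubscriber subscriber = new BatchSubscriber(2);
        new RangePublisher(5).subscribe(subscriber);
        return subscriber.log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The log shows that items only flow after a request, two at a time. RxJava's Flowable follows the same request-based protocol, with ready-made operators on top.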
How to get started with the Reactive API of the HiveMQ MQTT Client in Java
To start, you will need the HiveMQ MQTT Client library.
Assuming your project uses Gradle, add the HiveMQ MQTT Client dependency (com.hivemq:hivemq-mqtt-client) to the dependencies block of your build.gradle.kts.
Examples
In the following code examples we use RxJava, but the HiveMQ MQTT Client also offers a Reactor-based API. If you are unfamiliar with reactive programming, we advise you to familiarize yourself with the core concepts of your preferred reactive framework and its data types.
One key concept we would like to highlight is laziness. Laziness means that a reactive source such as an Observable does not emit any items until a subscriber subscribes to it; it does not start producing values until a subscriber needs them. We use this concept to declare independently how we react to each event, which removes the need for overly nested callbacks.
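Laziness can be illustrated without any reactive library. The following sketch uses a hypothetical LazySingle class of our own, not an RxJava or HiveMQ type. It shows that declaring a source only describes the work, and that the work runs when someone subscribes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

class LazinessSketch {

    /** Hypothetical stand-in for a lazy reactive source that produces one value. */
    static final class LazySingle<T> {
        private final Supplier<T> work;

        LazySingle(Supplier<T> work) { this.work = work; }

        /** The work starts here, once per subscription, and not before. */
        void subscribe(Consumer<T> onSuccess) {
            onSuccess.accept(work.get());
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Declaring the source only describes the work; it does not run it.
        LazySingle<String> connect = new LazySingle<>(() -> {
            log.add("connecting");
            return "CONNACK";
        });
        log.add("declared");

        // Subscribing is what triggers the work.
        connect.subscribe(result -> log.add("received " + result));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The entry "declared" is logged before "connecting", even though the connect logic was written first. The examples in this post rely on this ordering.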
Reactive Publisher
To implement an MQTT publisher with RxJava, do the following:
Use the static method Mqtt5Client.builder() to obtain a builder. Provide the client identifier, the MQTT broker host, and the MQTT broker port. Finish the building process with buildRx().
Use disconnect() to define the disconnecting behavior. Attach a callback with doOnComplete to write "Successfully disconnected!" to the console once the client has successfully disconnected.
Use Flowable.range(0, 10) to create a Flowable that emits the numbers 0-9 once subscribed to.
Use map to convert these numbers into PUBLISH messages. We use the static method Mqtt5Publish.builder() to build each PUBLISH message: provide the topic, payload, and QoS, and complete the instantiation with build().
Define how to behave when all PUBLISH messages are successfully processed: the program logs "Successfully published!" and subscribes to the disconnect Completable. Because of laziness, the disconnect behavior only starts at that subscription. doOnError allows us to define the behavior when sending a PUBLISH message is unsuccessful.
Last, we define the behavior of connecting to the MQTT broker. After a successful connection attempt, the application logs "Successfully connected!". Hand the publish Flowable to the client library and subscribe to the result to trigger the publishing. doOnError allows us to define the behavior when connecting fails. Instantly subscribe to start connecting.
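The ordering that the steps above rely on can be sketched without a broker. This example does not use the HiveMQ or RxJava types. LazyCompletable is a hypothetical stand-in of our own with a single doOnComplete operator, and error handling is left out. Its purpose is to show that the scenarios are declared in one order (disconnect, publish, connect) and executed in another (connect, publish, disconnect), because each one only runs when it is subscribed to.

```java
import java.util.ArrayList;
import java.util.List;

class PublisherFlowSketch {

    /** Hypothetical stand-in for a lazy, completion-only reactive type. */
    static final class LazyCompletable {
        private final Runnable work;

        LazyCompletable(Runnable work) { this.work = work; }

        /** Returns a new step that runs the callback after this step's work. */
        LazyCompletable doOnComplete(Runnable callback) {
            return new LazyCompletable(() -> {
                work.run();
                callback.run();
            });
        }

        /** Triggers the work; nothing runs before this call. */
        void subscribe() { work.run(); }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Declare the disconnect scenario first; nothing runs yet.
        LazyCompletable disconnectScenario =
                new LazyCompletable(() -> log.add("DISCONNECT sent"))
                        .doOnComplete(() -> log.add("Successfully disconnected!"));

        // Declare the publish scenario; its completion triggers the disconnect.
        LazyCompletable publishScenario = new LazyCompletable(() -> {
            for (int i = 0; i < 10; i++) {
                log.add("PUBLISH #" + i);
            }
        }).doOnComplete(() -> {
            log.add("Successfully published!");
            disconnectScenario.subscribe();
        });

        // Declare the connect scenario and subscribe right away to start the chain.
        new LazyCompletable(() -> log.add("CONNECT sent"))
                .doOnComplete(() -> {
                    log.add("Successfully connected!");
                    publishScenario.subscribe();
                })
                .subscribe();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because every scenario is a separate value, the callbacks stay flat instead of being nested inside each other.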
Reactive Subscriber
To implement an MQTT subscriber with RxJava, do the following:
Use the static method Mqtt5Client.builder() to obtain a builder. Provide the client identifier and the MQTT broker host and port. Finish the building process with buildRx().
Before connecting, use publishes() to obtain a Flowable that contains incoming publishes. Subscribe with a callback; the client invokes that callback every time it receives an incoming PUBLISH message.
Use subscribeWith() to define the behavior of sending the SUBSCRIBE packet to the MQTT broker.
With doOnSuccess(), ensure that "Successfully subscribed!" is written to the console after the client library receives the SUBACK.
doOnError allows us to define the behavior when MQTT subscribing fails.
Last, define the behavior of connecting to the MQTT broker. After a successful connection attempt, the application logs "Successfully connected!". Subscribe to the subAckSingle to trigger the MQTT subscribe behavior. Because of laziness, the behavior only starts when subscribing. doOnError allows us to define the behavior when connecting fails. Instantly subscribe to start connecting.
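One detail in the steps above is that the callback for incoming publishes is registered before connecting. The following sketch is our own simplified model, not the client's implementation. IncomingPublishes is a hypothetical class that dispatches each message to the callbacks registered at that moment. It illustrates why a callback that is registered late can miss messages that arrived earlier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class IncomingPublishesSketch {

    /** Hypothetical stand-in for the stream of incoming PUBLISH messages. */
    static final class IncomingPublishes {
        private final List<Consumer<String>> callbacks = new ArrayList<>();

        /** Registers a callback that is invoked for every message delivered from now on. */
        void subscribe(Consumer<String> callback) { callbacks.add(callback); }

        /** Simulates the client receiving one PUBLISH message from the broker. */
        void deliver(String payload) {
            for (Consumer<String> callback : callbacks) {
                callback.accept(payload);
            }
        }
    }

    static List<String> run() {
        List<String> early = new ArrayList<>();
        List<String> late = new ArrayList<>();
        IncomingPublishes publishes = new IncomingPublishes();

        publishes.subscribe(early::add);   // registered before any message arrives
        publishes.deliver("example #0");
        publishes.subscribe(late::add);    // registered after the first message
        publishes.deliver("example #1");

        return List.of("early=" + early, "late=" + late);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The early callback sees both messages, while the late one only sees the second.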
Switching API Flavor
The HiveMQ MQTT Client lets you switch to the blocking or async API flavor with toBlocking() or toAsync(). This enables you to choose the API style that is best suited to each part of your application. This example shows how to switch from the reactive API flavor to the async API flavor, which is explained in the second post of this series. It also illustrates how to switch from the reactive API flavor to the blocking API flavor from the first post in the series.
Conclusion
This concludes the series about the three API flavors of the HiveMQ MQTT Client. Developers who are looking to build MQTT applications that are both performant and scalable should consider utilizing the reactive flavor of the HiveMQ MQTT Client API. Although it may be less intuitive than the blocking API, the reactive API offers a more efficient threading model by enabling reactive programming.
To fully take advantage of the reactive API, developers need to be familiar with reactive libraries in Java, like RxJava or Reactor. However, even if they are not, the reactive API is still a great way to start with reactive programming in Java.
With reactive programming, developers can construct high-performing and scalable applications that can handle vast amounts of data.
HiveMQ Team