HiveMQ & Google Cloud - Enabling Bi-directional MQTT Data Movement
by Florian Limpöck
We’re excited to share that we have just released the HiveMQ Enterprise Extension for Google Cloud Pub/Sub.
This new extension enables the bi-directional movement of MQTT data between HiveMQ and the Google Cloud. There are two methods to accomplish this:
Through Mappings, and
Through Transformers.
Each method has its advantages and should be selected according to the use case requirements.
Mappings are helpful for more straightforward use cases, for instance when the intended content and destination of a message are already known while you set up the configuration.
On the other hand, Transformers can dynamically decide the content and destination of a message at runtime.
In this article, you will learn how to transform incoming MQTT PUBLISH messages and forward them to Google Cloud Pub/Sub. From Google Cloud Pub/Sub, you can use backend data storage and analysis services to further process messages.
You will also learn how to read messages from Google Cloud Pub/Sub and forward them to subscribed MQTT clients.
The following example walks you through a hypothetical round-trip scenario involving Transformers.
The first Transformer takes the Quality of Service, Message Expiry, and the Retained Flag properties of an MQTT PUBLISH and converts them to Pub/Sub message attributes.
The second Transformer converts the attributes of the Pub/Sub message back into the Quality of Service, Message Expiry, and Retained Flag properties of an MQTT PUBLISH.
Prerequisites
Before you can start implementing these transformers, check these prerequisites:
Install or use Java 11 or higher
Java IDE that supports Gradle (for this tutorial, we use IntelliJ)
To start, open your IDE and create a new project from version control.
Paste the Hello World Customization GitHub URL, git@github.com:hivemq/hivemq-google-cloud-pubsub-hello-world-customization.git, into the URL field.
Remove the Hello World code from the java packages of the src/main and src/test directories.
(Optional) Change the group and description in build.gradle.kts.
(Optional) Adapt the manifest attributes in build.gradle.kts. (Changing at least the vendor makes sense in most cases.)
(Optional) Set the rootProject.name in settings.gradle.kts.
Now we are all set to implement our first transformer.
MQTT to Pub/Sub Transformer
Go to the src/main/java directory and create an ExampleMqttToPubSubTransformer class that implements the MqttToPubSubTransformer interface in the example.pubsub.customizations package.
Remember the package and the class name; we need them later in the configuration part of the extension.
Override init()
The first questions we must ask ourselves are: “Do we need some information from the configuration file, and do we want to register some metrics?”
If the answer to either question is yes, we need to override the init method of the transformer interface. In this example, we want a metric that counts every failed transformation, so we register it in init().
To ensure that we can use the transformer even if the metric creation fails, we surround the creation of the metric with a try-catch block.
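The defensive registration described above can be sketched in plain Java. This is only an illustration under stated assumptions: the `Function<String, LongAdder>` parameter is a hypothetical stand-in for whatever metric registry the extension passes to init(), and the metric name is invented for the example. Neither is taken from the HiveMQ customization SDK.

```java
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;

// Stand-in sketch: none of these names belong to the HiveMQ customization SDK.
final class FailedMetricSketch {

    // Hypothetical metric name, chosen only for this example.
    static final String FAILED_METRIC_NAME = "example.transformer.failed.count";

    // Starts as a local counter so the transformer always has something to increment.
    private LongAdder failedCounter = new LongAdder();

    // metricFactory stands in for the real registry lookup, which may throw.
    void init(Function<String, LongAdder> metricFactory) {
        try {
            LongAdder registered = metricFactory.apply(FAILED_METRIC_NAME);
            if (registered != null) {
                failedCounter = registered;
            }
        } catch (RuntimeException e) {
            // Registration failed: keep the local counter and carry on.
            System.err.println("Metric registration failed, using a local counter: " + e);
        }
    }

    void recordFailure() {
        failedCounter.increment();
    }

    long failureCount() {
        return failedCounter.sum();
    }
}
```

The point of the fallback is that a broken metric setup only costs visibility. Calls to `recordFailure()` still succeed, so the transformation path never has to check whether the metric exists.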
Override transformMqttToPubSub()
To transform the message, we override the transformMqttToPubSub method,
Retrieve the PublishPacket from the MqttToPubSubInput, and
Call newOutboundPubSubMessageBuilder() on the MqttToPubSubOutput object.
Next, we check whether there is a message payload. If the payload doesn’t exceed the maximum length that Google Cloud Pub/Sub allows, we set it to the data field of the OutboundPubSubMessageBuilder.
If the bytes in the payload exceed the maximum length, we log an error, increase the failed metric, and do not send the message by providing an empty list to the MqttToPubSubOutput.
If the payload size is fine, we add the Quality of Service (QoS), the Message Expiry Interval, and the Retained Flag to the OutboundPubSubMessageBuilder as attributes.
Finally, we set the desired destination topic and provide the built message to the MqttToPubSubOutput.
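To make the decision logic of these steps concrete, here is a minimal, self-contained sketch that does not use the extension's SDK. The `PubSubMessage` class, the attribute keys (`qos`, `retained`, `messageExpiryInterval`), and the 10,000,000-byte limit are assumptions made for illustration. In the real transformer, the same steps run against the PublishPacket and the OutboundPubSubMessageBuilder.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Stand-in sketch: none of these types belong to the HiveMQ customization SDK.
final class MqttToPubSubSketch {

    // Assumption: Pub/Sub caps message size at 10 MB; 10,000,000 bytes is used as the limit here.
    static final int MAX_PAYLOAD_BYTES = 10_000_000;

    // Hypothetical value object standing in for the built Pub/Sub message.
    static final class PubSubMessage {
        final String topic;
        final byte[] data;
        final Map<String, String> attributes;

        PubSubMessage(String topic, byte[] data, Map<String, String> attributes) {
            this.topic = topic;
            this.data = data;
            this.attributes = attributes;
        }
    }

    // Returns an empty Optional when the payload is too large, mirroring the "send nothing" case.
    static Optional<PubSubMessage> transform(
            byte[] payload, int qos, boolean retain, long expirySeconds, String pubSubTopic) {
        if (payload != null && payload.length > MAX_PAYLOAD_BYTES) {
            return Optional.empty();
        }
        // Pub/Sub attributes are string key/value pairs, so each MQTT property is stringified.
        Map<String, String> attributes = new LinkedHashMap<>();
        attributes.put("qos", Integer.toString(qos));
        attributes.put("retained", Boolean.toString(retain));
        attributes.put("messageExpiryInterval", Long.toString(expirySeconds));
        byte[] data = (payload == null) ? new byte[0] : payload.clone();
        return Optional.of(new PubSubMessage(pubSubTopic, data, attributes));
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        transform(payload, 1, true, 1000L, "my-pubsub-topic")
                .ifPresent(m -> System.out.println(m.topic + " " + m.attributes));
    }
}
```

An empty result corresponds to the oversized-payload branch, where the transformer logs an error, increments the failed metric, and hands an empty list to the output.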
Example Implementation
A complete implementation of the ExampleMqttToPubSubTransformer looks like this:
Now that we have a working transformer for messaging from HiveMQ to Google Cloud Pub/Sub, the next step is building a transformer for the opposite direction.
Pub/Sub to MQTT Transformer
Go to the src/main/java directory and create an ExamplePubSubToMqttTransformer class implementing the PubSubToMqttTransformer interface in the example.pubsub.customizations package.
Remember the package and the class name; we need them later in the configuration part of the extension.
Override init()
Again, we override the init() method to register a metric for every failed transformation in the same way we did in the MqttToPubSubTransformer.
Override transformPubSubToMqtt()
To transform the message, we override the transformPubSubToMqtt method,
Retrieve the InboundPubSubMessage from the PubSubToMqttInput, and
Call newPublishBuilder() on the PubSubToMqttOutput object.
Next, we convert Quality of Service (QoS), Message Expiry Interval, and Retained Flag from the Pub/Sub message attributes to the respective MQTT properties and set them to the PublishBuilder.
Then, we convert the Pub/Sub message data to an MQTT payload and set it to the PublishBuilder.
Finally, we set the desired destination topic and provide the built message to the PubSubToMqttOutput.
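The reverse conversion can be sketched the same way, again without the extension's SDK. The `MqttPublish` class and the attribute keys are hypothetical, and the fallback choices (QoS 0, no expiry, retain false when an attribute is missing or malformed) are one possible design rather than documented extension behavior.

```java
import java.util.Map;
import java.util.OptionalLong;

// Stand-in sketch: none of these types belong to the HiveMQ customization SDK.
final class PubSubToMqttSketch {

    // Hypothetical value object standing in for the built MQTT PUBLISH.
    static final class MqttPublish {
        final String topic;
        final byte[] payload;
        final int qos;
        final boolean retain;
        final OptionalLong expirySeconds;

        MqttPublish(String topic, byte[] payload, int qos, boolean retain, OptionalLong expirySeconds) {
            this.topic = topic;
            this.payload = payload;
            this.qos = qos;
            this.retain = retain;
            this.expirySeconds = expirySeconds;
        }
    }

    // Falls back to QoS 0 when the attribute is missing, malformed, or outside 0..2.
    static int parseQos(String raw) {
        if (raw == null) {
            return 0;
        }
        try {
            int qos = Integer.parseInt(raw.trim());
            return (qos >= 0 && qos <= 2) ? qos : 0;
        } catch (NumberFormatException e) {
            return 0;
        }
    }

    // Leaves the expiry unset when the attribute is missing, malformed, or negative.
    static OptionalLong parseExpiry(String raw) {
        if (raw == null) {
            return OptionalLong.empty();
        }
        try {
            long seconds = Long.parseLong(raw.trim());
            return seconds >= 0 ? OptionalLong.of(seconds) : OptionalLong.empty();
        } catch (NumberFormatException e) {
            return OptionalLong.empty();
        }
    }

    static MqttPublish transform(byte[] data, Map<String, String> attributes, String mqttTopic) {
        return new MqttPublish(
                mqttTopic,
                data == null ? new byte[0] : data.clone(),
                parseQos(attributes.get("qos")),
                Boolean.parseBoolean(attributes.get("retained")), // null parses to false
                parseExpiry(attributes.get("messageExpiryInterval")));
    }
}
```

Parsing defensively matters here because any publisher can write to the Pub/Sub topic, so the attributes are not guaranteed to come from the first transformer.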
Example Implementation
A complete implementation of the ExamplePubSubToMqttTransformer looks like this:
Once we have a working transformer for messaging from Google Cloud Pub/Sub to HiveMQ, we are ready to install our transformers.
Installation
Run the ./gradlew jar Gradle task to build the custom transformers.
Move the build/libs/example-pubsub-customization-4.9.0.jar file to the HIVEMQ_HOME/extensions/hivemq-google-cloud-pubsub-extension/customizations directory.
Adapt the google-cloud-pubsub-configuration.xml configuration.
Remove the mapping configurations.
Set the <google-cloud-project-id> in the <pubsub-connection> tag to match your Google Cloud Pub/Sub project ID.
Set the <file-path> in the <service-account> tag to the absolute or relative path of your service-account-key.json file.
Create an <mqtt-to-pubsub-transformer> and a <pubsub-to-mqtt-transformer> as shown in the example below.
Set the <pubsub-subscription> name in the <pubsub-to-mqtt-transformer> tag to match the correct Google Cloud Pub/Sub subscription.
Testing
Now that we have a running HiveMQ with the Google Cloud Pub/Sub extension that uses two transformers for messaging between HiveMQ and Google Cloud, it’s time to do some tests.
The open-source HiveMQ MQTT CLI tool included in the tools directory of our HiveMQ installation makes it easy to test our deployment. We can use the MQTT CLI to send an MQTT PUBLISH message with the topic “to/pubsub” and subscribe an MQTT client to a “from/pubsub” topic.
The PUBLISH message goes from HiveMQ to the Google Cloud Pub/Sub extension. In the extension, the message is processed by the transformer (m2p-t-1), converted to a Pub/Sub message, enriched with Pub/Sub attributes, and sent to the configured Pub/Sub topic (my-pubsub-topic). Google Cloud Pub/Sub then delivers the message to the subscription (my-pubsub-subscription).
Because we subscribe to this Pub/Sub subscription with the other transformer, the extension pulls the message from Google Cloud Pub/Sub. The message is processed by the transformer (p2m-t-1), and an MQTT PUBLISH message is created with the given MQTT properties and sent to the topic “from/pubsub”.
Follow these steps to achieve and verify this message flow:
1. Open a bash console such as Terminal, connect an MQTT client to HiveMQ, and subscribe to the topic from/pubsub.
2. Open another bash console and send an MQTT PUBLISH message with Quality of Service level 1, a retained flag, and a message expiry interval of 1000 to the “to/pubsub” topic:
3. Send another MQTT PUBLISH message with Quality of Service level 0, a retained flag, and a message expiry interval of 1000 to the “to/pubsub” topic:
4. Check the output of the subscribed MQTT client. This client should have received two PUBLISH messages, one with QoS 0 and one with QoS 1, both with the retained flag set to false and a message expiry interval of 1000. Retain is false because, unless you set the retain-as-published option in the SUBSCRIBE, the message loses that information in the standard publish flow.
5. Disconnect your subscriber, then reconnect and subscribe again. This should print the following output with retain set to true:
6. If you subscribe to “to/pubsub,” you should get the same output.
7. You can also check your retained messages in the HiveMQ Control Center. There should be one for the topic “to/pubsub” and the same for the topic “from/pubsub”.
Florian Limpöck has been a software engineer at HiveMQ since 2013. He is interested in distributed systems, the MQTT protocol, random numbers, and his dog, Karl, with whom he shares an apartment in Landshut, Germany, the birthplace of HiveMQ.