Sending Filtered MQTT Sparkplug Messages to MongoDB for IIoT Data Transformation: Part 2
In Part 1, Sending Filtered MQTT Sparkplug Messages to MongoDB for IIoT Data Transformation, we shared details about how to convert an MQTT Sparkplug message into JSON format using HiveMQ’s Data Hub, write custom logic to process the data for your use case, and fan out metrics to the original topic prefix with the metric name appended. We’ll now continue by showing you a method for passing arguments from the data policy to the transformation script, allowing us to filter out unnecessary Sparkplug metrics. Finally, we’ll use the HiveMQ MongoDB Extension to send the data to a MongoDB database.
Update the Data Policy
In this section, we’ll create an argument in the data policy that is passed through to the transformation script. You can do this either via the REST API or in the HiveMQ Control Center. For this example, we’ll use the HiveMQ Control Center.
As we mentioned earlier, the script requires an argument called allowList to be defined in Task 2 of the data policy, where the transformation script is invoked. Now, navigate to Data Hub -> scripts and select Data Policies. From here, click on “Add Argument” to define a new argument. Set the key to allowList, the value to machine, performance, and the Type to String. By configuring arguments in the data policy this way, you can customize the script’s behavior at runtime without changing the script itself.
Figure 1: Configure the argument allowList in the Control Center
The final result of the data policy is shown below:
Updated Data Policy
Now that we have configured an allowList of machine, performance, only metrics that have machine or performance as the prefix will be fanned out. All remaining metrics are filtered out and not published at all.
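To make the filtering rule concrete, here is a minimal Python sketch of the prefix check described above. It is not the transformation script itself (that runs inside Data Hub); the function names, the comma-separated format of the argument value, and the sample metrics are all assumptions made for illustration.

```python
# Illustrative sketch only: mirrors the allow-list rule described in the text.
# Assumption: the allowList argument arrives as one comma-separated string.

def parse_allow_list(raw: str) -> list[str]:
    """Split the argument value into trimmed, non-empty prefixes."""
    return [part.strip() for part in raw.split(",") if part.strip()]


def filter_metrics(metrics: list[dict], allow_list: list[str]) -> list[dict]:
    """Keep only metrics whose name starts with one of the allowed prefixes."""
    prefixes = tuple(allow_list)
    return [m for m in metrics if m.get("name", "").startswith(prefixes)]


# Hypothetical sample metrics, not taken from a real device.
sample_metrics = [
    {"name": "machine/voltage", "value": 229.5},
    {"name": "performance/oee", "value": 0.87},
    {"name": "environment/humidity", "value": 41},
]

allow_list = parse_allow_list("machine, performance")
kept = filter_metrics(sample_metrics, allow_list)
print([m["name"] for m in kept])
```

Note that a plain prefix check would also accept a name such as machinery/speed; if that matters for your metric naming scheme, compare against the prefix plus its separator instead.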
Integration with MongoDB
In this section, we’ll cover the final piece of the architecture that integrates the OT and IT environments. The data policy we created earlier deserializes the incoming Sparkplug (Protobuf) messages, filters the metrics according to an allow list, and serializes the result to JSON format. To stream data from the Broker into a MongoDB database, we’ll use the HiveMQ Enterprise Extension for MongoDB. Once the data is stored in MongoDB, you can query the data directly or build applications to surface real-time insights into your data. Remember that we are doing this because we can’t write Sparkplug (Protobuf) data directly into MongoDB — we need to transform the incoming IIoT data to JSON.
For our example, we are using MongoDB Atlas, the fully managed cloud database service, which keeps the setup and configuration of the Extension simple.
In Script 1 below, we show the XML configuration for the MongoDB Extension. To handle all Sparkplug messages, we use the following topic filter: <mqtt-topic-filter>spBv1.0/#</mqtt-topic-filter>.
Script 1: Configuration for the HiveMQ Extension for MongoDB
The MQTT messages are mapped through an Enterprise Extension processor, which defines how MQTT messages are converted into a MongoDB document.
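The topic filter spBv1.0/# used in the configuration relies on the MQTT multi-level wildcard (#), which matches the parent level and everything below it. The following Python sketch shows how such a filter selects topics. It is a simplified matcher written for illustration (it ignores the special handling of topics that start with $), not code from the Extension, and the example topics are hypothetical.

```python
# Illustrative sketch of MQTT topic-filter matching ('+' and '#' wildcards).
# Simplification: special rules for topics beginning with '$' are not handled.

def topic_matches(topic_filter: str, topic: str) -> bool:
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # '#' is only valid as the last level; it matches the parent
            # level and any number of child levels.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # Without a trailing '#', both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


# Hypothetical topics for demonstration.
print(topic_matches("spBv1.0/#", "spBv1.0/plant/DDATA/node1/device1"))
print(topic_matches("spBv1.0/#", "factory/telemetry"))
```

Because the fanned-out metrics are published under the original topic prefix, a filter of this shape also covers the per-metric topics.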
Script 2 below shows the document-template.json:
Script 2: MongoDB document-template.json configuration
Now that everything is set up, we can test our architecture. In a test, we use a Sparkplug device to send data to the Broker. Data Hub handles the data, converts it from Protobuf into JSON, filters and fans out the metrics, and finally streams the data to MongoDB. We use mongosh to execute a MongoDB query:
Script 3: MongoDB test using mongosh
Now that the data is stored as a document in MongoDB, we can use MongoDB aggregation pipelines to derive further insights. As you can see in the example query result, the machine data for voltage is shown with its values. As a next step, you could further optimize the document structure by removing empty or unused fields such as boolean_value, bytes_value, and int_value.
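As a rough illustration of that optimization, the Python sketch below drops a fixed set of unused fields from a metric document before it is used further. The sample document is hypothetical and only loosely modeled on a Sparkplug metric; the function name and the decision to remove fields by name (rather than by testing for empty values, which could discard a legitimate 0 or false) are assumptions.

```python
# Illustrative sketch: remove fields that are unused for a given metric type.
# Fields are dropped by name so that legitimate values such as 0 or False
# in other fields are never removed by accident.

UNUSED_FIELDS = {"boolean_value", "bytes_value", "int_value"}


def drop_fields(document: dict, unused: set[str]) -> dict:
    """Return a copy of the document without the named fields."""
    return {key: value for key, value in document.items() if key not in unused}


# Hypothetical metric document, not an actual query result.
metric_document = {
    "name": "machine/voltage",
    "double_value": 229.5,
    "boolean_value": False,
    "bytes_value": "",
    "int_value": 0,
}

print(drop_fields(metric_document, UNUSED_FIELDS))
```

The same idea could be applied earlier in the pipeline, for example in the transformation script or the document template, so that the unused fields are never written to MongoDB in the first place.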
Conclusion
Over two blog posts, we showed how to use the HiveMQ Broker, HiveMQ Data Hub, and the HiveMQ Enterprise Extension for MongoDB to convert Sparkplug data from an IIoT device into a JSON document stored in MongoDB. We built a Data Hub pipeline that ingests Sparkplug Protobuf messages, filters metrics according to a configurable allowList, and publishes the selected metrics in JSON format to individual topics (also known as fan-out). Finally, we streamed the Sparkplug data into a MongoDB database, which we can use to query specific insights, visualize aggregated metrics, or build real-time analytics applications that access the data. We hope you found this blog useful, and we would love to hear how it’s helped you with your use case.
If you have an interesting use case that you’d like us to write a blog about, please get in touch with us. To learn more, refer to our HiveMQ Data Hub documentation. If you’d like to try out this example in your environment, you can find all of the configurations in our GitHub Cookbook.
Stefan Frehse
Stefan Frehse is Senior Engineering Manager at HiveMQ. He earned a Ph.D. in Computer Science from the University of Bremen and has worked in software engineering and in C-level management positions for 10 years. He has written many academic papers and spoken on topics including formal verification of fault-tolerant systems and the debugging and synthesis of reversible logic.
Eric Henderson
Eric Henderson is a Senior Product Manager at HiveMQ, with extensive experience in both start-ups and big tech. Specializing in IoT, AI-driven software, speech recognition, and InsureTech, Eric excels in guiding products from inception to maturity, collaborating across teams to innovate and deliver solutions.