
Sending Filtered MQTT Sparkplug Messages to MongoDB for IIoT Data Transformation: Part 2

by Stefan Frehse, Eric Henderson
9 min read

In Part 1, Sending Filtered MQTT Sparkplug Messages to MongoDB for IIoT Data Transformation, we shared details about how to convert an MQTT Sparkplug message into JSON format using HiveMQ’s Data Hub, write custom logic to process the data for your use case, and fan out metrics to the original topic prefix with the metric name appended. We’ll now continue by showing you a method for passing arguments from the data policy to the transformation script, allowing us to filter out unnecessary Sparkplug metrics. Finally, we’ll use the HiveMQ MongoDB Extension to send the data to a MongoDB database. 

Update the Data Policy

In this section, we’ll create an argument in the data policy that is passed through to the transformation script. You can do this either via the REST API or the HiveMQ Control Center. For this example, we’ll use the HiveMQ Control Center.

As we mentioned earlier, the script requires an argument called allowList, defined in Task 2 of the data policy where the transformation script is invoked. Now, navigate to Data Hub → Data Policies in the Control Center. From here, click “Add Argument” to define a new argument. Set the key to allowList, the value to machine,performance, and the Type to String. By configuring arguments on the data policy in this way, you can customize the script’s behavior at runtime.

Figure 1: Configure the argument allowList in the Control Center
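
Inside the transformation script itself, these arguments are available at runtime on the context object that Data Hub passes to the transform function. The following minimal sketch (not the full fanout script from Part 1) shows how the allowList value could be read and split into a list of prefixes:

// Sketch only: reading the policy argument inside a Data Hub transformation script.
// context.arguments holds the key/value pairs configured on the data policy.
function transform(publish, context) {
  // "machine,performance" -> ["machine", "performance"]
  const allowList = (context.arguments.allowList || "").split(",");

  // ... use allowList to decide which Sparkplug metrics to fan out,
  //     as implemented in the script from Part 1 ...
  return publish;
}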

The final result of the data policy is shown below:

{
  "id" : "protobuf-to-json-and-fanout",
  "matching" : {
    "topicFilter" : "spBv1.0/+/DDATA/+/+"
  },
  "validation" : {
    "validators" : [ ]
  },
  "onSuccess" : {
    "pipeline" : [ {
      "id" : "operation-ErVR0",
      "functionId" : "Serdes.deserialize",
      "arguments" : {
        "schemaVersion" : "latest",
        "schemaId" : "sparkplug-format"
      }
    }, {
      "id" : "operation-QzwYJ",
      "functionId" : "fn:fanout:latest",
      "arguments" : {
        "allowList" : "machine,performace"
      },
      "onBranch" : [ {
        "branchId" : "metrics",
        "pipeline" : [ {
          "id" : "branch-serialize-a4BAC",
          "functionId" : "Serdes.serialize",
          "arguments" : {
            "schemaVersion" : "latest",
            "schemaId" : "simple-json"
          }
        } ]
      } ]
    }, {
      "id" : "operation-4XR5d",
      "functionId" : "Serdes.serialize",
      "arguments" : {
        "schemaVersion" : "latest",
        "schemaId" : "simple-json"
      }
    } ]
  },
  "onFailure" : {
    "pipeline" : [ ]
  }
}

Updated Data Policy 

Now that we have configured an allowList of machine,performance, only metrics whose names start with machine or performance are fanned out. All other metrics are filtered out and never published.
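
To make the filter semantics concrete, here is a small, hypothetical helper (the metric name ambient:humidity is made up for illustration) that mirrors the prefix check the script performs:

// Hypothetical helper mirroring the allowList check: a metric is kept only if
// its name starts with one of the configured prefixes.
function isAllowed(metricName, allowList) {
  return allowList.some((prefix) => metricName.startsWith(prefix));
}

isAllowed("machine:voltage", ["machine", "performance"]);  // true  -> fanned out
isAllowed("ambient:humidity", ["machine", "performance"]); // false -> dropped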

Integration with MongoDB

In this section, we’ll cover the final piece of the architecture, which integrates the OT and IT environments. The data policy we created earlier deserializes the incoming Sparkplug (Protobuf) messages, filters the metrics according to an allow list, and serializes the result to JSON. To stream data from the Broker into a MongoDB database, we’ll use the HiveMQ Enterprise Extension for MongoDB. Once the data is stored in MongoDB, you can query it directly or build applications that surface real-time insights. Remember, we take this approach because we can’t write Sparkplug (Protobuf) data directly into MongoDB; we first need to transform the incoming IIoT data to JSON.

For our example, we use MongoDB Atlas, the fully managed cloud database service, which makes setting up and configuring the Extension straightforward.

Script 1 below shows the XML configuration for the MongoDB Extension. To handle all Sparkplug messages, we use the topic filter <mqtt-topic-filter>spBv1.0/#</mqtt-topic-filter>.

<hivemq-mongodb-extension xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                          xsi:noNamespaceSchemaLocation="config.xsd">
    <mongodbs>
        <mongodb>
            <id>my-mongodb-id</id>
            <connection-string>mongodb+srv://mongodb:<password>@atlascluster.<id>.mongodb.net</connection-string>
        </mongodb>
    </mongodbs>
    <mqtt-to-mongodb-routes>
        <mqtt-to-mongodb-route>
            <id>my-mqtt-to-mongodb-route</id>
            <mongodb-id>my-mongodb-id</mongodb-id>
            <mqtt-topic-filters>
                <mqtt-topic-filter>spBv1.0/#</mqtt-topic-filter>
            </mqtt-topic-filters>
            <collection>production-lines</collection>
            <database>iot-data</database>
            <processor>
                <document-template>conf/document-template.json</document-template>
            </processor>
        </mqtt-to-mongodb-route>
    </mqtt-to-mongodb-routes>
</hivemq-mongodb-extension>

Script 1: Configuration for the HiveMQ Extension for MongoDB

MQTT messages are mapped through an Enterprise Extension processor, which defines how each message is converted into a MongoDB document.

Script 2 below shows the document-template.json:

{
  "topic": "${mqtt-topic}",
  "payload_utf8": ${mqtt-payload-utf8},
  "qos": "${mqtt-qos}",
  "retain": ${mqtt-retain},
  "packet_id": ${mqtt-packet-id},
  "payload_format_indicator": "${mqtt-payload-format-indicator}",
  "response_topic": "${mqtt-response-topic}",
  "correlation_data_utf8": "${mqtt-correlation-data-utf8}",
  "user_properties": ${mqtt-user-properties-json},
  "my_user_property": "${mqtt-user-properties$myUserPropertyName}",
  "arrival_timestamp": ${timestamp-ms}
}

Script 2: MongoDB document-template.json configuration

Now that everything is set up, we can test our architecture. For the test, we use a Sparkplug device to send data to the Broker. Data Hub converts the incoming data from Protobuf to JSON, filters and fans out the metrics, and finally streams the data to MongoDB. We use mongosh to execute a MongoDB query:

Atlas atlas-<id>-shard-0 [primary] iot-data> db["production-lines"].findOne();
{
  _id: ObjectId('66bb372344a5bc7271e06d04'),
  topic: 'spBv1.0/FactoryA/DDATA/ProductionLine01/CuttingMachine/machine:voltage',
  payload_utf8: {
    metadata: {
      is_multi_part: 'false',
      seq: '0',
      md5: ''
    },
    string_value: '',
    is_null: 'false',
    boolean_value: 'false',
    dataset_value: { num_of_columns: '0' },
    is_transient: 'false',
    bytes_value: '',
    double_value: '34.25211479830568',
    int_value: '0',
    is_historical: 'false',
    datatype: '0',
    name: 'machine:voltage',
    extension_value: {},
    alias: '0',
    float_value: '0.0',
    long_value: '0',
    properties: {},
    template_value: { template_ref: '', is_definition: 'false', version: '' },
    timestamp: '1723545379534'
  },
  qos: 'AT_MOST_ONCE',
  retain: false,
  packet_id: 0,
  payload_format_indicator: '',
  response_topic: '',
  correlation_data_utf8: '',
  user_properties: [],
  my_user_property: '',
  arrival_timestamp: Long('1723545379564')
}

Script 3: MongoDB test using mongosh

Now that the data is stored as documents in MongoDB, we can use MongoDB aggregation pipelines to derive further insights. As you can see in the example query result, the machine data for voltage is shown with its values. As a next step, you could further optimize the document structure by removing empty or unused fields such as boolean_value, bytes_value, and int_value.
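
As a starting point, the following mongosh sketch (assuming the document shape shown in Script 3, where numeric Sparkplug values arrive as strings) averages the voltage readings per topic:

// Sketch: average the machine:voltage readings per topic in the iot-data database.
// double_value is stored as a string in our documents, hence $toDouble.
db["production-lines"].aggregate([
  { $match: { topic: /machine:voltage$/ } },
  {
    $group: {
      _id: "$topic",
      avgVoltage: { $avg: { $toDouble: "$payload_utf8.double_value" } },
      samples: { $sum: 1 }
    }
  }
]);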

Conclusion

Over two blog posts, we showed how to use the HiveMQ Broker, HiveMQ Data Hub, and the HiveMQ Enterprise Extension for MongoDB to convert Sparkplug data from an IIoT device into JSON documents stored in MongoDB. We built a Data Hub pipeline that ingests Sparkplug Protobuf messages, filters metrics according to a configurable allowList, and publishes individual metrics as JSON to their own topics (the fan-out). Finally, we streamed the data into a MongoDB database, where we can query for specific insights, visualize aggregated metrics, or build real-time analytics applications on top of the data. We hope you found this blog useful, and we would love to hear how it’s helped you with your use case.

If you have an interesting use case that you’d like us to write a blog about, please get in touch with us. To learn more, refer to our HiveMQ Data Hub documentation. If you’d like to try out this example in your environment, you can find all of the configurations in our GitHub Cookbook.

Stefan Frehse

Stefan Frehse is a Senior Engineering Manager at HiveMQ. He earned a Ph.D. in Computer Science from the University of Bremen and has worked in software engineering and C-level management positions for 10 years. He has written many academic papers and spoken on topics including formal verification of fault-tolerant systems, debugging, and synthesis of reversible logic.

  • Contact Stefan Frehse via e-mail

Eric Henderson

Eric Henderson is a Senior Product Manager at HiveMQ, with extensive experience in both start-ups and big tech. Specializing in IoT, AI-driven software, speech recognition, and InsureTech, Eric excels in guiding products from inception to maturity, collaborating across teams to innovate and deliver solutions.

  • Eric Henderson on LinkedIn
  • Contact Eric Henderson via e-mail