Modify Your MQTT Data In-Flight with Data Transformation
Typical IoT deployments have diverse data sets, either because of the variety of devices or because these devices continuously gain new capabilities. For instance, IoT devices might transmit data in different formats (Protobuf vs. JSON), units (metric vs. imperial), or even date formats. Organizations need a way to adjust or transform this data into the format they need.
Data transformation means modifying or converting data, in this case from IoT devices, into a predefined format before it is forwarded or stored in a database, often to improve its compatibility or usability for downstream processes. For instance, imagine a U.S. pharmaceutical company that imports equipment from Europe that reports temperatures in Celsius instead of Fahrenheit. The company cannot modify the machine’s firmware because of FDA restrictions. In this situation, the company can write custom transformation scripts that convert the values from Celsius to Fahrenheit, ensuring seamless integration into its operations while complying with regulatory restrictions.
In this article, we discuss stateless data transformations, a recently added feature in HiveMQ’s Data Hub. The feature was designed to streamline data transformation for our customers and shorten their product development cycles. Transformation scripts can be uploaded directly to the broker without involving an operations team. As a result, IT and OT engineering teams can iterate on applications more quickly and achieve better results in fewer cycles.
Scripting
The HiveMQ Platform Release 4.23 provides an Early Access (EA) version of Data Hub’s data transformation functionality. In this release, we introduce JavaScript as the language for developing stateless transformation functions. JavaScript is one of the most widely adopted languages worldwide; it lends itself to iterating on applications quickly while offering well-established engineering practices. It is a dynamically typed language with a rich set of built-in functions that serves countless use cases.
Example: Unit Conversions
We want to introduce Data Hub’s Data Transformation capabilities in practice by working through a simple example. In this example, a device sends a JSON payload with two numeric fields, fahrenheit and timestamp.
The device reports its temperature value in Fahrenheit. However, in our simple example, the consuming application, such as a fan, expects the temperature in Celsius. The following formula converts Fahrenheit into Celsius:
C = 5/9 × (F - 32)
where C is the temperature in Celsius and F is the temperature in Fahrenheit. The next step is to translate this formula into a JavaScript function.
The function convert applies the formula introduced above to the provided argument. The converted value is rounded down using the JavaScript function Math.floor, and the resulting Celsius value is returned.
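A minimal sketch of such a conversion function, following the description above; the parameter name fahrenheit is an assumption chosen for illustration:

```javascript
// Converts a temperature from Fahrenheit to Celsius using C = 5/9 × (F - 32).
// Math.floor rounds down, so 100 °F (about 37.78 °C) becomes 37.
function convert(fahrenheit) {
  return Math.floor((fahrenheit - 32) * 5 / 9);
}
```

Note that Math.floor always rounds toward negative infinity. If rounding to the nearest integer is preferred, Math.round could be used instead.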
As a next step, the entry function for the data transformation is defined. In MQTT, payloads are sent in PUBLISH packets, which carry the relevant data; in our case, the data point from a temperature sensor. For each of these packets, the transformation function is called. Data Hub’s policy engine passes the MQTT packet to the transformation function together with additional context information. The packet consists of the following fields:
| Field | Type | Description |
|---|---|---|
| payload | JSON Object | The deserialized MQTT payload |
| QoS | Number | The original QoS (0, 1, or 2) |
| topic | String | The MQTT topic |
| userProperties | Array of JSON objects { name, value } | The MQTT User Properties |
The JavaScript function transform takes two arguments, publish and context, as described above. The context object isn’t used in this function. For more information, please read HiveMQ Data Hub Transformations.
The function creates a new payload, a JSON object containing the fields celsius and timestamp. The convert function introduced earlier is called with the fahrenheit field of the incoming packet, and the result is assigned to the celsius field.
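Putting the two pieces together, the script could look roughly like the sketch below. It follows the description above; that the function returns the modified publish object is an assumption, so consult the Data Hub documentation for the exact contract.

```javascript
// Helper from the conversion step: Fahrenheit to Celsius, rounded down.
function convert(fahrenheit) {
  return Math.floor((fahrenheit - 32) * 5 / 9);
}

// Entry point, called once for each MQTT publish packet.
// Assumption: the modified publish object is handed back to the policy engine.
function transform(publish, context) {
  // Replace the payload with a new object holding the converted value.
  publish.payload = {
    celsius: convert(publish.payload.fahrenheit),
    timestamp: publish.payload.timestamp,
  };
  return publish;
}
```

To try this outside the broker, the function can be called with a hand-built packet such as { topic: "device/1", payload: { fahrenheit: 212, timestamp: 1700000000 } } and an empty context object; the topic and values here are made up for illustration.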
The complete source code is stored in the file script.js.
Data Policy
Data policies in HiveMQ Data Hub offer a declarative way to describe data validation and data transformation pipelines for MQTT payloads. Starting with the data validation step, MQTT payloads may be validated against a pre-defined schema. For a more detailed introduction, please read the article Getting Started with MQTT Data Validation Using HiveMQ Data Hub by Kudzai Manditereza.
In a data policy pipeline, further processing steps can be defined as functions. Basic built-in functions are available, such as logging a message, adding user properties, dropping messages, or even redirecting payloads to a different topic. In addition, users can define their own customized scripts using JavaScript as introduced above.
Upload
There is one prerequisite to making use of transformation functions in data policies: the source file of the function needs to be uploaded to the HiveMQ Broker. Today, there are two options available:
- the REST API
- the mqtt-cli
In the following, we will make use of the mqtt-cli.
The upload command sends the file script.js with type transformation and identifier fahrenheit-to-celsius to the broker. Updating a function works with the same command. The broker versions functions automatically. The latest version of a function can be fetched with the mqtt-cli as well; in its output, the source field contains the source code encoded as base64.
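To inspect the script returned in the source field, the base64 value can be decoded locally. The following is a small Node.js sketch; decodeSource and sampleSource are hypothetical names, and the sample value is generated on the spot rather than taken from a real broker response.

```javascript
// Decodes a base64-encoded script source back into readable text (Node.js).
function decodeSource(base64Source) {
  return Buffer.from(base64Source, "base64").toString("utf8");
}

// Hypothetical sample value; a real response would carry the uploaded script.
const sampleSource = Buffer.from(
  "function transform(publish, context) { return publish; }",
  "utf8"
).toString("base64");

console.log(decodeSource(sampleSource));
```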
Apply the function
Once the script has been uploaded to the HiveMQ Broker, it can be used in a data policy. The following example requires two schemas: 1) the schema that describes the data sent by the factory device, which contains Fahrenheit values, and 2) the final schema that describes the data after the transformation function has been applied, which contains Celsius values.
The schema with id schema-from-sensor describes a JSON object containing two fields, fahrenheit and timestamp, both typed as numbers. This schema is used for incoming data from the device.
The schema with id schema-for-fan is almost identical to the previous schema but contains the celsius field instead of fahrenheit.
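As an illustration, the two schemas described above could be expressed as JSON Schema documents along the following lines, written here as JavaScript objects. Marking both fields as required is an assumption; the variable names are hypothetical.

```javascript
// JSON Schema for the incoming sensor data (registered as schema-from-sensor).
const schemaFromSensor = {
  type: "object",
  properties: {
    fahrenheit: { type: "number" },
    timestamp: { type: "number" },
  },
  required: ["fahrenheit", "timestamp"], // assumption: both fields are mandatory
};

// JSON Schema for the transformed data (registered as schema-for-fan):
// same shape, with celsius in place of fahrenheit.
const schemaForFan = {
  type: "object",
  properties: {
    celsius: { type: "number" },
    timestamp: { type: "number" },
  },
  required: ["celsius", "timestamp"], // assumption: both fields are mandatory
};

// Serialized form, as it could be stored in a .json schema file.
console.log(JSON.stringify(schemaFromSensor, null, 2));
```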
Next, we define the data policy that ties together the steps defined above.
The data policy executes the following steps:
- For all incoming data on topics matching device/#, JSON schema validation against the schema schema-from-sensor is executed.
- onSuccess: If the data is valid according to the schema, i.e., it contains fahrenheit and timestamp as numbers in JSON format, the onSuccess pipeline is executed. The pipeline consists of three functions:
  1. Serdes.deserialize, which deserializes the data according to the schema and makes the MQTT payload accessible within the transformation function.
  2. The transformation script referenced by fn:fahrenheit-to-celsius:latest is executed, and its result is passed to the next stage of the pipeline. The latest tag refers to the latest version of the function. Fixed versions can be referenced here too.
  3. Finally, the transformed data is serialized using the function Serdes.serialize and the schema schema-for-fan.
- onFailure: If the incoming data is not valid according to the schema, the onFailure pipeline is executed. It uses the Mqtt.drop function, which drops the MQTT message so that it does not reach any consumer.
Once the data policy is created, the animation demonstrates the outcome. The upper terminal shows the incoming data being published with the Fahrenheit temperature. The console below shows the transformed data, converted into Celsius.
Conclusion
In summary, dealing with diverse data from IoT devices is a common challenge for organizations. The need to convert this data into a standardized format is crucial, especially with evolving device capabilities. This is where the latest Data Transformation feature helps (introduced in the HiveMQ Platform version 4.23).
This feature simplifies the data transformation process by making it easy to upload scripts written in JavaScript to the broker, and it simplifies operations because HiveMQ Data Hub fully automates script execution. This speeds up development cycles, enabling IT and OT engineering teams to quickly iterate on applications and achieve better results.
Stefan Frehse
Stefan Frehse is Senior Engineering Manager at HiveMQ. He earned a Ph.D. in Computer Science from the University of Bremen and has worked in software engineering and in C-level management positions for 10 years. He has written many academic papers and spoken on topics including formal verification of fault-tolerant systems and debugging and synthesis of reversible logic.