
How to Stream Machine Data into TimescaleDB Using MQTT

by Kudzai Manditereza
8 min read

In modern industrial settings, real-time insights into machine performance are critical. You often need to capture production metrics such as downtimes, production cycles, and defects, then store them for analytics, alerting, and historical reporting. This tutorial provides a step-by-step demonstration of how to set up a pipeline that streams machine data through MQTT, via a HiveMQ broker, to TimescaleDB—a time-series database built on PostgreSQL.

Architecture Overview

Let’s examine a high-level view of how data flows through our pipeline. First, a .NET Core application simulates various machine events—such as downtime, production cycles, and defects—and publishes them as MQTT messages to specific topics on a local HiveMQ Broker.

Architecture for streaming machine data into TimescaleDB using MQTT

Next, the HiveMQ Broker—enhanced with the HiveMQ PostgreSQL extension—listens for these MQTT messages on the relevant topics. As soon as a new message arrives, the extension automatically executes SQL statements to insert the data into TimescaleDB.

Finally, TimescaleDB stores all incoming data in dedicated tables, providing an efficient and scalable repository for time-series machine data.
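
In compact form, the pipeline looks like this:

.NET Core machine simulator
        | MQTT publish (downtime, cycle, and defect events)
        v
HiveMQ Broker + PostgreSQL extension
        | SQL INSERT per incoming message
        v
TimescaleDB (plantdb: machine_downtimes, production_cycles, production_defects)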

Setting Up TimescaleDB

First, create a new database on your TimescaleDB server named plantdb. Within this database, you’ll set up three tables—machine_downtimes, production_cycles, and production_defects—with columns tailored to capture the relevant machine data.

machine_downtimes table

production_cycles table

production_defects table
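
If you prefer to create the tables in SQL rather than through a GUI, here is a minimal sketch for machine_downtimes; the column names and types are inferred from the INSERT statements shown later in this tutorial, so adjust them to match your setup. The create_hypertable call turns the plain table into a TimescaleDB hypertable partitioned on the timestamp column; production_cycles and production_defects follow the same pattern.

-- (if needed) CREATE EXTENSION IF NOT EXISTS timescaledb;

-- Minimal sketch: columns and types inferred from the INSERT
-- statements later in this post; adjust to your needs.
-- "timestamp" is quoted because it is also a SQL type name.
CREATE TABLE machine_downtimes (
    "timestamp"    timestamptz NOT NULL,
    machine_id     text,
    machine_name   text,
    downtime_start timestamp,
    downtime_end   timestamp,
    downtime_type  text,
    reason_code    text,
    description    text,
    shift_id       text,
    operator_id    text,
    site           text,
    area           text,
    line           text
);

-- Convert the table into a TimescaleDB hypertable,
-- partitioned on the timestamp column.
SELECT create_hypertable('machine_downtimes', 'timestamp');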

At this point, these tables are empty, ready to receive incoming records from your data pipeline.

Setting Up the MQTT Broker

To set up the MQTT broker, download the HiveMQ Enterprise MQTT Broker, which includes the PostgreSQL extension; the extension runs in trial mode for five hours at a time, after which the broker must be restarted to continue. Unpack the download into a local directory of your choice, then start the broker.

By default, HiveMQ listens on port 1883; you can verify that the broker is running by visiting its admin interface at port 8080 in your browser.

HiveMQ Control Center

When you navigate to the Installed Integrations section, you’ll see that there are currently no active integrations.

Publishing Machine Data to the MQTT Broker

Before configuring the PostgreSQL extension, let’s first run the machine data simulator. To observe the MQTT messages it generates, open MQTT Explorer and connect to the HiveMQ MQTT Broker at localhost on port 1883.

You’ll see that the simulator publishes:

  • Downtime events under the DowntimeData topic

  • Production cycles under the CycleData topic

  • Production defects under the DefectData topic

All of this data is in JSON format; a downtime event, for example, might carry the fields shown in the sketch below. Our goal is to persist these messages in our TimescaleDB database, so the next step is to configure and enable the PostgreSQL extension to handle these incoming data streams.
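
For reference, a single downtime message might look like the following. The field names mirror the machine_downtimes columns used later in this tutorial; the values here are purely illustrative.

{
  "timestamp": "2025-01-15T08:32:10Z",
  "machine_id": "MC-101",
  "machine_name": "Filling Machine 1",
  "downtime_start": "2025-01-15 08:05:00",
  "downtime_end": "2025-01-15 08:27:00",
  "downtime_type": "Unplanned",
  "reason_code": "E-STOP",
  "description": "Emergency stop triggered by operator",
  "shift_id": "Shift-A",
  "operator_id": "OP-17",
  "site": "Site-1",
  "area": "Packaging",
  "line": "Line-3"
}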

Streaming MQTT Data to TimescaleDB

To set up the PostgreSQL extension for data streaming, navigate to your HiveMQ Broker directory and open the extensions folder. Inside, locate the hivemq-postgresql-extension folder, then open the conf directory. Here, you’ll place your config.xml file (you can copy the sample configuration from the examples folder).

Next, open config.xml in your preferred editor (e.g., Visual Studio Code). 

Begin by adding the PostgreSQL connection details—hostname, port, database name, and user credentials. Once the connection information is set, create your MQTT-to-Postgres “routes.”

In this example, there are three routes, corresponding to the DowntimeData, CycleData, and DefectData topics; a configuration sketch follows the route descriptions below.


  • DowntimeData Route

    • MQTT Topic Filter: Points to the topic publishing downtime data.

    • Processor: References the insert_downtime_data SQL file containing the insertion query for TimescaleDB.

  • CycleData Route

    • MQTT Topic Filter: Points to the topic publishing cycle data.

    • Processor: References the insert_cycle_data SQL file.

  • DefectData Route

    • MQTT Topic Filter: Points to the topic publishing defect data.

    • Processor: References the insert_defect_data SQL file.
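
Putting this together, the config.xml might look roughly like the sketch below. The element names here are illustrative assumptions; copy the exact structure from the sample configuration in the extension’s examples folder, and substitute your own connection details. Only the DowntimeData route is shown in full, since the other two follow the same pattern.

<!-- Illustrative sketch only: take the exact element names from the
     sample config in the extension's examples folder. -->
<hivemq-postgresql-extension>
    <postgresqls>
        <postgresql>
            <id>timescaledb</id>
            <host>localhost</host>
            <port>5432</port>
            <database>plantdb</database>
            <username>postgres</username>
            <password>your-password</password>
        </postgresql>
    </postgresqls>
    <mqtt-to-postgresql-routes>
        <mqtt-to-postgresql-route>
            <id>downtime-route</id>
            <postgresql-id>timescaledb</postgresql-id>
            <mqtt-topic-filters>
                <mqtt-topic-filter>DowntimeData</mqtt-topic-filter>
            </mqtt-topic-filters>
            <processor>
                <statement-template>insert_downtime_data.sql</statement-template>
            </processor>
        </mqtt-to-postgresql-route>
        <!-- The cycle and defect routes are defined the same way, pointing
             at CycleData/insert_cycle_data.sql and
             DefectData/insert_defect_data.sql respectively. -->
    </mqtt-to-postgresql-routes>
</hivemq-postgresql-extension>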

Next, let’s look at the queries referenced in our PostgreSQL extension config file. Each SQL file contains a standard INSERT statement that maps JSON properties from the MQTT message to the appropriate columns in TimescaleDB, applying any necessary type casts. The ${mqtt-payload-utf8} placeholder is substituted by the extension with the UTF-8 decoded payload of the incoming message before the statement runs.

When a new message arrives on one of the specified topics, the matching query executes automatically, inserting the data into TimescaleDB.

INSERT INTO machine_downtimes (timestamp, machine_id, machine_name, downtime_start, downtime_end, downtime_type, reason_code, description, shift_id, operator_id, site, area, line)
SELECT
    (json_data->>'timestamp')::timestamptz AS timestamp,
    (json_data->>'machine_id')::text AS machine_id,
    (json_data->>'machine_name')::text AS machine_name,
    (json_data->>'downtime_start')::timestamp AS downtime_start,
    (json_data->>'downtime_end')::timestamp AS downtime_end,
    (json_data->>'downtime_type')::text AS downtime_type,
    (json_data->>'reason_code')::text AS reason_code,
    (json_data->>'description')::text AS description,
    (json_data->>'shift_id')::text AS shift_id,
    (json_data->>'operator_id')::text AS operator_id,
    (json_data->>'site')::text AS site,
    (json_data->>'area')::text AS area,
    (json_data->>'line')::text AS line
FROM (
    VALUES
        (${mqtt-payload-utf8}::jsonb)
) AS input(json_data);

insert_downtime_data.sql

INSERT INTO production_cycles (timestamp, machine_id, machine_name, part_id, start_time, end_time, ideal_cycle_time_seconds, units, shift_id, operator_id, temperature_c, energy_consumption_kwh, site, area, line)
SELECT
    (json_data->>'timestamp')::timestamptz AS timestamp,
    (json_data->>'machine_id')::text AS machine_id,
    (json_data->>'machine_name')::text AS machine_name,
    (json_data->>'part_id')::text AS part_id,
    (json_data->>'start_time')::text AS start_time,
    (json_data->>'end_time')::text AS end_time,
    (json_data->>'ideal_cycle_time_seconds')::int AS ideal_cycle_time_seconds,
    (json_data->>'units')::int AS units,
    (json_data->>'shift_id')::text AS shift_id,
    (json_data->>'operator_id')::text AS operator_id,
    (json_data->>'temperature_c')::decimal AS temperature_c,
    (json_data->>'energy_consumption_kwh')::decimal AS energy_consumption_kwh,
    (json_data->>'site')::text AS site,
    (json_data->>'area')::text AS area,
    (json_data->>'line')::text AS line
FROM (
    VALUES
        (${mqtt-payload-utf8}::jsonb)
) AS input(json_data);

insert_cycle_data.sql

INSERT INTO production_defects (timestamp, machine_id, machine_name, defect_test_time, category, defect_type, quantity, description, detection_method, severity, status, resolved_by, resolution_action, resolution_time, shift_id, operator_id, site, area, line)
SELECT
    (json_data->>'timestamp')::timestamptz AS timestamp,
    (json_data->>'machine_id')::text AS machine_id,
    (json_data->>'machine_name')::text AS machine_name,
    (json_data->>'defect_test_time')::text AS defect_test_time,
    (json_data->>'category')::text AS category,
    (json_data->>'defect_type')::text AS defect_type,
    (json_data->>'quantity')::int AS quantity,
    (json_data->>'description')::text AS description,
    (json_data->>'detection_method')::text AS detection_method,
    (json_data->>'severity')::text AS severity,
    (json_data->>'status')::text AS status,
    (json_data->>'resolved_by')::text AS resolved_by,
    (json_data->>'resolution_action')::text AS resolution_action,
    (json_data->>'resolution_time')::text AS resolution_time,
    (json_data->>'shift_id')::text AS shift_id,
    (json_data->>'operator_id')::text AS operator_id,
    (json_data->>'site')::text AS site,
    (json_data->>'area')::text AS area,
    (json_data->>'line')::text AS line
FROM (
    VALUES
        (${mqtt-payload-utf8}::jsonb)
) AS input(json_data);

insert_defect_data.sql

To activate this extension, simply remove the DISABLED file in the extension directory. Once done, head over to the broker’s admin interface—you should see the PostgreSQL extension listed as active, and your data will start streaming into TimescaleDB.

PostgreSQL extension

Verifying the Data in TimescaleDB

Finally, return to TimescaleDB and run a query on your tables to confirm that data is flowing in. You should see new records in the machine_downtimes, production_cycles, and production_defects tables. 
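
For example, a couple of quick checks against the downtime table (the other two tables can be queried the same way):

-- Ten most recent downtime events
SELECT * FROM machine_downtimes ORDER BY "timestamp" DESC LIMIT 10;

-- Confirm that all three tables are receiving rows
SELECT count(*) FROM machine_downtimes;
SELECT count(*) FROM production_cycles;
SELECT count(*) FROM production_defects;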

From here, you can connect your favorite data visualization or analytics tool to TimescaleDB to explore and gain insights from this streaming machine data.


    Kudzai Manditereza

    Kudzai is a tech influencer and electronic engineer based in Germany. As a Sr. Industry Solutions Advocate at HiveMQ, he helps developers and architects adopt MQTT and HiveMQ for their IIoT projects. Kudzai runs a popular YouTube channel focused on IIoT and Smart Manufacturing technologies and he has been recognized as one of the Top 100 global influencers talking about Industry 4.0 online.

    • Kudzai Manditereza on LinkedIn
    • Contact Kudzai Manditereza via e-mail