How to Stream Machine Data into TimescaleDB Using MQTT
In modern industrial settings, real-time insights into machine performance are critical. You often need to capture production metrics such as downtimes, production cycles, and defects, then store them for analytics, alerting, and historical reporting. This tutorial provides a step-by-step demonstration of how to set up a pipeline that streams machine data through MQTT, via a HiveMQ broker, to TimescaleDB—a time-series database built on PostgreSQL.
Architecture Overview
Let’s examine a high-level view of how data flows through our pipeline. First, a .NET Core application simulates various machine events—such as downtime, production cycles, and defects—and publishes them as MQTT messages to specific topics on a local HiveMQ Broker.
Next, the HiveMQ Broker—enhanced with the HiveMQ PostgreSQL extension—listens for these MQTT messages on the relevant topics. As soon as a new message arrives, the extension automatically executes SQL statements to insert the data into TimescaleDB.
Finally, TimescaleDB stores all incoming data in dedicated tables, providing an efficient and scalable repository for time-series machine data.
Setting Up TimescaleDB
First, create a new database on your TimescaleDB server named plantdb. Within this database, you'll set up three tables (machine_downtimes, production_cycles, and production_defects) with columns tailored to capture the relevant machine data; a SQL sketch for creating them follows the table overviews below.
machine_downtimes table
production_cycles table
production_defects table
At this point, these tables are empty, ready to receive incoming records from your data pipeline.
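If you prefer to script the schema rather than create it through a GUI, here is a minimal sketch for machine_downtimes. The column names and types are inferred from the INSERT statements shown later in this tutorial, so adjust them to match your own schema; the optional create_hypertable call turns the plain table into a TimescaleDB hypertable partitioned on the timestamp column.

-- Minimal sketch: columns inferred from the INSERT statements later in this tutorial.
CREATE TABLE machine_downtimes (
    timestamp      TIMESTAMPTZ NOT NULL,
    machine_id     TEXT,
    machine_name   TEXT,
    downtime_start TIMESTAMP,
    downtime_end   TIMESTAMP,
    downtime_type  TEXT,
    reason_code    TEXT,
    description    TEXT,
    shift_id       TEXT,
    operator_id    TEXT,
    site           TEXT,
    area           TEXT,
    line           TEXT
);

-- Optional: convert the table into a TimescaleDB hypertable partitioned on timestamp.
SELECT create_hypertable('machine_downtimes', 'timestamp');

The production_cycles and production_defects tables follow the same pattern, with the columns listed in their respective INSERT statements.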
Setting Up the MQTT Broker
To set up the MQTT broker, download the HiveMQ Enterprise MQTT Broker, which includes the HiveMQ Enterprise Extension for PostgreSQL; in trial mode, the extension runs for five hours at a time before the broker must be restarted. Extract the download to a local directory of your choice, then start the broker (on Linux or macOS, run bin/run.sh from the installation directory; on Windows, use bin/run.bat).
By default, HiveMQ listens for MQTT connections on port 1883; you can verify that the broker is running by opening its admin interface, the HiveMQ Control Center, at http://localhost:8080 in your browser.
When you navigate to the Installed Integrations section, you’ll see that there are currently no active integrations.
Publishing Machine Data to the MQTT Broker
Before configuring the PostgreSQL extension, let's run the machine data simulator. To observe the MQTT messages it generates, open MQTT Explorer and connect it to the HiveMQ MQTT Broker at localhost on port 1883.
You’ll see that the simulator publishes:
Downtime events under the DowntimeData topic
Production cycles under the CycleData topic
Production defects under the DefectData topic
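Each of these messages is a JSON document. For illustration, a downtime event might look like the following; the field values here are hypothetical, but the field names match the JSON properties that the SQL files later in this tutorial map to table columns.

{
  "timestamp": "2025-01-15T09:32:11Z",
  "machine_id": "M-101",
  "machine_name": "CNC Mill 1",
  "downtime_start": "2025-01-15T09:25:00",
  "downtime_end": "2025-01-15T09:31:45",
  "downtime_type": "Unplanned",
  "reason_code": "E-17",
  "description": "Spindle overload fault",
  "shift_id": "Shift-A",
  "operator_id": "OP-42",
  "site": "Plant-1",
  "area": "Machining",
  "line": "Line-3"
}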
Our goal is to persist these messages in our TimescaleDB database, so the next step is to configure and enable the PostgreSQL extension to handle these incoming data streams.
Streaming MQTT Data to TimescaleDB
To set up the PostgreSQL extension for data streaming, navigate to your HiveMQ Broker directory and open the extensions folder. Inside, locate the hivemq-postgresql-extension folder, then open the conf directory. Here, you'll place your config.xml file (you can copy the sample configuration from the examples folder). Next, open config.xml in your preferred editor (e.g., Visual Studio Code).
Begin by adding the PostgreSQL connection details—hostname, port, database name, and user credentials. Once the connection information is set, create your MQTT-to-Postgres “routes.”
In this example, there are three routes, corresponding to the DowntimeData, CycleData, and DefectData topics; a sketch of the resulting configuration appears after the list below.
DowntimeData Route
MQTT Topic Filter: Points to the topic publishing downtime data.
Processor: References the insert_downtime_data SQL file containing the insertion query for TimescaleDB.
CycleData Route
MQTT Topic Filter: Points to the topic publishing cycle data.
Processor: References the insert_cycle_data SQL file.
DefectData Route
MQTT Topic Filter: Points to the topic publishing defect data.
Processor: References the insert_defect_data SQL file.
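For orientation, here is a rough sketch of what config.xml can look like with the connection details and one route filled in. This is an illustration only: the exact element names and structure depend on your extension version, so always start from the sample file in the examples folder. Only the DowntimeData route is spelled out; the other two follow the same pattern.

<!-- Illustrative sketch only: exact element names may differ between
     extension versions; start from the sample in the examples folder. -->
<hivemq-postgresql-extension>
    <postgresqls>
        <postgresql>
            <id>timescaledb</id>
            <host>localhost</host>
            <port>5432</port>
            <database>plantdb</database>
            <username>postgres</username>
            <password>your-password</password>
        </postgresql>
    </postgresqls>
    <mqtt-to-postgresql-routes>
        <mqtt-to-postgresql-route>
            <id>downtime-route</id>
            <postgresql-id>timescaledb</postgresql-id>
            <mqtt-topic-filters>
                <mqtt-topic-filter>DowntimeData</mqtt-topic-filter>
            </mqtt-topic-filters>
            <processor>
                <statement-template>insert_downtime_data.sql</statement-template>
            </processor>
        </mqtt-to-postgresql-route>
        <!-- CycleData and DefectData routes follow the same pattern,
             referencing insert_cycle_data.sql and insert_defect_data.sql. -->
    </mqtt-to-postgresql-routes>
</hivemq-postgresql-extension>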
Next, let's look at the queries referenced in our PostgreSQL extension config file. Within these SQL files, you'll find standard SQL INSERT statements that map JSON properties from the MQTT messages to the appropriate columns in TimescaleDB, with any necessary type casting.
When a new message arrives on one of the specified topics, the matching query executes automatically, inserting the data into TimescaleDB.
INSERT INTO machine_downtimes (timestamp, machine_id, machine_name, downtime_start, downtime_end, downtime_type, reason_code, description, shift_id, operator_id, site, area, line)
SELECT
    (json_data->>'timestamp')::timestamptz AS timestamp,
    (json_data->>'machine_id')::text AS machine_id,
    (json_data->>'machine_name')::text AS machine_name,
    (json_data->>'downtime_start')::timestamp AS downtime_start,
    (json_data->>'downtime_end')::timestamp AS downtime_end,
    (json_data->>'downtime_type')::text AS downtime_type,
    (json_data->>'reason_code')::text AS reason_code,
    (json_data->>'description')::text AS description,
    (json_data->>'shift_id')::text AS shift_id,
    (json_data->>'operator_id')::text AS operator_id,
    (json_data->>'site')::text AS site,
    (json_data->>'area')::text AS area,
    (json_data->>'line')::text AS line
FROM (
    VALUES
        (${mqtt-payload-utf8}::jsonb)
) AS input(json_data);
insert_downtime_data.sql
INSERT INTO production_cycles (timestamp, machine_id, machine_name, part_id, start_time, end_time, ideal_cycle_time_seconds, units, shift_id, operator_id, temperature_c, energy_consumption_kwh, site, area, line)
SELECT
    (json_data->>'timestamp')::timestamptz AS timestamp,
    (json_data->>'machine_id')::text AS machine_id,
    (json_data->>'machine_name')::text AS machine_name,
    (json_data->>'part_id')::text AS part_id,
    (json_data->>'start_time')::text AS start_time,
    (json_data->>'end_time')::text AS end_time,
    (json_data->>'ideal_cycle_time_seconds')::int AS ideal_cycle_time_seconds,
    (json_data->>'units')::int AS units,
    (json_data->>'shift_id')::text AS shift_id,
    (json_data->>'operator_id')::text AS operator_id,
    (json_data->>'temperature_c')::decimal AS temperature_c,
    (json_data->>'energy_consumption_kwh')::decimal AS energy_consumption_kwh,
    (json_data->>'site')::text AS site,
    (json_data->>'area')::text AS area,
    (json_data->>'line')::text AS line
FROM (
    VALUES
        (${mqtt-payload-utf8}::jsonb)
) AS input(json_data);
insert_cycle_data.sql
INSERT INTO production_defects (timestamp, machine_id, machine_name, defect_test_time, category, defect_type, quantity, description, detection_method, severity, status, resolved_by, resolution_action, resolution_time, shift_id, operator_id, site, area, line)
SELECT
    (json_data->>'timestamp')::timestamptz AS timestamp,
    (json_data->>'machine_id')::text AS machine_id,
    (json_data->>'machine_name')::text AS machine_name,
    (json_data->>'defect_test_time')::text AS defect_test_time,
    (json_data->>'category')::text AS category,
    (json_data->>'defect_type')::text AS defect_type,
    (json_data->>'quantity')::int AS quantity,
    (json_data->>'description')::text AS description,
    (json_data->>'detection_method')::text AS detection_method,
    (json_data->>'severity')::text AS severity,
    (json_data->>'status')::text AS status,
    (json_data->>'resolved_by')::text AS resolved_by,
    (json_data->>'resolution_action')::text AS resolution_action,
    (json_data->>'resolution_time')::text AS resolution_time,
    (json_data->>'shift_id')::text AS shift_id,
    (json_data->>'operator_id')::text AS operator_id,
    (json_data->>'site')::text AS site,
    (json_data->>'area')::text AS area,
    (json_data->>'line')::text AS line
FROM (
    VALUES
        (${mqtt-payload-utf8}::jsonb)
) AS input(json_data);
insert_defect_data.sql
To activate this extension, simply remove the DISABLED file in the extension directory. Once done, head over to the broker’s admin interface—you should see the PostgreSQL extension listed as active, and your data will start streaming into TimescaleDB.
Verifying the Data in TimescaleDB
Finally, return to TimescaleDB and run a query on your tables to confirm that data is flowing in. You should see new records in the machine_downtimes, production_cycles, and production_defects tables.
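For example, a quick spot check on the downtime table (the same pattern works for the other two tables):

-- Show the ten most recent downtime events written by the pipeline.
SELECT timestamp, machine_id, downtime_type, reason_code
FROM machine_downtimes
ORDER BY timestamp DESC
LIMIT 10;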
From here, you can connect your favorite data visualization or analytics tool to TimescaleDB to explore and gain insights from this streaming machine data.
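As a small, hypothetical taste of that analysis, the query below uses TimescaleDB's time_bucket function to aggregate produced units per machine per hour:

-- Hourly unit counts per machine, using TimescaleDB's time_bucket().
SELECT
    time_bucket('1 hour', timestamp) AS bucket,
    machine_id,
    sum(units) AS total_units
FROM production_cycles
GROUP BY bucket, machine_id
ORDER BY bucket, machine_id;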
Kudzai Manditereza
Kudzai is a tech influencer and electronic engineer based in Germany. As a Sr. Industry Solutions Advocate at HiveMQ, he helps developers and architects adopt MQTT and HiveMQ for their IIoT projects. Kudzai runs a popular YouTube channel focused on IIoT and Smart Manufacturing technologies, and he has been recognized as one of the Top 100 global influencers talking about Industry 4.0 online.