MQTT Data Subscription
In addition to classic data subscription, TDengine supports subscription over MQTT. You can create a broker node (bnode) in TDengine and connect your MQTT client to it. The client can then subscribe to TDengine topics.
Key Features:
- Protocol Support: MQTT 5.0
- Authentication: Uses TDengine native authentication
- Topic Management: Unlike the standard MQTT protocol, topics must be pre-created (as message publishing is not supported, topics cannot be dynamically created via message publishing)
- Shared Topics: Topics in the format $shared/group_id/topic_name are treated as shared subscriptions, suitable for scenarios requiring load balancing and high availability
- Subscription Position: Supports latest and earliest (WAL earliest position)
- Quality of Service: Supports QoS 0 and QoS 1
Bnode Management
You manage bnodes through the taos CLI.
Create a Bnode
Use the following SQL statement to create a bnode:
CREATE BNODE ON DNODE <dnode_id>;
You can create only one bnode on each dnode. Once the bnode is successfully created, a bnode subprocess named taosmqtt
is automatically started to provide MQTT subscription services.
The taosmqtt
service uses port 6083 by default. You can modify the mqttPort
parameter in taos.cfg
to provide MQTT subscription services on a different port.
View Bnodes
Use the following SQL statement to display information about the bnodes in your cluster:
SHOW BNODES;
Information similar to the following is displayed:
taos> show bnodes;
id | endpoint | protocol | create_time |
======================================================================
1 | 192.168.0.1:6083 | mqtt | 2024-11-28 18:44:27.089 |
Query OK, 1 row(s) in set (0.037205s)
Delete a Bnode
Use the following SQL statement to delete a bnode:
DROP BNODE ON DNODE <dnode_id>;
Deleting a bnode removes it from the TDengine cluster and stops the taosmqtt
service.
MQTT Data Subscription Example
This example creates test data in TDengine and subscribes to the data. You can use any client that supports the MQTT v5.0 protocol to subscribe; this example uses the Python paho-mqtt
library.
Create Test Data
In the taos CLI, run the following SQL statements to set up your environment for this example.
CREATE DATABASE db VGROUPS 1;
CREATE TABLE db.meters (ts TIMESTAMP, f1 INT) TAGS (t1 INT);
CREATE TOPIC topic_meters AS SELECT ts, tbname, f1, t1 FROM db.meters;
INSERT INTO db.tb USING db.meters TAGS (1) VALUES (now, 1);
CREATE BNODE ON DNODE 1;
This creates a database, a supertable, and a topic. It then inserts a test data point and creates a bnode.
Create a Consumer
Write a consumer in Python as follows:
import time
import paho.mqtt
import paho.mqtt.properties as p
import paho.mqtt.packettypes as pt
import paho.mqtt.client as mqttClient
def on_connect(client, userdata, flags, rc, properties=None):
print("CONNACK received with code %s." % rc)
sub_properties = p.Properties(pt.PacketTypes.SUBSCRIBE)
sub_properties.UserProperty = ('sub-offset', 'earliest')
client.subscribe("$share/g1/topic_meters", qos=1, properties=sub_properties)
def on_subscribe(client, userdata, mid, granted_qos, properties=None):
print("Subscribed: " + str(mid) + " " + str(granted_qos))
def on_message(client, userdata, msg):
print(msg.topic + " " + str(msg.qos) + " " + str(msg.payload))
if paho.mqtt.__version__[0] > '1':
client = mqttClient.Client(mqttClient.CallbackAPIVersion.VERSION2, client_id="tmq_sub_cid", userdata=None, protocol=mqttClient.MQTTv5)
else:
client = mqttClient.Client(client_id="tmq_sub_cid", userdata=None, protocol=mqttClient.MQTTv5)
client.on_connect = on_connect
client.username_pw_set("root", "taosdata")
client.connect("127.0.1.1", 6083)
client.on_subscribe = on_subscribe
client.on_message = on_message
client.loop_forever()
Save this program as sub.py
.
Subscribe to the Topic
Run the following commands to subscribe to the data in topic_meters
:
python3 -m venv .test-env
source .test-env/bin/activate
pip3 install paho-mqtt==2.1.0
python3 ./sub.py
Once the subscription is successful, any new data written to the topic_meters
topic will be automatically pushed to the client via MQTT.
Message Format
With the example in the previous section, the following information shall be outputted:
CONNACK received with code Success.
Subscribed: 1 [ReasonCode(Suback, 'Granted QoS 1')]
topic_meters 1 b'{"topic":"topic_meters","db":"db","vid":2,"rows":[{"ts":1753086482326,"tbname":"tb","f1":1,"t1":1}]}'
In the third line, topic_meters
, is the topic we subscribed to. 1 is the QoS value for this message, followed by a JSON message encoded in UTF-8, where rows
is an array of data rows.