EMQX Platform

MQTT is a popular IoT data transmission protocol, and EMQX is an open-source MQTT broker. Without writing any code, you can write MQTT data directly into TDengine by configuring "rules" in the EMQX Dashboard. EMQX can save data to TDengine by sending it to a web service; the enterprise version also provides a native TDengine driver for writing directly.

Prerequisites

To enable EMQX to properly add a TDengine data source, the following preparations are needed:

  • A TDengine cluster is deployed and running normally
  • taosAdapter is installed and running normally. For details, refer to the taosAdapter User Manual
  • If you use the simulation program described later, Node.js is installed (version 12 is recommended)

Install and Start EMQX

Download the installation package for your operating system from the EMQX official website and install it. After installation, start the EMQX service with sudo emqx start or sudo systemctl start emqx.

Note: This article is based on EMQX v4.4.5. Other versions may differ in their configuration interface, configuration methods, and features.

Create Database and Table

Create the database and table in TDengine that will receive the MQTT data. Open the TDengine CLI, then copy and execute the following SQL statements:

CREATE DATABASE test;
USE test;
CREATE TABLE sensor_data (ts TIMESTAMP, temperature FLOAT, humidity FLOAT, volume FLOAT, pm10 FLOAT, pm25 FLOAT, so2 FLOAT, no2 FLOAT, co FLOAT, sensor_id NCHAR(255), area TINYINT, coll_time TIMESTAMP);

Configure EMQX Rules

Since the configuration interface differs across EMQX versions, this section is only an example for v4.4.5. For other versions, please refer to the respective official documentation.

Log in to EMQX Dashboard

Open the URL http://IP:18083 in a browser and log in to the EMQX Dashboard. The initial username is admin and the initial password is public.

Create a Rule

Select "Rule Engine" on the left, then "Rule", and click the "Create" button.

Edit SQL Field

Copy the following content into the SQL edit box:

SELECT
payload
FROM
"sensor/data"

Here, payload represents the entire message body, and sensor/data is the message topic that this rule selects.

Add an "Action Handler"

Add a "Resource"

Select "Send Data to Web Service" and click the "Create Resource" button.

Edit "Resource"

Select "WebHook" and fill in the "Request URL" with the address that taosAdapter provides for REST services. If taosAdapter is running locally, the default address is http://127.0.0.1:6041/rest/sql.

Please keep other properties at their default values.

Edit "Action"

Edit the resource configuration and add an Authorization key/value pair. For the default username and password, the Authorization value is:

Basic cm9vdDp0YW9zZGF0YQ==

For related documentation, please refer to TDengine REST API Documentation.
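The value above is the string root:taosdata (the default TDengine username and password) encoded in Base64 and prefixed with Basic. If the account has been changed, the header value can be rebuilt with a short sketch like the one below. The helper name basicAuth is only illustrative and is not part of EMQX or TDengine.

```javascript
// Build an HTTP Basic Authorization header value from a username and password.
// `basicAuth` is a hypothetical helper name used only for this example.
function basicAuth(user, password) {
  const token = Buffer.from(`${user}:${password}`, 'utf8').toString('base64')
  return `Basic ${token}`
}

// Default TDengine account: root / taosdata
console.log(basicAuth('root', 'taosdata'))
// prints: Basic cm9vdDp0YW9zZGF0YQ==
```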

Enter the rule engine replacement template in the message body:

INSERT INTO test.sensor_data VALUES(
now,
${payload.temperature},
${payload.humidity},
${payload.volume},
${payload.PM10},
${payload.pm25},
${payload.SO2},
${payload.NO2},
${payload.CO},
'${payload.id}',
${payload.area},
${payload.ts}
)
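To see what this template produces, the following sketch imitates the placeholder substitution for one sample message. It is only an illustration of the idea, not EMQX's actual implementation: renderTemplate is a hypothetical helper, and the sample payload values are made up. Note that the placeholder names follow the keys of the JSON message (PM10, SO2, and so on), while the values appear in the column order of test.sensor_data.

```javascript
// Illustrative only: fills ${payload.xxx} placeholders from a JSON object.
// This is NOT how EMQX is implemented; `renderTemplate` is a hypothetical helper.
function renderTemplate(template, payload) {
  return template.replace(/\$\{payload\.(\w+)\}/g, (match, key) =>
    // Leave the placeholder untouched if the message has no such key
    key in payload ? String(payload[key]) : match
  )
}

// Same template as above, written on one line (double quotes, so nothing is
// interpolated by JavaScript itself)
const template =
  "INSERT INTO test.sensor_data VALUES(now, ${payload.temperature}, " +
  "${payload.humidity}, ${payload.volume}, ${payload.PM10}, ${payload.pm25}, " +
  "${payload.SO2}, ${payload.NO2}, ${payload.CO}, '${payload.id}', " +
  "${payload.area}, ${payload.ts})"

// Made-up sample message with the same keys as the mock program's messages
const samplePayload = {
  temperature: 25.5, humidity: 40.1, volume: 50.2, PM10: 12.3, pm25: 8.8,
  SO2: 1.1, NO2: 2.2, CO: 0.5, id: 'mock_client_0', area: 0, ts: 1596157444170,
}

const renderedSql = renderTemplate(template, samplePayload)
console.log(renderedSql)
```

The printed statement is the SQL text that would be sent as the request body to the taosAdapter REST endpoint for this sample message.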

Finally, click the "Create" button at the bottom left to save the rule.

Write a Mock Test Program

// mock.js
const mqtt = require('mqtt')
const Mock = require('mockjs')

const EMQX_SERVER = 'mqtt://localhost:1883'
const CLIENT_NUM = 10
const STEP = 5000 // Interval between simulated data points, in ms
const AWAIT = 5000 // Pause after each batch, in ms, so data is not written too fast
const CLIENT_POOL = []

startMock()

// Resolve after `timer` milliseconds
function sleep(timer = 100) {
  return new Promise(resolve => {
    setTimeout(resolve, timer)
  })
}

async function startMock() {
  const now = Date.now()
  for (let i = 0; i < CLIENT_NUM; i++) {
    const clientId = `mock_client_${i}`
    const client = await createClient(clientId)
    // Keep the ID next to the client so it can be written into each message
    CLIENT_POOL.push({ clientId, client })
  }
  // Simulate the last 24 hours, one data point every STEP ms
  const last = 24 * 3600 * 1000
  for (let ts = now - last; ts <= now; ts += STEP) {
    for (const { clientId, client } of CLIENT_POOL) {
      const mockData = generateMockData()
      const data = {
        ...mockData,
        id: clientId,
        area: 0, // overrides the random area from generateMockData()
        ts, // overrides the placeholder ts from generateMockData()
      }
      client.publish('sensor/data', JSON.stringify(data))
    }
    const dateStr = new Date(ts).toLocaleTimeString()
    console.log(`${dateStr} send success.`)
    await sleep(AWAIT)
  }
  console.log(`Done, took ${(Date.now() - now) / 1000}s`)
}

/**
 * Init a virtual MQTT client
 * @param {string} clientId ClientID
 */
function createClient(clientId) {
  return new Promise((resolve, reject) => {
    const client = mqtt.connect(EMQX_SERVER, {
      clientId,
    })
    client.on('connect', () => {
      console.log(`client ${clientId} connected`)
      resolve(client)
    })
    client.on('reconnect', () => {
      console.log('reconnect')
    })
    client.on('error', (e) => {
      console.error(e)
      reject(e)
    })
  })
}

/**
 * Generate mock data
 */
function generateMockData() {
  return {
    "temperature": parseFloat(Mock.Random.float(22, 100).toFixed(2)),
    "humidity": parseFloat(Mock.Random.float(12, 86).toFixed(2)),
    "volume": parseFloat(Mock.Random.float(20, 200).toFixed(2)),
    "PM10": parseFloat(Mock.Random.float(0, 300).toFixed(2)),
    "pm25": parseFloat(Mock.Random.float(0, 300).toFixed(2)),
    "SO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
    "NO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
    "CO": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
    "area": Mock.Random.integer(0, 20),
    "ts": 1596157444170,
  }
}

Note: When you first run the test, you can set CLIENT_NUM to a smaller value so that the hardware is not overwhelmed by a large number of concurrent clients.
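To gauge how much data the script sends with its default constants, the loop bounds can be worked out directly. The sketch below repeats the constants from mock.js and gives a rough estimate that ignores connection and publish time.

```javascript
// Constants copied from mock.js
const CLIENT_NUM = 10
const STEP = 5000 // ms between simulated timestamps
const AWAIT = 5000 // ms of sleep after each batch
const last = 24 * 3600 * 1000 // simulated time window: 24 hours in ms

// The loop runs for ts = now - last, now - last + STEP, ..., now (inclusive)
const batches = Math.floor(last / STEP) + 1
// Every batch publishes one message per client
const messages = batches * CLIENT_NUM
// Total time spent sleeping between batches, in hours
const sleepHours = (batches * AWAIT) / 3600000

console.log({ batches, messages, sleepHours })
```

With these defaults the script publishes 17,281 batches (172,810 messages) and sleeps for about 24 hours in total, so for a quick test it may be worth lowering AWAIT or shortening the simulated time window as well as CLIENT_NUM.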

Run the Simulation to Send MQTT Data

npm install mqtt mockjs --save --registry=https://registry.npm.taobao.org
node mock.js

Verify EMQX Received Data

Refresh the rule engine page in the EMQX Dashboard to see how many records were received correctly.

Verify Data Written to TDengine

Log in with the TDengine CLI and query the test database and the sensor_data table to verify that the data has been written to TDengine correctly.
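As an alternative to the CLI, the same check can be scripted against the taosAdapter REST endpoint used by the rule. The sketch below assumes taosAdapter is listening on 127.0.0.1:6041 and that the default root/taosdata account is in use. The helper names buildRequest and runQuery are illustrative, and the response is printed as raw text because its exact JSON layout depends on the TDengine version.

```javascript
const http = require('http')

// Assumptions: taosAdapter on 127.0.0.1:6041, default root/taosdata account
// (same endpoint and Authorization value as in the rule configuration).
function buildRequest(sql) {
  return {
    options: {
      host: '127.0.0.1',
      port: 6041,
      path: '/rest/sql',
      method: 'POST',
      headers: {
        Authorization: 'Basic cm9vdDp0YW9zZGF0YQ==',
        'Content-Length': Buffer.byteLength(sql),
      },
    },
    body: sql, // the SQL statement is sent as the plain request body
  }
}

// Send the SQL statement and resolve with the raw response body.
function runQuery(sql) {
  const { options, body } = buildRequest(sql)
  return new Promise((resolve, reject) => {
    const req = http.request(options, res => {
      let text = ''
      res.setEncoding('utf8')
      res.on('data', chunk => { text += chunk })
      res.on('end', () => resolve(text))
    })
    req.on('error', reject)
    req.end(body)
  })
}

// Uncomment to run against a live taosAdapter:
// runQuery('SELECT COUNT(*) FROM test.sensor_data').then(console.log, console.error)
```

If the mock program has been running, the returned row count should increase between successive calls.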

For detailed usage of EMQX, refer to the EMQX official documentation.