- 2024-05-16 adds case study explaining how to extract values from topic strings, and how to specify the measurement name in the message payload provided to the "influxdb out" node.
- 2023-12-02 revise graphics to correspond with "influxdb out" node, and explain the pros and cons of InfluxDB 1.8 vs InfluxDB 2.
Getting data produced by IoT sensors into a database is practically a mandatory step before effective visualisation (eg dashboards).
This recipe shows you how to get an MQTT payload into an InfluxDB database using three Node-RED nodes. It makes the following assumptions:
- A client device of some kind publishing data to a topic via the MQTT protocol;
- Mosquitto (MQTT broker);
- Node-RED subscribing to the topic; and
- InfluxDB running and accessible to Node-RED.
This recipe also show how to map between JSON keys and database field and tag names.
At the time of writing (December 2023), InfluxData supports both InfluxDB 1.8 and InfluxDB 2.
InfluxDB 3 has been announced but is not yet generally available.
If you are beginning your IoT journey, you may wonder which version you should choose. Absent useful guidance, you may well reason like this:
Version 2.0 is obviously later than 1.8 therefore 2.0 must be "better".
In some ways, InfluxDB 2 can be seen as a natural progression from InfluxDB 1.8. In other ways, the two database systems are like chalk and cheese. From a user perspective, one of the most obvious differences between the two systems is the query language:
- InfluxDB 1.8 uses InfluxQL natively with Flux being an experimental add-on which needs to be enabled;
- InfluxDB 2 uses Flux natively with partial support for InfluxQL.
One consequence of the difference in query language affects Grafana. Its GUI has native point-and-click support for constructing InfluxQL queries that will run against InfluxDB 1.8 but has no equivalent support for constructing Flux queries that run against InfluxDB 2.
There is a workaround. It involves using the InfluxDB 2 web GUI to construct queries in Flux, which you then copy and paste into Grafana. This definitely works but the lack of native support for Flux in Grafana means that, if anything goes wrong, you often have to start from scratch.
This workaround may seem like an acceptable compromise but, eventually, you will encounter situations where you will want to manipulate the data in your databases. Common examples are:
- deleting junk that crept in when a sensor went berserk; and
- reformatting, splitting or merging tables to better reflect how your sensors operate and/or to fix poor design decisions (we all make those).
To manipulate your data, you will need to use the query language.
If you are familiar with SQL then you will likely be comfortable with InfluxQL. While InfluxQL is not the same as SQL and does have its own peculiarities, it has sufficient syntactic similarities with SQL that you can usually figure out how to make it do what you need it to do. Plus, if you get stuck, you can generally find a close example by Googling and go from there.
Flux, on the other hand, is its own language which … how can I put this politely … defies description. In my experience the official documentation is quite short on useful examples and Googling mostly fares no better.
To put this problem more succinctly: you are far more likely to be able to get help with InfluxQL than you are with Flux. Whether that matters is something only you will know.
The last point is that InfluxData has announced InfluxDB 3. The announcement includes:
-
This statement which appears in a popup overlay in the Flux documentation:
The future of Flux
Flux is going into maintenance mode. You can continue using it as you currently are without any changes to your code.
-
This statement which appears in the InfluxDB Clustered documentation:
… brings with it native SQL support and improved InfluxQL performance.
I take those two statements to mean that Flux is a technological dead-end and, because of its dependence on Flux, so too is InfluxDB 2.
If you have a good reason for preferring InfluxDB 2 then it's your system and your rules. But please don't say you weren't warned.
This remainder of this gist focuses on InfluxDB 1.8 exclusively.
Given this MQTT message structure:
topic: /site/topic
message: {"b": true, "i": 123, "r": 456.78, "s": "hello world", "t": "tagValue"}
the task is to configure the Node-RED instance to:
- Listen for the topic "/site/topic";
- Parse the JSON-format message payload; and
- Add a row to a measurement (table) named "example" in an InfluxDB database named "test".
This recipe also demonstrates how to use abbreviated keys in a JSON message and map those to more meaningful field names (attributes) in the InfluxDB database:
JSON Key | Influx Field | Influx Tag | Expected Value | Implied type |
---|---|---|---|---|
b | flag | true | Boolean | |
i | discrete | 123 | Integer | |
r | continuous | 456.78 | Real | |
s | message | "hello world" | String | |
t | identity | "tagValue" | String |
There is nothing magical about either these JSON keys, or the InfluxDB field or tag names. JSON keys do not have to be single characters, and InfluxDB names do not have to be long, meaningful or descriptive. It is just that, in general, IoT devices are memory-constrained so short strings are to be preferred over longer strings; while "meaningful and descriptive" are considered best practice in a database environment.
There is also nothing magical about the choice of these implied data types. Booleans, numbers (both integers and reals) and strings are just what you usually need to pass between a client device and a database.
During testing you will find it useful to have a computer with an MQTT client installed. Explaining the full how-to of this is beyond the scope of this recipe so you should start at mosquitto.org/download.
On Linux, it is not immediately obvious that you need either or both of the following:
$ sudo apt install mosquitto-clients
$ sudo apt install mosquitto
The first command installs the MQTT publishing and subscribing command-line clients. You will need this package to get the mosquitto_pub command.
The second command installs the MQTT broker. You will only need this if you do not have another MQTT broker running somewhere (eg in a Docker container).
If your copy of InfluxDB is running inside a Docker container, consider adding this alias statement to your .profile or .bashrc:
$ alias influx='docker exec -it influxdb influx -precision=rfc3339'
That alias allows you to connect to the "influx" command line interface (CLI) simply by typing:
$ influx
By default, "influx" displays time as nanoseconds since 1970-01-01 UTC. The
-precision=rfc3339
argument displays time in human-readable form.
The "test" database must be created by hand. If you omit this step you will get an error from Node-RED. You can initialise the database like this:
$ influx
> CREATE DATABASE test
> quit
- Launch your browser and connect to your Node-RED server.
- Use the main menu (three horizontal bars "≡" at the top, right of the Node-RED window) to open the Palette Manager:
- select the "Nodes" tab and check whether
node-red-contrib-influxdb
is already installed. If it is not installed, - select the "Install" tab, then search for and install node-red-contrib-influxdb. If you prefer to install contributions from the command line, do that.
- select the "Nodes" tab and check whether
Back in the Node-RED main window, click the "+" button to add a new empty flow. The default title will be something like "Flow 1". Double-click the title and rename the flow with something more descriptive, like "Influx Test". The actual name is not important.
Drag an "mqtt in" node onto the canvas. Double-click to open and configure as follows:
- Select the option to add a new mqtt-broker.
- Click the pencil icon to open the server properties panel.
- Give the server a meaningful name (eg "Docker MQTT").
- Supply the network path to the host running Mosquitto:
- In a Docker environment, this will be the name of the container running Mosquitto (eg "mosquitto").
- In a non-Docker environment where Node-RED and Mosquitto are running on the same host, this will be the loopback address 127.0.0.1.
- If Node-RED and Mosquitto are running on different hosts then this will be a static IP address or the fully-qualified domain name of the host running Mosquitto.
- Click "Add".
- Enter the topic string ("/site/topic").
- Set the "Output" popup to "a parsed JSON object".
- Enter a name for the node. This appears in the schematic and it is good practice to repeat the topic string.
- Click "Done" to complete the node setup.
All other fields can either be left at their default settings or changed to suit your requirements.
The purpose of this node is to:
- Listen to MQTT messages directed to "/site/topic"; and
- Convert the JSON string in the MQTT message body to a JavaScript object representation.
In other words, given an input of the JSON string specified in the task goal, the output from the node will result in msg.payload
containing:
{
"b": true,
"i": 123,
"r": 456.78,
"s": "hello world",
"t": "tagValue"
}
One common mistake is skipping step 7 above. Please go back and double-check that you have set the "Output" popup to "a parsed JSON object".
Drag a "change" node onto the canvas.
Connect the outlet of the "mqtt in" node to the inlet of the "change" node.
Double-click the "change" node to open and configure as follows:
-
Enter a name for the node. This appears in the schematic and it is good practice to summarise the purpose of the node.
-
A new "change" node contains a single rule to "Set msg.payload" but where the data type of the "to" field defaults to a string. Change the popup menu to a Java expression.
-
Click the ellipsis ("…") to open the expression editor.
-
Copy the expression below and paste it into this window.
[ { "flag": msg.payload.b, "discrete": msg.payload.i, "continuous": msg.payload.r, "message": msg.payload.s },{ "identity": msg.payload.t } ]
-
Click "Done".
-
Click "Done" to complete the node setup.
The purpose of this node is to provide a cross-walk between the JSON keys ("b", "i", "r", "s" and "t"), and the field and tag names you need in the InfluxDB database. The basic pattern is:
[
{
"fieldName" : msg.payload.key,
...
},{
"tagName" : msg.payload.key,
...
}
]
If you only want to pass fields, then omit the square brackets and the elements that describe the tag(s), like this:
{
"fieldName" : msg.payload.key,
...
}
Note that it is feasible to omit this "change" node entirely. If you do that the JSON keys in the Node-RED "payload" variable will become the field names in the database. Before you take that shortcut, consider:
-
If an MQTT client starts providing unexpected JSON keys, those can easily pollute your InfluxDB database. Using a cross-walk between the expected JSON keys and the field and tag names you want in your database provides an effective barrier against unexpected keys.
-
You may wish to include keys in your MQTT payload that you do not want to wind up in your database. Good examples are millis() uptime, free heap and version numbers. Such values are usually ephemeral and only of interest at the moment when they are produced (and might only be produced when you are actively debugging your MQTT client). You can always see such values by subscribing to the MQTT feed or attaching a "debug" node to the "mqtt in" node.
-
A "change" node simplifies the process of adding new tags and fields. You can:
- Add a new key+value pair to the JSON payload being produced by your MQTT client, then
- Attach a "debug" node to the "mqtt in" node to confirm that you are receiving the expected data, then
- Change the cross-walk when you are ready to start importing the data into your database.
-
If you want to include both tags and fields in your database, you really only have two options, either:
- your MQTT client has to format the JSON payload correctly before transmission, or
- you need a "change" node to implement a cross-walk.
-
Opting to do the work in your MQTT client effectively rules out the tactical use of ephemeral values and a step-wise refinement approach to development if you need to add new fields.
Given the output from the "mqtt in" node, the Javascript expression in the "change" node will result in msg.payload
having the value:
[
{
"flag": true,
"discrete": 123,
"continuous": 456.78,
"message": "hello world"
},{
"identity": "tagValue"
}
]
Drag an "influxdb out" node onto the canvas.
Can't find the "influxdb out" node in the palette? Double-check that you installed "node-red-contrib-influxdb" as described above.
Connect the outlet of the "change" node to the inlet of the "influxdb out" node.
Double-click the "influxdb out" node to open and configure as follows:
- Enter a name for the node. This appears in the schematic. It is good practice to summarise the purpose of the node.
- From the "Server" popup menu, choose "Add new influxdb...".
- Click the pencil icon to open the server properties panel.
- I recommend leaving the "Name" field blank. If you do then the "Server" field in the previous panel will take on the "host:port/database" appearance shown at the end of the dotted line in the figure. You lose that valuable synopsis by supplying a name in this field.
- Set the Version popup to "1.x" (this gist does not cover InfluxDB 2; please see Some words about InfluxDB if you want to understand why).
- Supply the network path to the host running InfluxDB:
- In a Docker environment, this will be the name of the container running InfluxDB (eg "influxdb").
- In a non-Docker environment where Node-RED and InfluxDB are running on the same host, this will be the loopback address 127.0.0.1.
- If Node-RED and InfluxDB are running on different hosts then this will be a static IP address or the fully-qualified domain name of the host running InfluxDB.
- The name of the InfluxDB database. This needs to be the same as you created earlier ("CREATE DATABASE test").
- Click "Add".
- Supply the name of the measurement you want to write to. This is analogous to a "table" in SQL parlance. The recommended name for this tutorial is "example".
- Click "Done" to complete the node setup.
All other fields can be left at their default settings or changed to suit your requirements.
Warning: InfluxDB database connections are global to Node-RED. Suppose you have an existing flow connected to the "test" database. When you create a new flow for a new database, it is very tempting to copy the "influxdb out" node from the old flow, paste it into the new flow, open it, click the pencil icon, and just change the database name. If you do that, you will break your old flow because it will refer to the new database. Always start from scratch by dragging a new "influxdb out" node onto the canvas.
Given the output from the "change" node, the practical effect of this node is:
$ influx
> USE test
> INSERT example,identity=tagValue flag=true,discrete=123,continuous=456.78,message="hello world"
> quit
Click the Deploy button near the top, right of the canvas.
Add two "debug" nodes to the canvas. Double-click each in turn and set its Output to "complete msg object". Connect the outlet of the "mqtt in" node to the first "debug" node, and the outlet of the "change" node to the second "debug" node. The final result should look something like this:
Select the Debug panel (the controls below "Deploy").
Click "Deploy" to activate. Any errors will show up in the Debug panel.
Copy the following text to the clipboard then paste it into a text editor.
$ mosquitto_pub -h host -t '/site/topic' -m '{"b": true, "i": 123, "r": 456.78, "s": "hello world", "t": "tagValue"}'
Edit the "host" field to point to the server running your Mosquitto broker. This might be an IP address or a fully-qualified domain name.
Paste the text into a Terminal window on your client device and press return.
If all goes well, you will get two debug messages from Node-RED. The first is from the "mqtt in" node confirming receipt of the JSON payload:
▿ object
topic: "/site/topic"
▿ payload: object
b: true
i: 123
r: 456.78
s: "hello world"
t: "tagValue"
and the second is from the "change" node showing the effect of the cross-walk:
▿ object
topic: "/site/topic"
▿ payload: array[2]
▿0: object
flag: true
discrete: 123
continuous: 456.78
message: "hello world"
▿1: object
identity: "tagValue"
To confirm that the data made it all the way to the InfluxDB database:
$ influx
> USE test
> show measurements
name: measurements
name
----
example
> show series
key
---
example,identity=tagValue
> show tag keys
name: example
tagKey
------
identity
> show field keys
name: example
fieldKey fieldType
-------- ---------
continuous float
discrete float
flag boolean
message string
> select * from example
name: example
time continuous discrete flag identity message
---- ---------- -------- ---- -------- -------
2020-02-12T03:56:07.844235334Z 456.78 123 true tagValue hello world
> quit
You can either delete or deactivate the "debug" nodes in the Node-RED flow.
When you no longer need the test database, you can remove it like this:
$ influx
> DROP DATABASE test
> quit
Small case studies can be a useful way of exploring some slightly more advanced techniques which you may wish to use.
Suppose you are the owner of a (fictional) "WeatherPro" weather station. The device reports its measurements like this:
topic: weatherpro/F63617A7/temperature
message: {"observation": 27.3, "units": "C"}
topic: weatherpro/F63617A7/humidity
message: {"observation": 85.6, "units": "%"}
In each topic string:
weatherpro
represents the name the manufacturer gave to the device;F63617A7
is the WeatherPro's serial number; andtemperature
andhumidity
indicate the kind of data that is contained in the message payload.
Within each message payload, the value of the measurement (temperature or humidity) is reported against the "observation" key word with "units" as metadata.
Key point:
- The
F63617A7
serial number is not repeated in the message payload. I am going to use this to show you how to extract values from the topic string.
This case study will focus on the "temperature" topic but the same principles apply to the "humidity" topic, which is left as a homework exercise.
Creating a three-node flow for the temperature topic involves:
-
Drag an "mqtt in" node onto the canvas and open it for editing.
-
Configure your broker. Refer to 1. "mqtt in" node for the details on how to do that.
-
We could subscribe to the topic:
weatherpro/F63617A7/temperature
However, is preferable to future-proof the design to some extent by using a wildcard in the serial number position:
weatherpro/+/temperature
In MQTT parlance, a:
+
substitutes for exactly one position in the topic string; while#
substitutes for multiple positions. with the proviso that it must occur at the end of a topic string.
-
Set the Output to "a parsed JSON object".
-
Click "Done".
-
Drag a "change" node onto the canvas and open it for editing.
-
Give it a meaningful title. Examples:
- "prepare observation"
- "prepare temperature insert"
The actual name is not important. It just needs to be meaningful to you.
-
The default rule is "Set msg.payload to an empty string". Replace "payload" with "measurement" (all lower case). The reason for this will become clear in a while.
-
The default type of the "to the value" popup is az (indicating a simple string). A string is what you want for this rule so leave it alone.
-
Type the name "temperature". This will become the measurement name in your InfluxDB database.
-
At the bottom, left of the "Edit change node" panel is a "+ add" button. Click it twice. This adds two more default rules.
-
Replace "payload" with "device" (all lower case).
-
From the "to the value" popup menu, choose "Java expression".
-
Enter the following into the expression field:
$split(msg.topic,'/')[1]
This decomposes to:
-
$split(msg.topic,'/')
which means "find all the/
characters in the topic string and use those as the separators to convert the topic string to an array":- Index position 0 =
weatherpro
- Index position 1 =
F63617A7
- Index position 2 =
temperature
- Index position 0 =
-
[1]
means "select the value at index position 1" and return it as the result of the expression. Index position 1 is the serial number component of the topic string.
Note:
-
If your topic string starts with a
/
, it implies that the first element in the array is a null string. For example, using the$split()
function on this topic string:/weatherpro/F63617A7/temperature
would yield:
- Index position 0 = a null string
- Index position 1 =
weatherpro
- Index position 2 =
F63617A7
- Index position 3 =
temperature
in which case the expression needed to select the serial number would be:
$split(msg.topic,'/')[2]
-
-
We are about to prepare the measurement for insertion into InfluxDB. In this case, the default receiver of
msg.payload
is correct for this rule. -
From the "to the value" popup menu, choose "Java expression", and then click the ellipsis (...) button at the end of the field to open the "JSONata Expression editor".
-
Enter the following cross-walk expression into the editor window:
[ { "temp" : msg.payload.observation },{ "device" : msg.device } ]
The InfluxDB insert pattern was explained earlier. In this case:
- the field named "temp" is being populated from the value associated with the "observation" keyword in the MQTT payload; while
- the tag named "device" is being populated from the
msg.device
property, which was set by the second rule in the "change" node.
Notes:
-
Mapping
observation
totemp
demonstrates the ability of a cross-walk to let you use field or tag names in your database which are not present in the original MQTT messages; and -
Ignoring
msg.payload.units
demonstrates the ability of a cross-walk to discard non-useful data.What constitutes useful data depends on your needs and perspectives so your milage may vary when it comes to discarding temperature units. That said, units are metadata so, if you're going to retain them, you should use a tag rather than a field.
-
Click "Done".
-
Click "Done".
-
Drag an "influxdb out" node onto the canvas and open it for editing.
-
Give the node a meaningful name.
-
Either choose an existing database connection from the popup or click the pencil icon and create a new connection. Refer to 3. "influxdb out" node for the details on how to do that. In this case, note that the database name is "myweather" and recall that it is necessary to create this database before use via the Influx command line:
> CREATE DATABASE myweather
See InfluxDB for a step-by-step example.
-
Leave the measurement field empty. It is being set in the "change" node (steps 8, 9 and 10).
-
I've checked "Advanced Query Options" in the screen shot to make the point that it is possible to set the retention policy here. The alternative is to add another rule to the "change" node and use it to set a value for
msg.retentionPolicy
(the name is case sensitive). Not specifying a retention policy at all results in the default policy for the database being applied (typicallyautogen
) and that this is usually what you want. -
Click "Done".
-
Wire-up the three nodes as shown and click "Deploy".
Notes:
- Setting
msg.measurement
(steps 8, 9 and 10) while leaving the Measurement field (23) empty means you can use a single "influxdb out" node for a group of related topics (egweatherpro/F63617A7/humidity
). - The word "device" at step 12 and its use on both sides of the cross-walk expression (17) is not significant. You could also use "station" or something equally meaningful. It should be self-evident that you need to avoid keywords that are already in use (eg
payload
,topic
,measurement
,retentionPolicy
). - It may seem tempting to use the
$split()
function to extracttemperature
andhumidity
from the topic string but, in practice, the cross-walk expression (17) will almost always be topic-specific so this is rarely useful in practice.
Run the following commands:
$ mosquitto_pub -h mqtt.your.domain.com -t 'weatherpro/F63617A7/temperature' -m '{"observation": 27.3, "units": "C"}'
mqtt.your.domain.com
could also be the host name or IP address of your broker (-h
defaults tolocalhost
and can be omitted if your broker is running on the same host).
A "debug" node attached to the "mqtt in" node reports:
topic: "weatherpro/F63617A7/temperature"
payload: object
observation: 27.3
units: "C"
qos: 0
retain: false
A "debug" node attached to the cross-walk ("Change" node) reports:
topic: "weatherpro/F63617A7/temperature"
payload: array[2]
0: object
temp: 27.3
1: object
device: "F63617A7"
qos: 0
retain: false
measurement: "temperature"
device: "F63617A7"
Note:
F63617A7
in thepayload
field was copied from thedevice
field, which was first extracted from thetopic
field by the$split()
function.
Exploration within the influx
command line interface reveals:
$ influx
> USE myweather
> SHOW measurements
name: measurements
name
----
temperature
> SHOW series
key
---
temperature,device=F63617A7
> SELECT * FROM temperature tz('Australia/Sydney')
name: temperature
time device temp
---- ------ ----
2024-05-16T15:54:04.2839275+10:00 F63617A7 27.3
> exit
$
Now, extend the basic three-node flow to subscribe to the "humidity" topic and write its observations to the humidity
measurement in the same database.
Hint:
- You should only need two more nodes, not three.
You can test your solution using:
$ mosquitto_pub -h mqtt.your.domain.com -t 'weatherpro/F63617A7/humidity' -m '{"observation": 85.6, "units": "%"}'
Once you have finished with the database:
$ influx
> DROP DATABASE myweather
> quit
The comments spanned many years and were getting very unwieldy. I've incorporated most of what was raised in questions in updates to the gist so I thought it was better to start from a clean slate. Call it the gist author's prerogative.