Skip to content

Instantly share code, notes, and snippets.

@nginx-gists
Last active October 6, 2024 22:14
Show Gist options
  • Save nginx-gists/0e93fe7813ec131fed8329d10ead70ea to your computer and use it in GitHub Desktop.
Save nginx-gists/0e93fe7813ec131fed8329d10ead70ea to your computer and use it in GitHub Desktop.
NGINX Plus for the IoT: Load Balancing MQTT
# Pull base image. The Docker Java 8 JDK image is used here.
# NOTE(review): the "java" image name has reportedly been superseded by
# "openjdk" on Docker Hub -- confirm the tag is still pullable before building.
FROM java:8-jdk
# Copy the HiveMQ distribution archive (hivemq.zip must be in the build context) to the container
COPY hivemq.zip /tmp/
# Refresh the package index (base images normally ship without one, so a bare
# "apt-get install" cannot locate packages), install wget and unzip, unpack the
# copied HiveMQ archive under /opt/hivemq, then drop the package lists again.
RUN \
apt-get update &&\
apt-get install -y wget unzip &&\
unzip /tmp/hivemq.zip -d /opt/ &&\
mv /opt/hivemq-* /opt/hivemq &&\
rm -rf /var/lib/apt/lists/*
# Define working directory.
WORKDIR /opt/hivemq
# Define HIVEMQ_HOME variable
ENV HIVEMQ_HOME=/opt/hivemq
# Expose MQTT port
EXPOSE 1883
# Define default command. Here we use HiveMQ's run script.
CMD ["/opt/hivemq/bin/run.sh"]
// State shared between getClientId (writer) and setClientId (reader).
// NOTE(review): this relies on njs giving every stream session its own copy of
// module-level variables -- confirm against the njs documentation.

// 1-based count of non-empty 'upload' callbacks seen so far; the value 1 marks
// the first client packet, which is expected to be the MQTT CONNECT.
var client_messages = 1;
// ClientId parsed from the CONNECT packet; stays "-" until one is extracted.
var client_id_str = "-";
/**
 * Decodes an MQTT Variable Byte Integer: 7 data bits per byte, least
 * significant group first, bit 7 set when another byte follows (4 bytes max).
 *
 * @param {string} data - Packet bytes, one character per byte.
 * @param {number} pos - Index of the first byte of the integer.
 * @returns {{value: number, next: number}|null} The decoded value and the index
 *     of the byte following it, or null if the data is truncated or the
 *     encoding runs past 4 bytes.
 */
function decodeMqttVarInt(data, pos) {
    var value = 0;
    var multiplier = 1;
    for (var i = 0; i < 4; i++) {
        if (pos + i >= data.length) {
            return null;
        }
        var current_byte = data.charCodeAt(pos + i);
        value += (current_byte & 127) * multiplier;
        if ((current_byte & 128) === 0) { // No continuation bit: last byte
            return { value: value, next: pos + i + 1 };
        }
        multiplier *= 128;
    }
    return null;
}

/**
 * Extracts the ClientId from an MQTT CONNECT packet. The protocol name length
 * is read from the packet rather than assumed, so MQTT 3.1 ("MQIsdp"),
 * 3.1.1 ("MQTT") and 5 (which adds a properties section) are all handled.
 *
 * @param {string} data - Packet bytes starting at the fixed header.
 * @returns {string|null} The ClientId, or null if the packet is truncated or
 *     its length fields are inconsistent.
 */
function parseMqttConnectClientId(data) {
    // Fixed header: 1 byte type+flags, then the Remaining Length
    var remaining_len = decodeMqttVarInt(data, 1);
    if (remaining_len === null) {
        return null;
    }
    var pos = remaining_len.next;
    var packet_end = pos + remaining_len.value;

    // Variable header: 2-byte protocol name length + protocol name
    if (pos + 2 > data.length) {
        return null;
    }
    var protocol_name_len = (data.charCodeAt(pos) << 8) | data.charCodeAt(pos + 1);
    pos += 2 + protocol_name_len;

    // Protocol level (1), connect flags (1), keep alive (2)
    if (pos + 4 > data.length) {
        return null;
    }
    var protocol_level = data.charCodeAt(pos);
    pos += 4;

    // MQTT 5 inserts a length-prefixed properties section before the payload
    if (protocol_level === 5) {
        var properties_len = decodeMqttVarInt(data, pos);
        if (properties_len === null) {
            return null;
        }
        pos = properties_len.next + properties_len.value;
    }

    // Payload starts with the ClientId: 2-byte big-endian length + bytes
    if (pos + 2 > data.length) {
        return null;
    }
    var client_id_len = (data.charCodeAt(pos) << 8) | data.charCodeAt(pos + 1);
    var client_id_start = pos + 2;
    var client_id_end = client_id_start + client_id_len;
    if (client_id_end > data.length || client_id_end > packet_end) {
        return null;
    }
    return data.substring(client_id_start, client_id_end);
}

/**
 * js_preread handler: registers an 'upload' callback that inspects the first
 * non-empty chunk sent by the client. If it is an MQTT CONNECT packet, the
 * ClientId is stored in client_id_str (read back by setClientId); otherwise
 * client_id_str keeps its previous value. The session is then allowed to
 * proceed whether or not a ClientId was found.
 *
 * @param {Object} s - njs stream session object (uses s.on, s.log, s.allow).
 */
function getClientId(s) {
    s.on('upload', function (data, flags) {
        if (data.length === 0) { // Initial calls may contain no data, so
            s.log("No buffer yet"); // return and wait to be called again
            return;
        }
        if (client_messages === 1) { // CONNECT is first packet from the client
            // CONNECT packet is type 1, using upper 4 bits (00010000 to 00011111)
            var packet_type_flags_byte = data.charCodeAt(0);
            s.log("MQTT packet type+flags = " + packet_type_flags_byte.toString());
            if (packet_type_flags_byte >= 16 && packet_type_flags_byte < 32) {
                var parsed_client_id = parseMqttConnectClientId(data);
                if (parsed_client_id === null) {
                    s.log("Malformed or truncated MQTT CONNECT packet, ClientId not extracted");
                } else {
                    client_id_str = parsed_client_id;
                    s.log("ClientId value = " + client_id_str);
                }
            } else {
                s.log("Received unexpected MQTT packet type+flags: " + packet_type_flags_byte.toString());
            }
        }
        client_messages++;
        s.allow();
    });
}
/**
 * js_set handler: supplies the value of the NGINX variable bound to it.
 * Despite the name it sets nothing; it reports the module-level
 * client_id_str as it currently stands.
 *
 * @param {Object} s - Stream session object (unused).
 * @returns {string} Current value of client_id_str.
 */
function setClientId(s) {
    var current_client_id = client_id_str;
    return current_client_id;
}
// Handlers made available to the NGINX configuration through js_import.
export default { getClientId, setClientId };
# Stream (TCP/UDP) context; its contents are loaded from every *.conf file
# found in the stream_conf.d directory.
stream {
include stream_conf.d/*.conf;
}
# Access log format named "mqtt": client address, local time, protocol,
# session status, bytes received from and sent to the client, upstream address.
log_format mqtt '$remote_addr [$time_local] $protocol $status $bytes_received '
'$bytes_sent $upstream_addr';
# Upstream group of three HiveMQ nodes on localhost ports 18831-18833.
# No balancing method is named here, so the NGINX default applies.
upstream hive_mq {
server 127.0.0.1:18831; #node1
server 127.0.0.1:18832; #node2
server 127.0.0.1:18833; #node3
# Shared memory zone "tcp_mem" (64k) holding the group's state
zone tcp_mem 64k;
}
# Health-check test "mqtt_conn": send a CONNECT and require an exact CONNACK.
match mqtt_conn {
# Send CONNECT packet with client ID "nginx health check"
# Byte layout: \x10 = CONNECT, \x20 = remaining length (32),
# \x00\x06 "MQIsdp" = protocol name (MQTT 3.1), \x03 = protocol level,
# \x02 = connect flags, \x00\x3c = keep alive (60),
# \x00\x12 = ClientId length (18), followed by the 18 ClientId bytes.
send \x10\x20\x00\x06\x4d\x51\x49\x73\x64\x70\x03\x02\x00\x3c\x00\x12\x6e\x67\x69\x6e\x78\x20\x68\x65\x61\x6c\x74\x68\x20\x63\x68\x65\x63\x6b;
# \x20 = CONNACK, \x02 = remaining length, \x00\x00 = zeroed flags/return code
expect \x20\x02\x00\x00; # Entire payload of CONNACK packet
}
# Listener for MQTT clients on port 1883, proxied to the hive_mq upstream group.
server {
listen 1883;
proxy_pass hive_mq;
# Give up on an upstream node that does not accept the connection within 1s
proxy_connect_timeout 1s;
# Health-check the upstream servers using the mqtt_conn match test
health_check match=mqtt_conn;
# Log sessions using the "mqtt" log format
access_log /var/log/nginx/mqtt_access.log mqtt;
error_log /var/log/nginx/mqtt_error.log; # Health check notifications
}
# vim: syntax=nginx
# Load the njs module mqtt.js; its exports are referenced below as mqtt.<name>.
js_import mqtt.js;
# $mqtt_client_id takes its value from the module's setClientId function.
js_set $mqtt_client_id mqtt.setClientId;
# Access log format named "mqtt": client address, local time, protocol,
# session status, byte counts, upstream address, and the MQTT ClientId.
log_format mqtt '$remote_addr [$time_local] $protocol $status $bytes_received '
'$bytes_sent $upstream_addr $mqtt_client_id'; # Include MQTT ClientId
# Upstream group of three HiveMQ nodes on localhost ports 18831-18833,
# selected by a consistent hash of the MQTT ClientId.
upstream hive_mq {
server 127.0.0.1:18831; #node1
server 127.0.0.1:18832; #node2
server 127.0.0.1:18833; #node3
# Shared memory zone "tcp_mem" (64k) holding the group's state
zone tcp_mem 64k;
hash $mqtt_client_id consistent; # Session persistence keyed against ClientId
}
# Listener for MQTT clients on port 1883. The CONNECT packet is inspected in
# the preread phase so that $mqtt_client_id is available for upstream selection
# and logging before the session is proxied to hive_mq.
server {
listen 1883;
preread_buffer_size 1k; # Big enough to read CONNECT packet header
js_preread mqtt.getClientId; # Parse CONNECT packet for ClientId
proxy_pass hive_mq;
# Give up on an upstream node that does not accept the connection within 1s
proxy_connect_timeout 1s;
# Log sessions using the "mqtt" log format
access_log /var/log/nginx/mqtt_access.log mqtt;
error_log /var/log/nginx/mqtt_error.log info; # NGINX JavaScript debug logging
}
# vim: syntax=nginx
@nginx-gists
Copy link
Author

For a discussion of these files, see NGINX Plus for the IoT: Load Balancing MQTT

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment