Last active
October 6, 2024 22:14
-
-
Save nginx-gists/0e93fe7813ec131fed8329d10ead70ea to your computer and use it in GitHub Desktop.
NGINX Plus for the IoT: Load Balancing MQTT
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Pull base image. The official Docker OpenJDK 8 image is used here
# (the legacy `java` repository on Docker Hub is deprecated in favor of `openjdk`).
FROM openjdk:8-jdk

# Copy the HiveMQ distribution archive from the build context into the container
COPY hivemq.zip /tmp/

# Refresh the package index first (a bare `apt-get install` fails when the image
# has no package lists), install wget and unzip, drop the package lists again to
# keep the layer small, then unpack HiveMQ from the copied archive.
RUN \
    apt-get update &&\
    apt-get install -y wget unzip &&\
    rm -rf /var/lib/apt/lists/* &&\
    unzip /tmp/hivemq.zip -d /opt/ &&\
    mv /opt/hivemq-* /opt/hivemq

# Define working directory.
WORKDIR /opt/hivemq

# Define HIVEMQ_HOME variable
ENV HIVEMQ_HOME=/opt/hivemq

# Expose MQTT port
EXPOSE 1883

# Define default command. Here we use HiveMQ's run script.
CMD ["/opt/hivemq/bin/run.sh"]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// njs module used by the stream configuration: js_preread calls getClientId to
// parse the MQTT CONNECT packet, js_set calls setClientId to expose the result.
// `var` and plain function expressions are kept on purpose so the module also
// loads on older njs releases.
// NOTE(review): the module-level state below is assumed to be private to one
// stream session (njs runs each session in its own VM context) - confirm for
// the njs version in use.

// Counter of handled client chunks; only chunk number 1 is parsed as CONNECT.
var client_messages = 1;
// ClientId extracted from CONNECT; stays "-" when nothing could be parsed.
var client_id_str = "-";

// Outcome codes shared by the parsing helpers.
var PARSE_OK = 0;
var PARSE_INCOMPLETE = 1; // the buffer ends before the field does
var PARSE_MALFORMED = 2;  // the field cannot fit in the packet / bad encoding

/**
 * Decode an MQTT variable-length integer (7 data bits per byte, bit 7 set
 * means "another byte follows", at most 4 bytes).
 *
 * @param {string} data - raw packet bytes, one character per byte
 * @param {number} pos - offset of the first encoded byte
 * @returns {{status: number, value: number, next: number}} `value` and `next`
 *   (offset just past the integer) are only present when status is PARSE_OK
 */
function decodeVarInt(data, pos) {
    var value = 0;
    var multiplier = 1;
    for (var i = 0; i < 4; i++) {
        if (pos + i >= data.length) {
            return { status: PARSE_INCOMPLETE };
        }
        var encoded = data.charCodeAt(pos + i);
        value += (encoded & 127) * multiplier;
        if ((encoded & 128) === 0) { // continuation bit clear: last byte
            return { status: PARSE_OK, value: value, next: pos + i + 1 };
        }
        multiplier *= 128;
    }
    return { status: PARSE_MALFORMED }; // a 5th byte is not allowed
}

/** Read a big-endian unsigned 16-bit integer starting at `pos`. */
function readUint16(data, pos) {
    return (data.charCodeAt(pos) << 8) | data.charCodeAt(pos + 1);
}

/**
 * Check that `count` bytes starting at `start` lie inside the packet
 * (PARSE_MALFORMED otherwise) and have already been received
 * (PARSE_INCOMPLETE otherwise).
 */
function checkSpan(data, start, count, packet_end) {
    if (start + count > packet_end) {
        return PARSE_MALFORMED;
    }
    if (start + count > data.length) {
        return PARSE_INCOMPLETE;
    }
    return PARSE_OK;
}

/**
 * Extract the ClientId from an MQTT CONNECT packet.
 *
 * The variable header is walked field by field instead of using a fixed
 * offset, so both the 6-byte "MQIsdp" (v3.1) and the 4-byte "MQTT"
 * (v3.1.1 / v5) protocol names work, and the MQTT 5 properties section
 * is skipped.
 *
 * @param {string} data - buffer whose first byte is the CONNECT type byte
 * @returns {{status: number, clientId: string}} `clientId` is only present
 *   when status is PARSE_OK
 */
function parseConnectClientId(data) {
    // Fixed header: byte 0 is type+flags, the Remaining Length follows.
    var remaining = decodeVarInt(data, 1);
    if (remaining.status !== PARSE_OK) {
        return { status: remaining.status };
    }
    var packet_end = remaining.next + remaining.value; // one past the last byte
    var cursor = remaining.next;
    var status;

    // Protocol name: 2-byte length prefix + name.
    status = checkSpan(data, cursor, 2, packet_end);
    if (status !== PARSE_OK) {
        return { status: status };
    }
    var protocol_name_len = readUint16(data, cursor);
    cursor += 2;

    // Name, protocol level (1), connect flags (1), keep alive (2).
    status = checkSpan(data, cursor, protocol_name_len + 4, packet_end);
    if (status !== PARSE_OK) {
        return { status: status };
    }
    var protocol_level = data.charCodeAt(cursor + protocol_name_len);
    cursor += protocol_name_len + 4;

    // MQTT 5 inserts a length-prefixed properties section before the payload.
    if (protocol_level >= 5) {
        status = checkSpan(data, cursor, 1, packet_end);
        if (status !== PARSE_OK) {
            return { status: status };
        }
        var properties = decodeVarInt(data, cursor);
        if (properties.status !== PARSE_OK) {
            return { status: properties.status };
        }
        status = checkSpan(data, properties.next, properties.value, packet_end);
        if (status !== PARSE_OK) {
            return { status: status };
        }
        cursor = properties.next + properties.value;
    }

    // Payload starts with the ClientId: 2-byte length prefix + value.
    status = checkSpan(data, cursor, 2, packet_end);
    if (status !== PARSE_OK) {
        return { status: status };
    }
    var client_id_len = readUint16(data, cursor);
    cursor += 2;

    status = checkSpan(data, cursor, client_id_len, packet_end);
    if (status !== PARSE_OK) {
        return { status: status };
    }
    return { status: PARSE_OK, clientId: data.slice(cursor, cursor + client_id_len) };
}

/**
 * js_preread handler: inspects the first client data for an MQTT CONNECT
 * packet, stores its ClientId in module state and then lets the session
 * continue with s.allow().
 *
 * Returning without calling s.allow() asks to be called again; this is done
 * for an empty buffer and for a CONNECT packet that has not fully arrived.
 * NOTE(review): waiting for more data assumes the next 'upload' call during
 * preread is handed the whole buffer accumulated so far - confirm.
 *
 * @param {object} s - njs stream session (uses s.on, s.log, s.allow)
 */
function getClientId(s) {
    s.on('upload', function (data, flags) {
        if ( data.length === 0 ) { // Initial calls may contain no data, so
            s.log("No buffer yet"); // ask that we get called again
            return;
        }
        if ( client_messages === 1 ) { // CONNECT is first packet from the client
            // CONNECT packet is 1, using upper 4 bits (00010000 to 00011111)
            var packet_type_flags_byte = data.charCodeAt(0);
            s.log("MQTT packet type+flags = " + packet_type_flags_byte.toString());
            if ( packet_type_flags_byte >= 16 && packet_type_flags_byte < 32 ) {
                var parsed = parseConnectClientId(data);
                var is_last = Boolean(flags && flags.last);
                if ( parsed.status === PARSE_INCOMPLETE && !is_last ) {
                    s.log("Incomplete MQTT CONNECT packet, waiting for more data");
                    return;
                }
                if ( parsed.status === PARSE_OK ) {
                    client_id_str = parsed.clientId;
                    s.log("ClientId value = " + client_id_str);
                } else {
                    // Leave the default ClientId in place rather than store garbage.
                    s.log("Unable to parse ClientId from MQTT CONNECT packet");
                }
            } else {
                s.log("Received unexpected MQTT packet type+flags: " + packet_type_flags_byte.toString());
            }
        }
        client_messages++;
        s.allow();
    });
}

/**
 * js_set handler: returns the ClientId captured by getClientId, or "-" when
 * none has been captured.
 *
 * @param {object} s - njs stream session (unused)
 * @returns {string} the ClientId value
 */
function setClientId(s) {
    return client_id_str;
}

export default { getClientId, setClientId };
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Stream (TCP/UDP) context: pull in every configuration file found in
# stream_conf.d/.
stream {
    include stream_conf.d/*.conf;
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Access log layout named "mqtt": client address, local time, protocol,
# session status, bytes received and sent, and the upstream server used.
log_format mqtt '$remote_addr [$time_local] $protocol $status $bytes_received '
                '$bytes_sent $upstream_addr';
# Upstream group of three servers listening on local ports.
upstream hive_mq {
    server 127.0.0.1:18831; #node1
    server 127.0.0.1:18832; #node2
    server 127.0.0.1:18833; #node3
    zone tcp_mem 64k;       # Shared memory zone "tcp_mem" for this group
}
# Health-check probe: what to send to a server and what reply to expect.
match mqtt_conn {
    # Send CONNECT packet with client ID "nginx health check"
    # (0x10 = CONNECT, 0x20 = remaining length 32, protocol name "MQIsdp",
    #  version 3, connect flags 0x02, keep alive 0x003c = 60, 18-byte ClientId)
    send \x10\x20\x00\x06\x4d\x51\x49\x73\x64\x70\x03\x02\x00\x3c\x00\x12\x6e\x67\x69\x6e\x78\x20\x68\x65\x61\x6c\x74\x68\x20\x63\x68\x65\x63\x6b;
    expect \x20\x02\x00\x00; # Entire payload of CONNACK packet
}
# MQTT listener: proxies port 1883 to the hive_mq group and health-checks the
# group members with the mqtt_conn probe.
server {
    listen 1883;
    proxy_pass hive_mq;
    proxy_connect_timeout 1s;
    health_check match=mqtt_conn;

    access_log /var/log/nginx/mqtt_access.log mqtt;
    error_log  /var/log/nginx/mqtt_error.log; # Health check notifications
}
# vim: syntax=nginx |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Load the njs module and bind $mqtt_client_id to the value returned by its
# setClientId function.
js_import mqtt.js;
js_set    $mqtt_client_id mqtt.setClientId;

# Access log layout named "mqtt", extended with the parsed ClientId.
log_format mqtt '$remote_addr [$time_local] $protocol $status $bytes_received '
                '$bytes_sent $upstream_addr $mqtt_client_id'; # Include MQTT ClientId
# Upstream group of three servers listening on local ports; the server is
# chosen by hashing the ClientId.
upstream hive_mq {
    server 127.0.0.1:18831; #node1
    server 127.0.0.1:18832; #node2
    server 127.0.0.1:18833; #node3
    zone tcp_mem 64k;       # Shared memory zone "tcp_mem" for this group
    hash $mqtt_client_id consistent; # Session persistence keyed against ClientId
}
# MQTT listener: reads the start of each session with the njs handler
# mqtt.getClientId before proxying to the hive_mq group.
server {
    listen 1883;
    preread_buffer_size 1k;      # Big enough to read CONNECT packet header
    js_preread mqtt.getClientId; # Parse CONNECT packet for ClientId

    proxy_pass hive_mq;
    proxy_connect_timeout 1s;

    access_log /var/log/nginx/mqtt_access.log mqtt;
    error_log  /var/log/nginx/mqtt_error.log info; # NGINX JavaScript debug logging
}
# vim: syntax=nginx |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
For a discussion of these files, see NGINX Plus for the IoT: Load Balancing MQTT