Skip to content

Instantly share code, notes, and snippets.

@iRevive
Last active March 21, 2023 12:52
Show Gist options
  • Save iRevive/a441c7f880bba10ca9a3bb7ee686d222 to your computer and use it in GitHub Desktop.
Save iRevive/a441c7f880bba10ca9a3bb7ee686d222 to your computer and use it in GitHub Desktop.
MQTT 5 Connect packet parser for Nginx
/**
 * Decode the parts of an MQTT CONNECT packet that are needed for authentication.
 *
 * The packet is read sequentially with a bounds-checked cursor: reads past the
 * end of `data` yield 0 (for numbers) or a shortened/empty string, so a
 * truncated packet never produces NaN offsets or throws.
 *
 * Property blocks (connect properties and will properties) are skipped, not
 * decoded. They are only expected when the protocol level is 5 or higher;
 * older protocol levels have no property blocks in CONNECT.
 *
 * @param {string} data - Raw packet; every character is read with charCodeAt
 *   and treated as one byte (assumes the caller delivers a byte string — TODO confirm).
 * @returns {object} Always contains `type` (first byte of the packet). For a
 *   CONNECT packet it additionally contains `protocolType`, `protocolVersion`
 *   (hex string), `connectFlags` (8-character binary string, most significant
 *   bit first), `keepAlive`, `client_id`, and, depending on the connect flags,
 *   `will` ({destination, message}), `username` and `password`.
 */
function decodeConnectPacket(data) {
  const packetTypeFlagsByte = data.charCodeAt(0);
  const packet = {
    type: packetTypeFlagsByte,
  };

  // CONNECT is packet type 1, carried in the upper 4 bits (00010000 to 00011111).
  if (!(packetTypeFlagsByte >= 16 && packetTypeFlagsByte < 32)) {
    return packet;
  }

  // Read cursor; it is only advanced by the helpers below and never moves
  // beyond data.length.
  let pos = 1;

  // Next byte, or 0 when the data is exhausted.
  function readByte() {
    if (pos >= data.length) {
      return 0;
    }
    const value = data.charCodeAt(pos);
    pos += 1;
    return value;
  }

  // Big-endian 16-bit integer.
  function readShort() {
    const msb = readByte();
    const lsb = readByte();
    return lsb + (msb << 8);
  }

  // MQTT variable byte integer: 7 data bits per byte, least significant group
  // first, bit 7 set while more bytes follow, at most 4 bytes.
  function readVarInt() {
    let value = 0;
    let multiplier = 1;
    for (let i = 0; i < 4; i++) {
      const byte = readByte();
      value += (byte & 127) * multiplier;
      if ((byte & 128) === 0) {
        break;
      }
      multiplier *= 128;
    }
    return value;
  }

  // Advance the cursor by `size` bytes, clamped to the end of the data.
  function skip(size) {
    pos = Math.min(pos + size, data.length);
  }

  // `size` bytes as a string (shorter if the data ends early).
  function readBytes(size) {
    const end = Math.min(pos + size, data.length);
    const value = data.substring(pos, end);
    pos = end;
    return value;
  }

  // Field prefixed with a 16-bit length.
  function readPrefixed() {
    return readBytes(readShort());
  }

  // Fixed header: the remaining length is only consumed to find where the
  // variable header starts; its value is not needed afterwards.
  readVarInt();

  // Variable header.
  packet.protocolType = readPrefixed();
  const protocolLevel = readByte();
  packet.protocolVersion = protocolLevel.toString(16);
  const flags = readByte();
  // Padded to 8 characters so that the index always maps to the same bit:
  // username, password, will retain, [will qos, will qos], will flag, clean start, reserved
  packet.connectFlags = flags.toString(2).padStart(8, '0');
  packet.keepAlive = readShort();

  const hasProperties = protocolLevel >= 5;
  if (hasProperties) {
    // todo: read properties
    skip(readVarInt());
  }

  // Payload: the order of the fields is fixed, their presence depends on the flags.
  packet.client_id = readPrefixed();

  // will flag
  if ((flags & 0x04) !== 0) {
    packet.will = {};
    if (hasProperties) {
      // todo read properties
      skip(readVarInt());
    }
    packet.will.destination = readPrefixed();
    packet.will.message = readPrefixed();
  }

  // username flag
  if ((flags & 0x80) !== 0) {
    packet.username = readPrefixed();
  }

  // password flag
  if ((flags & 0x40) !== 0) {
    packet.password = readPrefixed();
  }

  return packet;
}
// Ordinal of the next non-empty 'upload' chunk; only chunk number 1 is treated
// as the CONNECT packet and authenticated.
// NOTE(review): this is module-level state — it is only correct if every
// session gets its own copy of the module state; confirm for the njs version in use.
let client_messages = 1;

/**
 * Preread handler: authenticates the first client packet (MQTT CONNECT)
 * against a fixed username/password pair.
 *
 * Registers an 'upload' listener on the session. Empty chunks are ignored so
 * the listener is called again; the first non-empty chunk is decoded with
 * decodeConnectPacket and the session is denied when the credentials do not
 * match, otherwise (and for every later chunk) it is allowed.
 *
 * @param {object} s - Stream session object; `on`, `log`, `allow` and `deny` are used.
 */
function verifyAuth(s) {
  s.on('upload', function (data) {
    if (data.length === 0) {
      // Initial calls may contain no data, so wait to be called again.
      s.log("No buffer yet");
      return;
    }

    // Connect is first packet from the client
    if (client_messages === 1) {
      const packet = decodeConnectPacket(data);

      // The password is a secret: keep it out of the log.
      const loggable = JSON.stringify(packet, function (key, value) {
        return key === 'password' ? '[redacted]' : value;
      });
      s.log("Decoded packet: " + loggable);

      if (packet.username === 'MyUser' && packet.password === 'MyPassword') {
        s.log("Authorized");
      } else {
        s.log("Invalid username or password");
        s.deny(); // Close the TCP connection (logged as 500)
        return;
      }
    }

    client_messages++;
    s.allow();
  });
}

export default {verifyAuth}
# nginx configuration for a TLS-terminating MQTT proxy that runs the
# verifyAuth function from mqtt_client_auth.js in the preread phase.
user nginx;
worker_processes auto;
# js does not work without these modules
load_module modules/ngx_http_js_module.so;
load_module modules/ngx_stream_js_module.so;
error_log /var/log/nginx/error.log;
pid /run/nginx.pid;
events {
worker_connections 1024;
}
# TCP (stream) proxying; everything MQTT-related lives in this context.
stream {
# Access log line format referenced as "mqtt" by the access_log directive below.
log_format mqtt '$remote_addr [$time_local] $protocol $status $bytes_received '
'$bytes_sent $upstream_addr';
# load file
# The handler below is referenced as mqtt_client_auth.<function>.
js_import conf.d/mqtt_client_auth.js;
server {
listen 1883 ssl; # the port for MQTT connections
# Amount of client data buffered for the preread handler.
# NOTE(review): a CONNECT packet larger than this would not be fully
# available to the handler — confirm 1k is enough for the expected clients.
preread_buffer_size 1k;
# Run verifyAuth from the imported module before proxying starts.
js_preread mqtt_client_auth.verifyAuth;
# TLS settings for the client-facing side.
ssl_certificate /etc/nginx/certs/mqtt_proxy.crt;
ssl_certificate_key /etc/nginx/certs/mqtt_proxy.key;
ssl_ciphers HIGH:!aNULL:!MD5;
ssl_session_cache shared:SSL:128m; # 128MB ~= 500k sessions
ssl_session_tickets on;
ssl_session_timeout 8h;
# Forward accepted sessions to the upstream group defined below.
proxy_pass mqtt_backend;
proxy_connect_timeout 10s;
access_log /var/log/nginx/mqtt_access.log mqtt;
error_log /var/log/nginx/mqtt_error.log info;
}
# NOTE(review): the listener above and this upstream both use port 1883. If the
# broker runs on the same host as nginx they would compete for the port (or
# nginx would proxy to itself) — confirm the broker address/port.
upstream mqtt_backend {
server localhost:1883; # the address of the MQTT broker
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment