Skip to content

Instantly share code, notes, and snippets.

@kurapica
Created September 20, 2022 05:41
Show Gist options
  • Save kurapica/af1caad4f23f482f9efadda4e92cfc40 to your computer and use it in GitHub Desktop.
Save kurapica/af1caad4f23f482f9efadda4e92cfc40 to your computer and use it in GitHub Desktop.
A simple MQTT broker with Redis support that shows the active clients
# File in which the nginx master process stores its process ID.
pid logs/nginx.pid;
events {
# Maximum number of simultaneous connections a single worker process may hold open.
worker_connections 1024;
}
http {
# File-extension to MIME-type table used for responses.
include mime.types;
# Shared-memory zones (10 MB and 100 KB) visible to Lua code in all workers.
# NOTE(review): nothing in this file reads them directly; presumably NgxLua uses
# them for session storage and file locking - confirm against the NgxLua sources.
lua_shared_dict plbr_session_storage 10m;
lua_shared_dict ngxlua_file_lock 100k;
# Lua module search path: the nginx prefix directory first, then the OpenResty
# lualib directory and the system-wide Lua 5.1 module directories.
lua_package_path "${prefix}?.lua;${prefix}?/init.lua;/usr/local/openresty/lualib/?.lua;/usr/local/share/lua/5.1/?.lua;/usr/local/share/lua/5.1/?/init.lua";
init_by_lua_block{
-- Platform settings for PLoop. The global is assigned before the require below
-- so that it already exists when the library is loaded; the author notes that
-- these settings are important to avoid problems.
PLOOP_PLATFORM_SETTINGS = { ENABLE_CONTEXT_FEATURES = true, THREAD_SAFE_ITERATOR = true }
require "NgxLua" -- Load PLoop and NgxLua
import "NgxLua" -- Import the namespace into _G so its members can be used directly
import "System" -- Likewise for the System namespace
Application "MqttBroker" (function(_ENV)
-- Namespace under which the types declared in this application are registered
namespace "MqttBroker"
-- Make List and System.Net.MQTT.ProtocolLevel available by their short names
-- inside this environment (both are used by the route handler further down)
export { List, System.Net.MQTT.ProtocolLevel }
-----------------------------------------------------------------------
-- Global Web Configuration
-----------------------------------------------------------------------
Web.Config = {
Debug = false, -- debug mode is switched off
LogLevel = System.Logger.LogLevel.Info, -- log level is set to Info
}
-- Http context type of this application. It is built on NgxLua.HttpContext and
-- every new instance is bound to the enclosing application environment.
class "HttpContext" {
    -- Base type
    NgxLua.HttpContext,

    -- Constructor: _ENV here is the parameter of the Application function
    __ctor = function(ctx)
        ctx.Application = _ENV
    end,
}
-- View for the "/" route: one section per MQTT client with its connection
-- details, the topics it subscribed to and the stored message per topic.
-- Every value supplied by an MQTT client (client id, user name, password,
-- topics, messages) is written with the `@\expr` form, the same form this
-- template already used for the client id, topics and messages. The user name
-- and password were previously written with plain `@expr`, so a client could
-- inject markup into the page through its CONNECT packet.
-- NOTE(review): assumes `@\expr` is the HTML-encoding output form of the PLoop
-- template engine - confirm against the PLoop web documentation.
-- NOTE(review): the password is still displayed in clear text; consider masking
-- or removing it outside of a demo setup.
__Route__("/")
__View__ "index.view" [[
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>MQTT Client List</title>
</head>
<body>
@for _, client in self.Clients:GetIterator() do
<hr/>
<h1>Client ID: @\client.ClientID</h1>
<p>Connection Time: @client.ConnectionTime:ToString()</p>
<p>Protocol: @client.ProtocolLevel</p>
@if client.UserName then
<p>User Name: @\client.UserName</p>
@end
@if client.Password then
<p>Password: @\client.Password</p>
@end
@if client.Subscribes and #client.Subscribes > 0 then
<h2>Subscribed Topics:</h2>
<ul>
@for _, topic in client.Subscribes:GetIterator() do
<li>@\topic</li>
@end
</ul>
<br/>
@end
@if client.Topics and next(client.Topics) then
<h2>Publish Messages:</h2>
<table border="0" cellspacing="1" cellpadding="4" bgcolor="#cccccc">
<thead>
<tr>
<th style="background-color:#ffffff;"><b>Topic</b></th>
<th style="background-color:#ffffff;"><b>Message</b></th>
</tr>
</thead>
<tbody>
@for k, v in pairs(client.Topics) do
<tr>
<td style="background-color:#ffffff;">@\k</td>
<td style="background-color:#ffffff;">@\v</td>
</tr>
@end
</tbody>
</table>
@end
@end
</body>
</html>
]]
-- Handler for the "/" route.
-- Reads the "MQTT_CLIENT_LIST" hash from Redis, builds one record per client
-- (including its "TOPIC_<id>" set and the hash stored under the client id),
-- orders the records by connection time and returns them as `Clients` for the
-- view. The Redis object is handed to `with`, which runs the callback with it.
function home(context)
    return with(NgxLua.Redis())(function(redis)
        -- Turns one (client id, stored info) pair into the record the view renders
        local function toClientRecord(clientID, info)
            local record = {
                ClientID = clientID,
                ConnectionTime = System.Date(info.time),
                UserName = info.user,
                Password = info.pass,
                ProtocolLevel = ProtocolLevel(info.protocol),
                Subscribes = List( redis:SPairs("TOPIC_" .. clientID) ),
                Topics = XDictionary(redis:HPairs(clientID)):ToTable(),
            }
            return record
        end

        local registered = XDictionary( redis:HPairs("MQTT_CLIENT_LIST") )
        local clients = registered:Map(toClientRecord).Values:ToList()

        -- Oldest connection first
        clients:Sort("x,y=>x.ConnectionTime<y.ConnectionTime")

        return { Clients = clients }
    end)
end
end)
-- Save MqttBroker to _G
import "MqttBroker"
}
# Web front end on port 80: every request is handled by the MqttBroker
# application that init_by_lua_block loaded into the global environment.
server {
    listen 80;
    server_name localhost;
    location / {
        root html;
        # MIME type determined by default_type:
        default_type 'text/html';
        # Block form of the content handler. It replaces the string form
        # content_by_lua, whose use is discouraged by lua-nginx-module, and
        # matches the *_by_lua_block style used everywhere else in this file.
        content_by_lua_block {
            MqttBroker.HttpContext():Process()
        }
    }
}
}
stream {
# Lua module search path for the stream subsystem: the nginx prefix directory
# first, then the OpenResty lualib directory and the system Lua 5.1 directories.
lua_package_path "${prefix}?.lua;${prefix}?/init.lua;/usr/local/openresty/lualib/?.lua;/usr/local/share/lua/5.1/?.lua;/usr/local/share/lua/5.1/?/init.lua";
# Do not write cosocket failures (for example receive timeouts) to the error log.
lua_socket_log_errors off;
init_by_lua_block{
-- Platform settings for PLoop. The global is assigned before the require below
-- so that it already exists when the library is loaded; the author notes that
-- these settings are important to avoid problems.
PLOOP_PLATFORM_SETTINGS = { ENABLE_CONTEXT_FEATURES = true, THREAD_SAFE_ITERATOR = true }
require "NgxLua" -- Load PLoop and NgxLua
import "NgxLua" -- Import the namespace into _G so its members can be used directly
import "System" -- Likewise for the System namespace
}
# MQTT broker endpoint: one Lua handler run per accepted TCP connection.
server {
    listen 1883;
    content_by_lua_block {
        -- Get the server-side client
        local client = NgxLua.Net.MQTT.Client{
            MessageReceiveTimeout = 3,
            ReceiveTimeout = 1,
            SendTimeout = 1,
            -- use redis message publisher
            MessagePublisher = NgxLua.Net.MQTT.RedisMQTTPublisher{ host = "127.0.0.1", port = 6379 },
        }

        -- Redis connection used to record what this client does, so the
        -- web page can list it
        local redis = NgxLua.Redis{ host = "127.0.0.1", port = 6379 }
        redis:Open()

        -- Register the client in the shared client list once it has connected
        function client:OnConnected()
            redis:HSet("MQTT_CLIENT_LIST", client.ClientID, { time = System.Date.Now, user = client.UserName, pass = client.Password, protocol = client.ProtocolLevel })
        end

        -- Track the subscribed topics in the set TOPIC_<client id>
        function client:OnTopicSubscribed(topic)
            redis:SAdd("TOPIC_" .. client.ClientID, topic)
        end

        function client:OnTopicUnsubscribed(topic)
            redis:SRemove("TOPIC_" .. client.ClientID, topic)
        end

        -- Keep the latest message per topic in a hash stored under the client id.
        -- NOTE(review): the raw client id is used as the Redis key, so an id
        -- equal to another key used here would collide - consider a prefix
        -- (the web page reads the same key and would need the same change).
        function client:OnMessageReceived(topic, message)
            redis:HSet(client.ClientID, topic, message)
        end

        -- Process with the client. Run it protected so that the bookkeeping
        -- below is removed and the Redis connection is closed even when the
        -- session ends with an error.
        local ok, err = pcall(function() return client:Process() end)

        -- Remove everything recorded for this client. The id is checked first:
        -- it may be missing when the peer disconnected before sending CONNECT
        -- (TODO confirm whether the client type ever leaves it unset), and
        -- concatenating nil would raise before the connection is closed.
        local cleaned, cleanupErr = pcall(function()
            local clientID = client.ClientID
            if clientID then
                redis:HDelete("MQTT_CLIENT_LIST", clientID)
                redis:Delete("TOPIC_" .. clientID)
                redis:Delete(clientID)
            end
        end)

        redis:Close()

        -- Re-raise after cleanup so failures are still reported by nginx
        if not ok then error(err, 0) end
        if not cleaned then error(cleanupErr, 0) end
    }
}
}
@kurapica
Copy link
Author

Usage:

  1. Install OpenResty.
  2. Clone PLoop and NgxLua into /usr/local/openresty/nginx.
  3. Replace /usr/local/openresty/nginx/nginx.conf with the config file above.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment