Created
September 20, 2022 05:41
-
-
Save kurapica/af1caad4f23f482f9efadda4e92cfc40 to your computer and use it in GitHub Desktop.
A simpe mqtt broker with redis support to show the active clients
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pid logs/nginx.pid; | |
events { | |
worker_connections 1024; | |
} | |
http { | |
include mime.types; | |
lua_shared_dict plbr_session_storage 10m; | |
lua_shared_dict ngxlua_file_lock 100k; | |
lua_package_path "${prefix}?.lua;${prefix}?/init.lua;/usr/local/openresty/lualib/?.lua;/usr/local/share/lua/5.1/?.lua;/usr/local/share/lua/5.1/?/init.lua"; | |
init_by_lua_block{ | |
-- The platform settings is very important to avoid problems | |
PLOOP_PLATFORM_SETTINGS = { ENABLE_CONTEXT_FEATURES = true, THREAD_SAFE_ITERATOR = true } | |
require "NgxLua" -- Load the PLoop & NgxLua | |
import "NgxLua" -- Import it to _G so we can use it directly | |
import "System" | |
Application "MqttBroker" (function(_ENV) | |
namespace "MqttBroker" | |
export { List, System.Net.MQTT.ProtocolLevel } | |
----------------------------------------------------------------------- | |
-- Global Web Configuration | |
----------------------------------------------------------------------- | |
Web.Config = { | |
Debug = false, | |
LogLevel = System.Logger.LogLevel.Info, | |
} | |
class "HttpContext" { | |
NgxLua.HttpContext, | |
__ctor = function(self) self.Application = _ENV end | |
} | |
__Route__("/") | |
__View__ "index.view" [[ | |
<!DOCTYPE html> | |
<html lang="en"> | |
<head> | |
<meta charset="UTF-8"> | |
<title>MQTT Client List</title> | |
</head> | |
<body> | |
@for _, client in self.Clients:GetIterator() do | |
<hr/> | |
<h1>Client ID: @\client.ClientID</h1> | |
<p>Connection Time: @client.ConnectionTime:ToString()</p> | |
<p>Protocol: @client.ProtocolLevel</p> | |
@if client.UserName then | |
<p>User Name: @client.UserName</p> | |
@end | |
@if client.Password then | |
<p>Password: @client.Password</p> | |
@end | |
@if client.Subscribes and #client.Subscribes > 0 then | |
<h2>Subscribed Topics:</h2> | |
<ul> | |
@for _, topic in client.Subscribes:GetIterator() do | |
<li>@\topic</li> | |
@end | |
</ul> | |
<br/> | |
@end | |
@if client.Topics and next(client.Topics) then | |
<h2>Publish Messages:</h2> | |
<table border="0" cellspacing="1" cellpadding="4" bgcolor="#cccccc"> | |
<thead> | |
<tr> | |
<th style="background-color:#ffffff;"><b>Topic</b></th> | |
<th style="background-color:#ffffff;"><b>Message</b></th> | |
</tr> | |
</thead> | |
<tbody> | |
@for k, v in pairs(client.Topics) do | |
<tr> | |
<td style="background-color:#ffffff;">@\k</td> | |
<td style="background-color:#ffffff;">@\v</td> | |
</tr> | |
@end | |
</tbody> | |
</table> | |
@end | |
@end | |
</body> | |
</html> | |
]] | |
function home(context) | |
return with(NgxLua.Redis())(function(redis) | |
local data = XDictionary( redis:HPairs("MQTT_CLIENT_LIST") ) | |
:Map(function(target, data) | |
return { | |
ClientID = target, | |
ConnectionTime = System.Date(data.time), | |
UserName = data.user, | |
Password = data.pass, | |
ProtocolLevel = ProtocolLevel(data.protocol), | |
Subscribes = List( redis:SPairs("TOPIC_" .. target) ), | |
Topics = XDictionary(redis:HPairs(target)):ToTable(), | |
} | |
end).Values:ToList() | |
data:Sort("x,y=>x.ConnectionTime<y.ConnectionTime") | |
return { Clients = data } | |
end) | |
end | |
end) | |
-- Save MqttBroker to _G | |
import "MqttBroker" | |
} | |
server{ | |
listen 80; | |
server_name localhost; | |
location / { | |
root html; | |
# MIME type determined by default_type: | |
default_type 'text/html'; | |
content_by_lua ' MqttBroker.HttpContext():Process() '; | |
} | |
} | |
} | |
stream { | |
lua_package_path "${prefix}?.lua;${prefix}?/init.lua;/usr/local/openresty/lualib/?.lua;/usr/local/share/lua/5.1/?.lua;/usr/local/share/lua/5.1/?/init.lua"; | |
# no socket timeout error | |
lua_socket_log_errors off; | |
init_by_lua_block{ | |
-- The platform settings is very important to avoid problems | |
PLOOP_PLATFORM_SETTINGS = { ENABLE_CONTEXT_FEATURES = true, THREAD_SAFE_ITERATOR = true } | |
require "NgxLua" -- Load the PLoop & NgxLua | |
import "NgxLua" -- Import it to _G so we can use it directly | |
import "System" | |
} | |
server { | |
listen 1883; | |
content_by_lua_block{ | |
-- Get the server-side client | |
local client = NgxLua.Net.MQTT.Client{ | |
MessageReceiveTimeout = 3, | |
ReceiveTimeout = 1, | |
SendTimeout = 1, | |
-- use redis message publisher | |
MessagePublisher = NgxLua.Net.MQTT.RedisMQTTPublisher{ host = "127.0.0.1", port = 6379 }, | |
} | |
local redis = NgxLua.Redis{ host = "127.0.0.1", port = 6379 } | |
redis:Open() | |
function client:OnConnected() | |
redis:HSet("MQTT_CLIENT_LIST", client.ClientID, { time = System.Date.Now, user = client.UserName, pass = client.Password, protocol = client.ProtocolLevel }) | |
end | |
function client:OnTopicSubscribed(topic) | |
redis:SAdd("TOPIC_" .. client.ClientID, topic) | |
end | |
function client:OnTopicUnsubscribed(topic) | |
redis:SRemove("TOPIC_" .. client.ClientID, topic) | |
end | |
function client:OnMessageReceived(topic, message) | |
redis:HSet(client.ClientID, topic, message) | |
end | |
-- Process with the client | |
client:Process() | |
redis:HDelete("MQTT_CLIENT_LIST", client.ClientID) | |
redis:Delete("TOPIC_" .. client.ClientID) | |
redis:Delete(client.ClientID) | |
redis:Close() | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Usage: