Skip to content

Instantly share code, notes, and snippets.

@triktron
Last active May 16, 2022 09:51
Show Gist options
  • Save triktron/2b3664e7fc0b199851f08db81912bbcb to your computer and use it in GitHub Desktop.
Save triktron/2b3664e7fc0b199851f08db81912bbcb to your computer and use it in GitHub Desktop.
M2MQTT event-based wrapper for Unity
/// Copyright(C) 2022 Triktron, Luca Vanesche <<[email protected]>>
///
/// This program is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// This program is distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Text;
using UnityEngine;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;
namespace M2MQTTWrapper
{
/// <summary>
/// Helpers for testing a concrete MQTT topic name against a subscription filter.
/// </summary>
public static class MQTTTopic
{
    /// <summary>
    /// Tests <paramref name="topic"/> against a filter that may contain the MQTT
    /// wildcards <c>+</c> (exactly one level) and <c>#</c> (all remaining levels).
    /// </summary>
    /// <param name="topic">Concrete topic name of a received message.</param>
    /// <param name="match">Subscription filter to test against.</param>
    /// <returns><c>true</c> when the topic is covered by the filter.</returns>
    public static bool Matches(string topic, string match)
    {
        // Per the MQTT specification, a filter that starts with a wildcard never
        // matches topic names beginning with '$' (e.g. "$SYS/...").
        if (topic.Length > 0 && topic[0] == '$' &&
            match.Length > 0 && (match[0] == '#' || match[0] == '+'))
        {
            return false;
        }

        string[] topicParts = topic.Split('/');
        string[] matchParts = match.Split('/');
        for (int i = 0; i < matchParts.Length; i++)
        {
            // '#' must be tested before the length check: it also matches the
            // parent level itself, so "a/#" covers "a" as well as "a/b/c".
            if (matchParts[i] == "#")
            {
                return true;
            }
            if (i >= topicParts.Length)
            {
                return false;
            }
            if (matchParts[i] != "+" && matchParts[i] != topicParts[i])
            {
                return false;
            }
        }

        // Without a trailing '#', the topic must not have extra levels.
        return matchParts.Length == topicParts.Length;
    }

    /// <summary>
    /// Exact ordinal comparison, used for filters that contain no wildcards.
    /// </summary>
    /// <param name="topic">Concrete topic name of a received message.</param>
    /// <param name="match">Wildcard-free subscription filter.</param>
    /// <returns><c>true</c> when both strings are ordinally equal.</returns>
    public static bool MatchesFast(string topic, string match)
    {
        return string.CompareOrdinal(topic, match) == 0;
    }
}
/// <summary>
/// Binds one topic filter to a set of callbacks: subscribes on an
/// <see cref="MqttClient"/>, filters incoming publishes by topic, converts the
/// payload to the configured type and forwards it to <see cref="OnRecieved"/>.
/// </summary>
public class MessageHandler
{
    /// <summary>Callback signature: the concrete topic and the parsed payload.</summary>
    public delegate void OnMessageEvent(string tag, object msg);

    /// <summary>How the UTF-8 payload is converted before it is handed to callbacks.</summary>
    public enum MessageType
    {
        String,
        Float,
        Int,
        Custom
    }

    /// <summary>A parsed message waiting for <see cref="DequeueDelayedMessages"/>.</summary>
    private struct DelayedMessage
    {
        public string Topic;
        public object Value;
    }

    /// <summary>Topic filter this handler subscribes to (may contain + and #).</summary>
    public string Topic;

    /// <summary>Callbacks invoked for every matching message. May be null.</summary>
    public OnMessageEvent OnRecieved;

    // Client this handler is currently attached to, or null when detached.
    // Tracking the instance (rather than a bool) lets Subscribe notice that it
    // is being handed a NEW client, e.g. after MQTTInstance.Connect() recreated it.
    private MqttClient _SubscribedClient;
    private bool _FastTest;
    private bool _WaitForMainLoop;
    private MessageType _Type;
    private Type _ClassType;
    private readonly Queue<DelayedMessage> _DelayedMessages = new Queue<DelayedMessage>();

    // Guards _DelayedMessages. NOTE(review): the queue is filled from the client's
    // receive callback and drained from MQTTInstance.Update(); these presumably
    // run on different threads (that is what waitForMainLoop is for) - confirm.
    private readonly object _QueueLock = new object();

    /// <summary>Creates a handler whose payload type is <see cref="MessageType.String"/>.</summary>
    /// <param name="topic">Topic filter to subscribe to.</param>
    /// <param name="waitForMainLoop">
    /// When true, messages are queued until <see cref="DequeueDelayedMessages"/> is called
    /// instead of being delivered directly from the receive callback.
    /// </param>
    public MessageHandler(string topic, bool waitForMainLoop)
    {
        Topic = topic;
        // Filters without wildcards can be compared with a plain string compare.
        _FastTest = !(topic.Contains("+") || topic.Contains("#"));
        _WaitForMainLoop = waitForMainLoop;
    }

    /// <summary>Creates a handler that parses payloads as the given built-in type.</summary>
    public MessageHandler(string topic, MessageType type, bool waitForMainLoop) : this(topic, waitForMainLoop)
    {
        _Type = type;
    }

    /// <summary>Creates a handler that deserializes JSON payloads into <paramref name="type"/>.</summary>
    public MessageHandler(string topic, Type type, bool waitForMainLoop) : this(topic, waitForMainLoop)
    {
        _Type = MessageType.Custom;
        _ClassType = type;
    }

    /// <summary>
    /// Subscribes to <see cref="Topic"/> on <paramref name="client"/> and starts listening
    /// for its publishes. Calling it again with the same client is a no-op; calling it
    /// with a different client moves the handler over to that client.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="client"/> is null.</exception>
    public void Subscribe(MqttClient client)
    {
        if (client == null)
        {
            throw new ArgumentNullException(nameof(client));
        }
        if (ReferenceEquals(_SubscribedClient, client))
        {
            return;
        }

        // Still hooked to a previous client: drop only the local event hook so the
        // handler is not invoked twice and the old client can be collected.
        if (_SubscribedClient != null)
        {
            _SubscribedClient.MqttMsgPublishReceived -= MessageRecieved;
        }

        client.Subscribe(new string[] { Topic }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE });
        client.MqttMsgPublishReceived += MessageRecieved;
        _SubscribedClient = client;
    }

    /// <summary>
    /// Stops listening and unsubscribes <see cref="Topic"/> on <paramref name="client"/>.
    /// Does nothing when the handler is not currently subscribed.
    /// </summary>
    public void UnSubscribe(MqttClient client)
    {
        if (_SubscribedClient == null)
        {
            return;
        }

        if (!ReferenceEquals(_SubscribedClient, client))
        {
            _SubscribedClient.MqttMsgPublishReceived -= MessageRecieved;
        }
        client.MqttMsgPublishReceived -= MessageRecieved;
        client.Unsubscribe(new string[] { Topic });
        _SubscribedClient = null;
    }

    /// <summary>
    /// Receive callback attached to the client. Ignores publishes that do not match
    /// <see cref="Topic"/>; otherwise parses the payload and either queues it or
    /// invokes <see cref="OnRecieved"/> immediately.
    /// </summary>
    public void MessageRecieved(object sender, MqttMsgPublishEventArgs e)
    {
        bool isMatch = _FastTest
            ? MQTTTopic.MatchesFast(e.Topic, Topic)
            : MQTTTopic.Matches(e.Topic, Topic);
        if (!isMatch)
        {
            return;
        }

        object value = ParseMessage(Encoding.UTF8.GetString(e.Message));
        if (_WaitForMainLoop)
        {
            lock (_QueueLock)
            {
                _DelayedMessages.Enqueue(new DelayedMessage()
                {
                    Topic = e.Topic,
                    Value = value
                });
            }
        }
        else
        {
            // The delegate is null when no callback is attached (yet).
            OnRecieved?.Invoke(e.Topic, value);
        }
    }

    /// <summary>
    /// Delivers every queued message to <see cref="OnRecieved"/> in arrival order.
    /// </summary>
    public void DequeueDelayedMessages()
    {
        while (true)
        {
            DelayedMessage delayedMessage;
            lock (_QueueLock)
            {
                if (_DelayedMessages.Count == 0)
                {
                    return;
                }
                delayedMessage = _DelayedMessages.Dequeue();
            }

            // Invoke outside the lock so a slow callback never blocks the receiver.
            OnRecieved?.Invoke(delayedMessage.Topic, delayedMessage.Value);
        }
    }

    /// <summary>
    /// Converts a payload string according to the handler's message type.
    /// </summary>
    /// <param name="msg">Payload decoded as UTF-8 text.</param>
    /// <returns>
    /// The string itself, a boxed <c>int</c>/<c>float</c>, the deserialized object, or
    /// <c>null</c> when the payload cannot be converted.
    /// </returns>
    public object ParseMessage(string msg)
    {
        switch (_Type)
        {
            case MessageType.String:
                return msg;

            case MessageType.Int:
                // Wire data is machine-readable: parse with the invariant culture so
                // the result does not depend on the device's regional settings.
                return int.TryParse(msg, System.Globalization.NumberStyles.Integer,
                        System.Globalization.CultureInfo.InvariantCulture, out int intNumber)
                    ? (object)intNumber
                    : null;

            case MessageType.Float:
                return float.TryParse(msg,
                        System.Globalization.NumberStyles.Float | System.Globalization.NumberStyles.AllowThousands,
                        System.Globalization.CultureInfo.InvariantCulture, out float floatNumber)
                    ? (object)floatNumber
                    : null;

            case MessageType.Custom:
                try
                {
                    return JsonConvert.DeserializeObject(msg, _ClassType);
                }
                catch (JsonException e)
                {
                    // Malformed JSON is reported like an unparsable number: as null,
                    // instead of throwing out of the client's receive callback.
                    Debug.LogWarningFormat("Could not deserialize MQTT payload for '{0}': {1}", Topic, e.Message);
                    return null;
                }

            default:
                return null;
        }
    }
}
/// <summary>
/// Unity component that owns a single MQTT client connection and routes incoming
/// messages to per-topic <see cref="MessageHandler"/>s registered through
/// <see cref="RegisterEvent(string, MessageHandler.OnMessageEvent, MessageHandler.MessageType, bool)"/>.
/// </summary>
public class MQTTInstance : MonoBehaviour
{
    #region Properties
    [Header("MQTT broker configuration")]
    [Tooltip("IP address or URL of the host running the broker")]
    public string BrokerAddress = "localhost";
    [Tooltip("Port where the broker accepts connections")]
    public int BrokerPort = 1883;
    [Tooltip("Connect on startup")]
    public bool AutoConnect = false;
    [Tooltip("UserName for the MQTT broker. Keep blank if no user name is required.")]
    public string MqttUserName = null;
    [Tooltip("Password for the MQTT broker. Keep blank if no password is required.")]
    public string MqttPassword = null;
    #endregion Properties

    #region Globals
    private MqttClient _Client;
    private List<MessageHandler> _Handlers = new List<MessageHandler>();

    /// <summary>True while a client exists and reports an open connection.</summary>
    public bool Connected => _Client != null && _Client.IsConnected;
    #endregion Globals

    #region Events
    public delegate void OnConnectedEvent(MQTTInstance instance);
    public delegate void OnDisconnectedEvent(MQTTInstance instance);
    public delegate void OnMessageEvent(string tag, string msg);

    /// <summary>Raised after a connection was established and all handlers were subscribed.</summary>
    public OnConnectedEvent OnConnected;
    /// <summary>Raised when the client reports that its connection was closed.</summary>
    public OnDisconnectedEvent OnDisconnect;
    /// <summary>Raised for every received publish, with the payload decoded as UTF-8 text.</summary>
    public OnMessageEvent OnMessage;
    #endregion Events

    #region Unity Callbacks
    private void Start()
    {
        if (AutoConnect)
        {
            Connect();
        }
    }

    private void Update()
    {
        // Index loop instead of foreach: a callback run from here may register or
        // unregister handlers, which would invalidate a List enumerator.
        for (int i = 0; i < _Handlers.Count; i++)
        {
            _Handlers[i].DequeueDelayedMessages();
        }
    }

    private void OnDestroy()
    {
        if (Connected)
        {
            _Client.Disconnect();
        }
    }
    #endregion Unity Callbacks

    #region Connection
    /// <summary>
    /// (Re)connects to the configured broker with a fresh random client id, subscribes
    /// all registered handlers and raises <see cref="OnConnected"/>. Failures are
    /// logged and leave the instance disconnected.
    /// </summary>
    public void Connect()
    {
        if (Connected)
        {
            _Client.Disconnect();
        }

        try
        {
            _Client = new MqttClient(BrokerAddress, BrokerPort, false, null, null, MqttSslProtocols.None);
            _Client.ConnectionClosed += (object sender, EventArgs e) => OnDisconnect?.Invoke(this);
            // Hook the raw message event before connecting so no publish is missed.
            _Client.MqttMsgPublishReceived += OnMessageRecieved;

            string clientId = Guid.NewGuid().ToString();
            // Inside the try: an unreachable broker makes Connect throw as well.
            _Client.Connect(clientId, MqttUserName, MqttPassword);
        }
        catch (Exception e)
        {
            _Client = null;
            Debug.LogErrorFormat("CONNECTION FAILED! {0}", e.ToString());
            return;
        }

        // Connect can return without throwing although the broker refused the
        // session (e.g. bad credentials); do not announce a connection then.
        if (!Connected)
        {
            _Client = null;
            Debug.LogError("CONNECTION FAILED! The broker did not accept the connection.");
            return;
        }

        foreach (var handler in _Handlers)
        {
            handler.Subscribe(_Client);
        }
        OnConnected?.Invoke(this);
    }

    /// <summary>Closes the connection if one is open.</summary>
    public void Disconnect()
    {
        if (Connected)
        {
            _Client.Disconnect();
        }
    }

    private void OnMessageRecieved(object sender, MqttMsgPublishEventArgs msg)
    {
        OnMessage?.Invoke(msg.Topic, Encoding.UTF8.GetString(msg.Message));
    }

    /// <summary>Publishes a UTF-8 string with QoS 1. Silently ignored while disconnected.</summary>
    public void Send(string topic, string msg)
    {
        if (Connected)
        {
            _Client.Publish(topic, Encoding.UTF8.GetBytes(msg), MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE, false);
        }
    }

    /// <summary>Publishes an object serialized as JSON with QoS 1. Silently ignored while disconnected.</summary>
    public void Send(string topic, object msg)
    {
        if (Connected)
        {
            _Client.Publish(topic, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(msg)), MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE, false);
        }
    }
    #endregion Connection

    #region EventHandlers
    /// <summary>
    /// Adds <paramref name="onRecieved"/> to the handler for <paramref name="topic"/>,
    /// creating and subscribing the handler if the topic is new. The message type and
    /// <paramref name="waitForMainLoop"/> only take effect when the handler is created.
    /// </summary>
    /// <returns>The handler serving the topic.</returns>
    public MessageHandler RegisterEvent(string topic, MessageHandler.OnMessageEvent onRecieved, MessageHandler.MessageType messageType = MessageHandler.MessageType.String, bool waitForMainLoop = false)
    {
        return AttachCallback(topic, onRecieved, () => new MessageHandler(topic, messageType, waitForMainLoop));
    }

    /// <summary>
    /// Same as the other overload, but payloads are deserialized from JSON into
    /// <paramref name="classType"/>.
    /// </summary>
    /// <returns>The handler serving the topic.</returns>
    public MessageHandler RegisterEvent(string topic, MessageHandler.OnMessageEvent onRecieved, Type classType, bool waitForMainLoop = false)
    {
        return AttachCallback(topic, onRecieved, () => new MessageHandler(topic, classType, waitForMainLoop));
    }

    /// <summary>
    /// Removes <paramref name="onRecieved"/> from the handler for <paramref name="topic"/>;
    /// when it was the last callback, the handler is unsubscribed and discarded.
    /// </summary>
    public void UnRegisterEvent(string topic, MessageHandler.OnMessageEvent onRecieved)
    {
        int handlerIndex = _Handlers.FindIndex(h => h.Topic == topic);
        if (handlerIndex == -1)
        {
            return;
        }

        MessageHandler handler = _Handlers[handlerIndex];
        handler.OnRecieved -= onRecieved;

        // Removing the last callback leaves the delegate null (not an empty list),
        // so test for null instead of calling GetInvocationList() on it.
        if (handler.OnRecieved == null)
        {
            if (Connected)
            {
                handler.UnSubscribe(_Client);
            }
            _Handlers.RemoveAt(handlerIndex);
        }
    }

    // Shared implementation of both RegisterEvent overloads: find or create the
    // handler for the topic and attach the callback to it.
    private MessageHandler AttachCallback(string topic, MessageHandler.OnMessageEvent onRecieved, Func<MessageHandler> createHandler)
    {
        MessageHandler handler = _Handlers.Find(h => h.Topic == topic);
        if (handler != null)
        {
            handler.OnRecieved += onRecieved;
            return handler;
        }

        handler = createHandler();
        // Attach the callback before subscribing so a message that arrives right
        // after the subscription already has a receiver.
        handler.OnRecieved += onRecieved;
        _Handlers.Add(handler);
        if (Connected)
        {
            handler.Subscribe(_Client);
        }
        return handler;
    }
    #endregion EventHandlers
}
}
using M2MQTTWrapper;
using UnityEngine;
/// <summary>
/// Example component: registers a few topic callbacks on an <see cref="MQTTInstance"/>,
/// logs what arrives and offers two GUI buttons that publish test messages.
/// </summary>
public class MQTTTest : MonoBehaviour
{
    /// <summary>The MQTT connection to use; assigned from outside this class.</summary>
    public MQTTInstance MQTT;

    /// <summary>Payload shape used for the "speaker/state" topic and the "Send class" button.</summary>
    public struct StateStruct
    {
        public string Name;
        public string Value;
    }

    private void Awake()
    {
        MQTT.RegisterEvent("test", OnTestRecieved);
        MQTT.RegisterEvent("test/#", OnTestRecieved);
        MQTT.RegisterEvent("speaker/volume", OnVolume, MessageHandler.MessageType.Int);
        MQTT.RegisterEvent("speaker/state", OnState, typeof(StateStruct));
        MQTT.OnConnected += OnConnected;
        MQTT.OnDisconnect += OnDisconect;
    }

    private void OnTestRecieved(string tag, object msg)
    {
        Debug.Log($"Tag: '{tag}' msg: '{msg as string}'");
    }

    private void OnState(string tag, object rawState)
    {
        // The payload can be null (e.g. the JSON literal null); unboxing null with a
        // direct cast would throw, so pattern-match instead.
        if (rawState is StateStruct state)
        {
            Debug.Log($"State: {state.Name} {state.Value}");
        }
        else
        {
            Debug.LogWarning($"State: payload on '{tag}' was empty or not a StateStruct");
        }
    }

    private void OnVolume(string tag, object msg)
    {
        // msg is null when the payload was not a valid integer; that prints as empty.
        Debug.Log($"Volume: {msg as int?}");
    }

    private void OnDisconect(MQTTInstance instance)
    {
        Debug.Log("MQTT Disconnected");
    }

    private void OnConnected(MQTTInstance instance)
    {
        Debug.Log("MQTT Connected");
    }

    private void OnGUI()
    {
        if (GUI.Button(new Rect(10, 10, 200, 30), "Send Hello World"))
        {
            MQTT.Send("hello", "World");
        }

        if (GUI.Button(new Rect(10, 40, 200, 30), "Send class"))
        {
            MQTT.Send("hello", new StateStruct()
            {
                Name = "State Name",
                Value = "State Value"
            });
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment