Last active
May 16, 2022 09:51
-
-
Save triktron/2b3664e7fc0b199851f08db81912bbcb to your computer and use it in GitHub Desktop.
M2MQTT event based wrapper for Unity
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Copyright(C) 2022 Triktron, Luca Vanesche <<[email protected]>> | |
/// | |
/// This program is free software: you can redistribute it and/or modify | |
/// it under the terms of the GNU General Public License as published by | |
/// the Free Software Foundation, either version 3 of the License, or | |
/// (at your option) any later version. | |
/// | |
/// This program is distributed in the hope that it will be useful, | |
/// but WITHOUT ANY WARRANTY; without even the implied warranty of | |
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
/// GNU General Public License for more details. | |
using Newtonsoft.Json; | |
using System; | |
using System.Collections.Generic; | |
using System.Text; | |
using UnityEngine; | |
using uPLibrary.Networking.M2Mqtt; | |
using uPLibrary.Networking.M2Mqtt.Messages; | |
namespace M2MQTTWrapper | |
{ | |
public static class MQTTTopic | |
{ | |
public static bool Matches(string topic, string match) | |
{ | |
string[] topicParts = topic.Split("/"); | |
string[] matchParts = match.Split("/"); | |
for (int i = 0; i < matchParts.Length; i++) | |
{ | |
if (i >= topicParts.Length) return false; | |
if (matchParts[i] == "#") return true; | |
if (matchParts[i] != "+" && matchParts[i] != topicParts[i]) return false; | |
} | |
if (matchParts.Length < topicParts.Length) return false; | |
return true; | |
} | |
public static bool MatchesFast(string topic, string match) | |
{ | |
return string.CompareOrdinal(topic, match) == 0; | |
} | |
} | |
public class MessageHandler | |
{ | |
public delegate void OnMessageEvent(string tag, object msg); | |
public enum MessageType | |
{ | |
String, | |
Float, | |
Int, | |
Custom | |
} | |
private struct DelayedMessage | |
{ | |
public string Topic; | |
public object Value; | |
} | |
public string Topic; | |
public OnMessageEvent OnRecieved; | |
private bool _IsSubscribed; | |
private bool _FastTest; | |
private bool _WaitForMainLoop; | |
private MessageType _Type; | |
private Type _ClassType; | |
private Queue<DelayedMessage> _DelayedMessages = new Queue<DelayedMessage>(); | |
public MessageHandler(string topic, bool waitForMainLoop) | |
{ | |
Topic = topic; | |
_FastTest = !(topic.Contains("+") || topic.Contains("#")); | |
_WaitForMainLoop = waitForMainLoop; | |
} | |
public MessageHandler(string topic, MessageType type, bool waitForMainLoop) : this(topic, waitForMainLoop) | |
{ | |
_Type = type; | |
} | |
public MessageHandler(string topic, Type type, bool waitForMainLoop) : this(topic, waitForMainLoop) | |
{ | |
_Type = MessageType.Custom; | |
_ClassType = type; | |
} | |
public void Subscribe(MqttClient client) | |
{ | |
if (!_IsSubscribed) | |
{ | |
client.Subscribe(new string[] { Topic }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE }); | |
client.MqttMsgPublishReceived += MessageRecieved; | |
_IsSubscribed = true; | |
} | |
} | |
public void UnSubscribe(MqttClient client) | |
{ | |
if (_IsSubscribed) | |
{ | |
client.MqttMsgPublishReceived -= MessageRecieved; | |
client.Unsubscribe(new string[] { Topic }); | |
_IsSubscribed = false; | |
} | |
} | |
public void MessageRecieved(object sender, MqttMsgPublishEventArgs e) | |
{ | |
if ((_FastTest && MQTTTopic.MatchesFast(e.Topic, Topic)) || | |
(!_FastTest && MQTTTopic.Matches(e.Topic, Topic))) | |
{ | |
object value = ParseMessage(Encoding.UTF8.GetString(e.Message)); | |
if (_WaitForMainLoop) | |
{ | |
_DelayedMessages.Enqueue(new DelayedMessage() | |
{ | |
Topic = e.Topic, | |
Value = value | |
}); | |
} | |
else | |
{ | |
OnRecieved.Invoke(e.Topic, value); | |
} | |
} | |
} | |
public void DequeueDelayedMessages() | |
{ | |
while (_DelayedMessages.Count > 0) | |
{ | |
var delayedMessage = _DelayedMessages.Dequeue(); | |
OnRecieved.Invoke(delayedMessage.Topic, delayedMessage.Value); | |
} | |
} | |
public object ParseMessage(string msg) => _Type switch | |
{ | |
MessageType.String => msg, | |
MessageType.Int => int.TryParse(msg, out int intNumber) ? intNumber : null, | |
MessageType.Float => float.TryParse(msg, out float floatNumber) ? floatNumber : null, | |
MessageType.Custom => JsonConvert.DeserializeObject(msg, _ClassType), | |
_ => null | |
}; | |
} | |
public class MQTTInstance : MonoBehaviour | |
{ | |
#region Properties | |
[Header("MQTT broker configuration")] | |
[Tooltip("IP address or URL of the host running the broker")] | |
public string BrokerAddress = "localhost"; | |
[Tooltip("Port where the broker accepts connections")] | |
public int BrokerPort = 1883; | |
[Tooltip("Connect on startup")] | |
public bool AutoConnect = false; | |
[Tooltip("UserName for the MQTT broker. Keep blank if no user name is required.")] | |
public string MqttUserName = null; | |
[Tooltip("Password for the MQTT broker. Keep blank if no password is required.")] | |
public string MqttPassword = null; | |
#endregion Properties | |
#region Globals | |
private MqttClient _Client; | |
private List<MessageHandler> _Handlers = new List<MessageHandler>(); | |
public bool Connected => _Client != null && _Client.IsConnected; | |
#endregion Globals | |
#region Events | |
public delegate void OnConnectedEvent(MQTTInstance instance); | |
public delegate void OnDisconnectedEvent(MQTTInstance instance); | |
public delegate void OnMessageEvent(string tag, string msg); | |
public OnConnectedEvent OnConnected; | |
public OnDisconnectedEvent OnDisconnect; | |
public OnMessageEvent OnMessage; | |
#endregion Events | |
#region Unity Callbacks | |
private void Start() | |
{ | |
if (AutoConnect) Connect(); | |
} | |
private void Update() | |
{ | |
foreach (var handler in _Handlers) | |
{ | |
handler.DequeueDelayedMessages(); | |
} | |
} | |
private void OnDestroy() | |
{ | |
if (Connected) _Client.Disconnect(); | |
} | |
#endregion Unity Callbacks | |
#region Connection | |
public void Connect() | |
{ | |
if (Connected) _Client.Disconnect(); | |
try | |
{ | |
_Client = new MqttClient(BrokerAddress, BrokerPort, false, null, null, MqttSslProtocols.None); | |
_Client.ConnectionClosed += (object sender, EventArgs e) => OnDisconnect?.Invoke(this); | |
} | |
catch (Exception e) | |
{ | |
_Client = null; | |
Debug.LogErrorFormat("CONNECTION FAILED! {0}", e.ToString()); | |
return; | |
} | |
string clientId = Guid.NewGuid().ToString(); | |
_Client.Connect(clientId, MqttUserName, MqttPassword); | |
foreach (var handler in _Handlers) | |
{ | |
handler.Subscribe(_Client); | |
} | |
OnConnected?.Invoke(this); | |
_Client.MqttMsgPublishReceived += OnMessageRecieved; | |
} | |
public void Disconnect() | |
{ | |
if (Connected) _Client.Disconnect(); | |
} | |
private void OnMessageRecieved(object sender, MqttMsgPublishEventArgs msg) | |
{ | |
OnMessage?.Invoke(msg.Topic, Encoding.UTF8.GetString(msg.Message)); | |
} | |
public void Send(string topic, string msg) | |
{ | |
if (Connected) _Client.Publish(topic, Encoding.UTF8.GetBytes(msg), MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE, false); | |
} | |
public void Send(string topic, object msg) | |
{ | |
if (Connected) _Client.Publish(topic, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(msg)), MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE, false); | |
} | |
#endregion Connection | |
#region EventHandlers | |
public MessageHandler RegisterEvent(string topic, MessageHandler.OnMessageEvent onRecieved, MessageHandler.MessageType messageType = MessageHandler.MessageType.String, bool waitForMainLoop = false) | |
{ | |
int handlerIndex = _Handlers.FindIndex(h => h.Topic == topic); | |
if (handlerIndex == -1) | |
{ | |
handlerIndex = _Handlers.Count; | |
_Handlers.Add(new MessageHandler(topic, messageType, waitForMainLoop)); | |
if (Connected) _Handlers[handlerIndex].Subscribe(_Client); | |
} | |
_Handlers[handlerIndex].OnRecieved += onRecieved; | |
return _Handlers[handlerIndex]; | |
} | |
public MessageHandler RegisterEvent(string topic, MessageHandler.OnMessageEvent onRecieved, Type classType, bool waitForMainLoop = false) | |
{ | |
int handlerIndex = _Handlers.FindIndex(h => h.Topic == topic); | |
if (handlerIndex == -1) | |
{ | |
handlerIndex = _Handlers.Count; | |
_Handlers.Add(new MessageHandler(topic, classType, waitForMainLoop)); | |
if (Connected) _Handlers[handlerIndex].Subscribe(_Client); | |
} | |
_Handlers[handlerIndex].OnRecieved += onRecieved; | |
return _Handlers[handlerIndex]; | |
} | |
public void UnRegisterEvent(string topic, MessageHandler.OnMessageEvent onRecieved) | |
{ | |
int handlerIndex = _Handlers.FindIndex(h => h.Topic == topic); | |
if (handlerIndex == -1) return; | |
_Handlers[handlerIndex].OnRecieved -= onRecieved; | |
if (_Handlers[handlerIndex].OnRecieved.GetInvocationList().Length == 0) | |
{ | |
if (Connected) _Handlers[handlerIndex].UnSubscribe(_Client); | |
_Handlers.RemoveAt(handlerIndex); | |
} | |
} | |
#endregion EventHandlers | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using M2MQTTWrapper; | |
using UnityEngine; | |
public class MQTTTest : MonoBehaviour | |
{ | |
public MQTTInstance MQTT; | |
public struct StateStruct | |
{ | |
public string Name; | |
public string Value; | |
} | |
private void Awake() | |
{ | |
MQTT.RegisterEvent("test", OnTestRecieved); | |
MQTT.RegisterEvent("test/#", OnTestRecieved); | |
MQTT.RegisterEvent("speaker/volume", OnVolume, MessageHandler.MessageType.Int); | |
MQTT.RegisterEvent("speaker/state", OnState, typeof(StateStruct)); | |
MQTT.OnConnected += OnConnected; | |
MQTT.OnDisconnect += OnDisconect; | |
} | |
private void OnTestRecieved(string tag, object msg) | |
{ | |
Debug.Log($"Tag: '{tag}' msg: '{msg as string}'"); | |
} | |
private void OnState(string tag, object rawState) | |
{ | |
StateStruct state = (StateStruct)rawState; | |
Debug.Log($"State: {state.Name} {state.Value}"); | |
} | |
private void OnVolume(string tag, object msg) | |
{ | |
Debug.Log($"Volume: {msg as int?}"); | |
} | |
private void OnDisconect(MQTTInstance instance) | |
{ | |
Debug.Log("MQTT Disconnected"); | |
} | |
private void OnConnected(MQTTInstance instance) | |
{ | |
Debug.Log("MQTT Connected"); | |
} | |
private void OnGUI() | |
{ | |
if (GUI.Button(new Rect(10, 10, 200, 30), "Send Hello World")) MQTT.Send("hello", "World"); | |
if (GUI.Button(new Rect(10, 40, 200, 30), "Send class")) MQTT.Send("hello", new StateStruct() | |
{ | |
Name = "State Name", | |
Value = "State Value" | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment