Skip to content

Instantly share code, notes, and snippets.

@ekwus
Created May 20, 2014 08:48
Show Gist options
  • Save ekwus/83082e40d1c7189aced3 to your computer and use it in GitHub Desktop.
Save ekwus/83082e40d1c7189aced3 to your computer and use it in GitHub Desktop.
XSockets Binary Messaging Client-Server-Client
using Smots.Common.Database;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using XSockets.Core.XSocket;
using XSockets.Core.XSocket.Helpers;
using Smots.Interface.Model;
using System.IO;
using Smots.Interface;
using Smots.Recorder.Core;
namespace Smots.Recorder.XControllers
{
    /// <summary>
    /// XSockets controller that relays binary "media" messages between clients
    /// and exposes the recorded playbacks found under the configured record locations.
    /// </summary>
    public class Playback : XSocketController
    {
        /// <summary>
        /// Handles an incoming binary message. Messages whose event name starts with
        /// "media" are forwarded with <c>SendToAll</c>; anything else is passed to the
        /// base controller.
        /// </summary>
        /// <param name="binaryArgs">The raw binary message (header + JSON metadata + payload).</param>
        public override void OnMessage(XSockets.Core.Common.Socket.Event.Interface.IBinaryArgs binaryArgs)
        {
            var bm = binaryArgs.AsCombinedMessage();

            // NOTE(review): the receiver expression on this line was destroyed by e-mail
            // obfuscation in the published copy of this file. It is reconstructed here as
            // the event name carried in the message's text header - confirm against the
            // original source.
            if (bm.TextArgs.@event.StartsWith("media", StringComparison.Ordinal))
            {
                // The author also experimented with Send(...) and SendToOthers(...) here;
                // SendToAll is the variant that was left enabled.
                this.SendToAll(binaryArgs);
            }
            else
            {
                base.OnMessage(binaryArgs);
            }
        }

        /// <summary>
        /// Sends the list of available playbacks back to the calling client only,
        /// using the <c>SmotsConst.Methods.GetPlaybacks</c> event name.
        /// </summary>
        public void GetPlaybacks()
        {
            List<PlaybackInfo> playbacks = GetPlaybackInfos();
            this.SendTo(c => c.ClientGuid == this.ClientGuid, playbacks, SmotsConst.Methods.GetPlaybacks);
        }

        /// <summary>
        /// Builds one <see cref="PlaybackInfo"/> per sub-directory of every record location
        /// stored in the database. Start/end times are parsed from the first and last file
        /// path found beneath the sub-directory.
        /// </summary>
        /// <returns>The playbacks found; empty when nothing has been recorded.</returns>
        private List<PlaybackInfo> GetPlaybackInfos()
        {
            List<PlaybackInfo> playbacks = new List<PlaybackInfo>();
            using (SmotsNvrContext db = new SmotsNvrContext())
            {
                foreach (RecordLocation loc in db.Locations)
                {
                    // A configured location that no longer exists on disk must not make
                    // the whole listing fail.
                    if (!Directory.Exists(loc.Path))
                    {
                        continue;
                    }

                    foreach (string dir in Directory.GetDirectories(loc.Path))
                    {
                        string[] playFiles = Directory.GetFiles(dir, "*", SearchOption.AllDirectories);

                        // An empty directory has no first/last file to parse; skip it
                        // instead of handing null to the parser.
                        if (playFiles.Length == 0)
                        {
                            continue;
                        }

                        // NOTE(review): Directory.GetFiles does not guarantee any ordering;
                        // first/last are assumed to be the oldest/newest recording - confirm.
                        string id;
                        DateTime start;
                        RecFile.ParseFilePath(playFiles[0], out id, out start);

                        // NOTE: This end date/time should parse the file to find the real end.
                        DateTime end;
                        RecFile.ParseFilePath(playFiles[playFiles.Length - 1], out id, out end);

                        // DateTime is immutable: AddMinutes returns a new value that must be
                        // assigned, otherwise the extra minute is silently lost.
                        end = end.AddMinutes(1.0);

                        playbacks.Add(new PlaybackInfo
                        {
                            Key = id,
                            Root = loc.Path,
                            Start = start,
                            End = end
                        });
                    }
                }
            }
            return playbacks;
        }

        /// <summary>
        /// Starts a playback session on the playback server.
        /// </summary>
        /// <param name="info">The playback to start.</param>
        /// <param name="start">The position in time to start from.</param>
        /// <returns>The value returned by <c>PlaybackServer.Instance.StartSession</c>.</returns>
        public string StartPlayback(PlaybackInfo info, DateTime start)
        {
            return PlaybackServer.Instance.StartSession(info, start);
        }

        /// <summary>
        /// Stops the playback session with the given id.
        /// </summary>
        /// <param name="id">The session id to stop.</param>
        public void StopPlayback(string id)
        {
            PlaybackServer.Instance.StopSession(id);
        }
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using XSockets.Client40;
using XSockets.Client40.Common.Event.Arguments;
using XSockets.Client40.Common.Event.Interface;
namespace Smots.Recorder.Core
{
    /// <summary>
    /// Helpers for building XSockets binary messages that carry JSON metadata in front
    /// of the binary payload.
    /// </summary>
    public static class BinaryMessageExtensions
    {
        /// <summary>
        /// Builds a binary message laid out as:
        /// 8-byte length header, UTF-8 JSON text arguments (metadata + event name), raw payload.
        /// </summary>
        /// <param name="client">The client whose serializer is used to produce the JSON.</param>
        /// <param name="buffer">The binary payload appended after the metadata.</param>
        /// <param name="metadata">The object serialized to JSON and sent as metadata.</param>
        /// <param name="event">The event name the message is published under.</param>
        /// <returns>The combined message ready to be sent.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        public static BinaryArgs CreateBinaryMessage(this XSocketClient client, IList<byte> buffer, object metadata, string @event)
        {
            if (buffer == null)
            {
                throw new ArgumentNullException("buffer");
            }

            // Transform metadata into JSON.
            var jsonMeta = client.Serializer.SerializeToString(metadata);

            // Wrap the metadata in a TextArgs object so the event name travels with it.
            var json = client.Serializer.SerializeToString(new TextArgs(jsonMeta, @event));

            // The header must hold the number of BYTES that follow, not the number of
            // characters: the two differ as soon as the JSON contains a non-ASCII
            // character, and a receiver that slices by this length would then cut the
            // JSON short and misplace the start of the payload.
            byte[] payload = Encoding.UTF8.GetBytes(json);
            byte[] header = BitConverter.GetBytes((Int64)payload.Length);

            var message = new List<byte>(header.Length + payload.Length + buffer.Count);
            message.AddRange(header);
            message.AddRange(payload);
            message.AddRange(buffer);
            return new BinaryArgs(message);
        }
    }
}
// Excerpt: the server container field followed by the start-up statements that use it
// (the enclosing class and method are not part of this snippet).
private IXSocketServerContainer m_xserver;
// Resolve the XSockets server container through the plugin framework.
m_xserver = XSockets.Plugin.Framework.Composable.GetExport<IXSocketServerContainer>();
// Attach the connect/disconnect handlers before the servers are started.
m_xserver.OnServerClientConnection += m_xserver_OnServerClientConnection;
m_xserver.OnServerClientDisconnection += m_xserver_OnServerClientDisconnection;
// NOTE(review): the meaning of the 'true' argument is not visible here - confirm against the XSockets API.
m_xserver.StartServers(true);
// Log the endpoint of every server held by the container.
foreach (var server in m_xserver.Servers)
{
LOG.DEBUG("XSocket Server {0}", server.ConfigurationSetting.Endpoint);
}
My first working attempt at sending XSockets binary messages from client to client.
It also shows my approach to making a synchronous call from the client. I'm sure there are better ways; suggestions are welcome.
Please feel free to correct me, as I'm learning XSockets as I go.
// Excerpt: receiving-client fields followed by the connection set-up statements
// (the enclosing class and method are not part of this snippet).
private XSocketClient m_xcli;
// Media callbacks keyed by event name; looked up when a binary message arrives.
// NOTE(review): the initialization of this dictionary is not shown in this excerpt.
private Dictionary<string, Action<PlayMeta, byte[]>> m_callbacks;
// NOTE(review): the second argument is presumably the origin filter - confirm against the XSockets client API.
m_xcli = new XSocketClient(m_url, "*");
// Attach the handlers before opening the connection.
m_xcli.OnOpen += m_xcli_OnOpen;
m_xcli.OnBlob += m_xcli_OnBlob;
m_xcli.Open();
/// <summary>
/// Handles an incoming binary message: splits it into the 8-byte length header, the
/// UTF-8 JSON text arguments and the media payload, then hands metadata and payload
/// to the callback registered for the message's event name.
/// </summary>
/// <param name="sender">The event source (unused).</param>
/// <param name="e">The received binary message.</param>
void m_xcli_OnBlob(object sender, XSockets.Client40.Common.Event.Arguments.BinaryArgs e)
{
    try
    {
        const int headerSize = sizeof(Int64);
        byte[] data = e.data.ToArray();

        // Too short to even contain the length header.
        if (data.Length < headerSize)
        {
            return;
        }

        // Validate the declared JSON length against the actual message size before
        // narrowing it to int, so a corrupt header cannot overflow or index out of range.
        long declaredLength = BitConverter.ToInt64(data, 0);
        if (declaredLength < 0 || declaredLength > data.Length - headerSize)
        {
            return;
        }
        int length = (int)declaredLength;

        string json = Encoding.UTF8.GetString(data, headerSize, length);
        TextArgs textArgs = m_xcli.Deserialize<TextArgs>(json);
        if (textArgs == null || textArgs.@event == null
            || !textArgs.@event.StartsWith("media", StringComparison.Ordinal))
        {
            return;
        }

        // Single lookup; also avoids deserializing and copying when nobody is listening.
        Action<PlayMeta, byte[]> callback;
        if (!m_callbacks.TryGetValue(textArgs.@event, out callback))
        {
            return;
        }

        PlayMeta meta = m_xcli.Deserialize<PlayMeta>(textArgs.data);

        // Everything after header + JSON is the media payload.
        int mediaLength = data.Length - headerSize - length;
        byte[] mediaData = new byte[mediaLength];
        Array.Copy(data, headerSize + length, mediaData, 0, mediaLength);

        callback(meta, mediaData);
    }
    catch (Exception ex)
    {
        // Still best-effort (a bad message or a failing callback must not take down the
        // receive path), but no longer silent.
        System.Diagnostics.Trace.TraceWarning("m_xcli_OnBlob failed: {0}", ex);
    }
}
/// <summary>
/// Requests the playback list from the server and blocks until the reply arrives
/// or 50 seconds have elapsed.
/// </summary>
/// <returns>The deserialized playback list, or null when no reply arrived in time.</returns>
public List<PlaybackInfo> GetPlaybacks()
{
    List<PlaybackInfo> result = null;
    var replied = new ManualResetEventSlim();

    m_xcli.One(
        SmotsConst.Methods.GetPlaybacks,
        // Reply handler: store the list and release the waiting caller.
        reply =>
        {
            result = m_xcli.Deserialize<List<PlaybackInfo>>(reply.data);
            replied.Set();
        },
        // Confirmation handler: fire the request itself.
        confirmation => { m_xcli.Trigger(SmotsConst.Methods.GetPlaybacks); });

    replied.Wait(TimeSpan.FromSeconds(50));
    return result;
}
/// <summary>
/// Asks the server to start a playback session and blocks until the session id is
/// returned or 5 seconds have elapsed. When the reply arrives, the supplied callback
/// is registered under the event name "media" + session id unless one already exists.
/// </summary>
/// <param name="info">The playback to start.</param>
/// <param name="start">The position in time to start from.</param>
/// <param name="callback">Receives metadata and media bytes for the session.</param>
/// <returns>The session id, or an empty string when no reply arrived in time.</returns>
/// <exception cref="Exception">The client is not connected.</exception>
public string StartPlayback(PlaybackInfo info, DateTime start, Action<PlayMeta, byte[]> callback)
{
    if (!IsConnected)
    {
        throw new Exception("Need to call Connect first!");
    }

    var request = new { info, start };
    var sessionId = string.Empty;
    var started = new ManualResetEventSlim();

    m_xcli.One(
        SmotsConst.Methods.StartPlayback,
        // Reply handler: capture the id, register the media callback, release the caller.
        reply =>
        {
            sessionId = m_xcli.Deserialize<string>(reply.data);

            string mediaEvent = "media" + sessionId;
            Action<PlayMeta, byte[]> existing;
            if (!m_callbacks.TryGetValue(mediaEvent, out existing))
            {
                m_callbacks.Add(mediaEvent, callback);
            }

            started.Set();
        },
        // Confirmation handler: send the actual request.
        confirmation => { m_xcli.Send(request, SmotsConst.Methods.StartPlayback); });

    started.Wait(TimeSpan.FromSeconds(5));
    return sessionId;
}
/// <summary>
/// Unregisters the media callback of the given session (if any) and tells the server
/// to stop the session.
/// </summary>
/// <param name="id">The session id returned when the playback was started.</param>
/// <exception cref="Exception">The client is not connected.</exception>
public void StopPlayback(string id)
{
    if (!IsConnected)
    {
        throw new Exception("Need to call Connect first!");
    }

    // Remove is a no-op when the key is absent, so no prior existence check is needed.
    m_callbacks.Remove("media" + id);

    m_xcli.Send(id, SmotsConst.Methods.StopPlayback);
}
// Excerpt: connection set-up of the sending client (enclosing class/method not shown).
// NOTE(review): the server URL is hard-coded here - consider moving it to configuration.
m_xcli = new XSocketClient("ws://172.22.0.17:4502/Playback", "*");
// Attach the open handler before opening the connection.
m_xcli.OnOpen += m_xcli_OnOpen;
m_xcli.Open();
/// <summary>
/// Stamps the metadata with the current session id, wraps metadata and media bytes in
/// one binary message published under "media" + session id, and sends it through the
/// server connection. Failures are logged and otherwise ignored (best-effort send).
/// </summary>
/// <param name="data">The media bytes to send.</param>
/// <param name="meta">The metadata sent with the media; its Session is overwritten.</param>
private void SendMediaViaServer(byte[] data, PlayMeta meta)
{
    try
    {
        LOG.DEBUG("Timestamp {0}", meta.Timestamp.Ticks);
        meta.Session = SessionId;
        string mediaId = string.Format("media{0}", SessionId);
        var message = m_xcli.CreateBinaryMessage(data, meta, mediaId);
        m_xcli.Send(message);
    }
    catch (System.Exception ex)
    {
        // A dropped frame must not stop the sender, but the failure should be visible
        // instead of vanishing in an empty catch.
        LOG.DEBUG("SendMediaViaServer failed: {0}", ex);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment