Last active
August 29, 2015 14:21
-
-
Save rdavisau/e7aa53749e08e13329c6 to your computer and use it in GitHub Desktop.
LINQPad script demonstrating how to use service discovery and the JsonProtocolMessenger
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<Query Kind="Program"> | |
<NuGetReference>Newtonsoft.Json</NuGetReference> | |
<NuGetReference Prerelease="true">rda.SocketHelpers</NuGetReference> | |
<Namespace>SocketHelpers.Discovery</Namespace> | |
<Namespace>SocketHelpers.Messaging</Namespace> | |
<Namespace>Sockets.Plugin</Namespace> | |
<Namespace>Splat</Namespace> | |
<Namespace>System.Reactive.Linq</Namespace> | |
<Namespace>System.Threading.Tasks</Namespace> | |
</Query> | |
async Task Main() | |
{ | |
// hook into logging in case things Go Wrong which is not entirely unlikely | |
Locator.CurrentMutable.RegisterConstant(new LINQPadLogger(), typeof(ILogger)); | |
var server = new MyServer(); | |
var client = new MyClient(); | |
await server.StartHosting(50023); | |
await client.DoItAll(); | |
} | |
#region Server and Client | |
public class MyServer | |
{ | |
private TcpSocketListener _listener; | |
private ServicePublisher<TypedServiceDefinition<string,MyServiceReponsePayload>> _publisher; // this verboseness will be fixed | |
public async Task StartHosting(int port) | |
{ | |
// first we actually start listening, then we can start 'publishing' availability | |
await StartSocketListener(port); | |
await StartPublishingService(port); | |
} | |
public async Task StopHosting() | |
{ | |
// stop publishing, then unbind the listener | |
await _listener.StopListeningAsync(); | |
await _publisher.Unpublish(); | |
} | |
private Task StartSocketListener(int port) | |
{ | |
// set up a tcpsocketlistener for new connections | |
// when we get one, attach a jsonprotocolmessenger | |
// and dump the message stream | |
_listener = new TcpSocketListener(); | |
// set up the connection event handler | |
_listener.ConnectionReceived += (sender, args) => | |
{ | |
var client = args.SocketClient; | |
// we got a new connection, held in args.SocketClient | |
// we wrap it in a JsonProtocolMessenger | |
// Message is our 'base' message class, it could be 'object' if we wanted | |
// Adding your running assembly to AdditionalTypeResolutionAssemblies seems to be required for any message resolution | |
// will look into that.. | |
var messenger = new JsonProtocolMessenger<Message>(client) | |
{ | |
AdditionalTypeResolutionAssemblies = { typeof(Message).Assembly } | |
}; | |
// here just showing how you can filter your messages | |
Util.HorizontalRun("Message,MessageSubclass", | |
messenger.Messages.Where(m=> m.GetType() == typeof(Message)), | |
messenger.Messages.OfType<MessageSubclass>()) | |
.Dump(String.Format("Messages received from the client at {0}:{1}", client.RemoteAddress, client.RemotePort)); | |
// might as well log when clients are disconnected | |
messenger.Disconnected += (dsender, dargs) => dargs.Dump(String.Format("{0}:{1} Disconnected {2}", client.RemoteAddress, client.RemotePort)); | |
// nothing happens unless you call StartExecuting | |
messenger.StartExecuting(); | |
}; | |
return _listener.StartListeningAsync(port); | |
} | |
private Task StartPublishingService(int port) | |
{ | |
// create an instance of MyServiceDefinition | |
// then generate a publisher | |
var serviceDef = new MyServiceDefinition(port); | |
_publisher = serviceDef.CreateServicePublisher(); | |
// start 'publishing' | |
// (actually it's listening for discovery reqs) | |
return _publisher.Publish(); | |
} | |
} | |
public class MyClient | |
{ | |
private TcpSocketClient _client; | |
private JsonProtocolMessenger<Message> _messenger; | |
public async Task DoItAll() | |
{ | |
// first discover | |
var connectionDetails = await Discover(); | |
var host = connectionDetails.RemoteAddress; | |
var port = connectionDetails.ServicePort; | |
// then connect | |
await Connect(host, port); | |
// then send a few bits and pieces before disconnecting | |
var r = new Random(); | |
var names = new [] { "alice", "bob", "charlie", "danielle", "ester", "fabian", "geoffery" }; | |
Observable | |
.Interval(TimeSpan.FromSeconds(1)) | |
.Take(5) | |
.Subscribe(onNext: _=> _messenger.Send(new Message(names.RandomItem())), | |
onCompleted: async () => | |
{ | |
// send a MessageSubclass, then disconnect | |
_messenger.Send(new MessageSubclass(names.RandomItem(), Enumerable.Range(0,4).Select(_=> r.Next(0,100)).ToList(), new MyServiceDefinition(port))); | |
await Disconnect(); | |
}); | |
} | |
public async Task<MyServiceReponsePayload> Discover() | |
{ | |
// create instance of service definition | |
// generate a discoverer from it | |
var serviceDef = new MyServiceDefinition(); | |
var discoverer = serviceDef.CreateServiceDiscoverer(); | |
discoverer.SendOnAllInterfaces = true; | |
// probably don't actually do this bit like this | |
// i'm taking shortcuts | |
// send out a discovery request every 500ms | |
var canceller = new CancellationTokenSource(); | |
Task.Run(async ()=> { | |
while (!canceller.IsCancellationRequested) | |
{ | |
await discoverer.Discover(); | |
await Task.Delay(TimeSpan.FromSeconds(.5)); | |
} | |
}, canceller.Token); | |
// DiscoveredServices pumps out the service responses | |
// in this demo case we just want the first one and we'll | |
// try to connect to it | |
// | |
// because i took shortcuts we might miss the first response | |
// given we start watching /after/ setting up the discovery loop | |
// (but we'll get the next one in 500ms) | |
return | |
await discoverer | |
.DiscoveredServices | |
.Take(1) | |
.Do(_=> canceller.Cancel()); // ha ha sneaky side effects | |
} | |
private async Task Connect(string host, int port) | |
{ | |
// first connect the TcpSocketClient | |
_client = new TcpSocketClient(); | |
await _client.ConnectAsync(host, port); | |
// then attach the JsonProtocolMessenger | |
_messenger = new JsonProtocolMessenger<Message>(_client) | |
{ | |
AdditionalTypeResolutionAssemblies = { typeof(Message).Assembly } | |
}; | |
// gotta call start executing | |
_messenger.StartExecuting(); | |
} | |
private Task Disconnect() | |
{ | |
// you don't have to call StopExecuting if you disconnect | |
// feels a bit lop-sided, api wise | |
return _messenger.Disconnect(DisconnectionType.Graceful); | |
} | |
} | |
#endregion | |
#region Service Definition | |
public class MyServiceReponsePayload : IDiscoveryPayload | |
{ | |
public string RemoteAddress { get; set; } | |
public int ServicePort { get; set; } | |
// implement explicitly because we don't need this here | |
int IDiscoveryPayload.RemotePort { get; set; } | |
public MyServiceReponsePayload(int servicePort) | |
{ | |
ServicePort = servicePort; | |
} | |
} | |
// the service discovery is useful for situations where you can't know ip addresses ahead of time | |
// e.g. multiplayer mobile game | |
// the service definition defines the discovery protocol and discovery ports | |
// and clients use udp to broadcast to request details from any hosts (TPayloadFormat) | |
// host can then send back details of the service (TPayloadFormat) like address, port, and any other metadata that is useful | |
// for a game it might be the name of the game, number of current players, etc. | |
// here we just take a basic string request and return a tuple with IP:port to demonstrate a 'complex' payload | |
public class MyServiceDefinition : JsonSerializedServiceDefinition<string, MyServiceReponsePayload> | |
{ | |
const string requestString = "Hi anyone there?"; | |
public int ServicePort { get; set; } | |
public MyServiceDefinition() | |
{ | |
} | |
public MyServiceDefinition(int actualServicePort) | |
{ | |
ServicePort = actualServicePort; | |
} | |
public override string DiscoveryRequest() | |
{ | |
return requestString; | |
} | |
public override MyServiceReponsePayload ResponseFor(string request) | |
{ | |
if (request == requestString) | |
{ | |
// this is legit, let's send back our details | |
return new MyServiceReponsePayload(ServicePort); | |
} | |
else | |
{ | |
// don't recognize this request, we might be getting hacked | |
// if we returns null, nothing is sent back | |
// it's as if no one was there.. | |
return null; | |
} | |
} | |
} | |
#endregion | |
#region Message Classes | |
public class Message | |
{ | |
public DateTime SentAt { get; set; } | |
public string Name { get; set; } | |
public Message(string name) | |
{ | |
Name = name; | |
SentAt = DateTime.UtcNow; | |
} | |
} | |
// just to demonstrate a 'more complex' type | |
public class MessageSubclass : Message | |
{ | |
public List<int> Numbers { get; set; } | |
public MyServiceDefinition ServiceDefinition { get; set; } | |
public MessageSubclass(string content, List<int> numbers, MyServiceDefinition sdef) : base(content) | |
{ | |
Numbers = numbers; | |
ServiceDefinition = sdef; | |
} | |
} | |
#endregion | |
#region Logger | |
public class LINQPadLogger : ILogger | |
{ | |
public LogLevel Level { get; set; } | |
public void Write(string message, LogLevel level) | |
{ | |
if (level < Level) | |
return; | |
var color = ""; | |
switch (level) | |
{ | |
case LogLevel.Debug: | |
color = "grey"; | |
break; | |
case LogLevel.Info: | |
color = "green"; | |
break; | |
case LogLevel.Warn: | |
color = "orange"; | |
break; | |
case LogLevel.Error: | |
color = "red"; | |
break; | |
case LogLevel.Fatal: | |
color = "red"; | |
break; | |
} | |
Util.WithStyle(message, String.Format("color: {0}", color)).Dump(); | |
} | |
} | |
#endregion | |
#region Ext | |
public static class Ext | |
{ | |
private static Random _r = new Random(); | |
public static T RandomItem<T>(this T[] collection) | |
{ | |
return collection[_r.Next(0, collection.Length - 1)]; | |
} | |
} | |
#endregion |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment