Created
March 19, 2012 19:51
-
-
Save rpgmaker/2125842 to your computer and use it in GitHub Desktop.
Receiver Logic
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using ZMQ; | |
using System.Threading.Tasks; | |
using System.Runtime.ExceptionServices; | |
namespace Phoenix.ZeroMQConnectors { | |
public sealed class ZeroMQReceiver : ZeroMQBase { | |
private Dictionary<string, Delegate> _commandDelegates; | |
private Dictionary<string, Delegate> _commandActions; | |
private readonly byte[] EMPTY_BUFFER = new byte[0]; | |
private volatile bool _running; | |
private Task _responseTask; | |
protected override SocketType SocketType { get { return SocketType.REP; } } | |
public ZeroMQReceiver(string address) | |
: base(address) { | |
_commandDelegates = new Dictionary<string, Delegate>(); | |
_commandActions = new Dictionary<string, Delegate>(); | |
} | |
protected override void Setup(string address) { | |
Socket.Bind(address); | |
} | |
public void Register<TRequest, TResponse>(Func<TRequest, TResponse> command) | |
where TRequest : IZeroMQCommand | |
where TResponse : IZeroMQCommandReply | |
{ | |
var key = String.Concat(typeof(TRequest).Name, typeof(TResponse).Name); | |
_commandDelegates[key] = command; | |
} | |
public void Register<TCommand>(Action<TCommand> command) | |
where TCommand : IZeroMQCommand { | |
var key = typeof(TCommand).Name; | |
_commandActions[key] = command; | |
} | |
[HandleProcessCorruptedStateExceptions] | |
private void HandleCommand() { | |
while (_running) { | |
try { | |
var command = Socket.Recv().Deserialize<IZeroMQCommand>(); | |
var key = command.Key; | |
var @delegate = _commandDelegates.ContainsKey(key) | |
? _commandDelegates[key] : _commandActions.ContainsKey(key) ? | |
_commandActions[key] : null; | |
if (@delegate == null) continue; | |
if (command.Pattern == ZeroPatternType.RequestResponse) { | |
var response = @delegate.DynamicInvoke(command) as IZeroMQCommandReply; | |
Socket.Send(response.Serialize()); | |
} else if (command.Pattern == ZeroPatternType.OneWay) { | |
//Reply with empty buffer since we don't care about response | |
Socket.Send(EMPTY_BUFFER); | |
if (command.IsAsync) { | |
Task.Factory.StartNew(o => | |
{ | |
var tuple = o as Tuple<Delegate, IZeroMQCommand>; | |
tuple.Item1.DynamicInvoke(tuple.Item2); | |
}, new Tuple<Delegate, IZeroMQCommand>(@delegate, command)) | |
.HandleFault(ErrorReceived); | |
} else | |
@delegate.DynamicInvoke(command); | |
} | |
} catch (AccessViolationException ex) { | |
OnErrorReceived(ex); | |
//Socket broke and cannot be recovered | |
break; | |
} catch (System.Exception ex) { | |
OnErrorReceived(ex); | |
} | |
} | |
} | |
public void Start() { | |
_running = true; | |
_responseTask = | |
Task.Factory.StartNew(HandleCommand, TaskCreationOptions.LongRunning) | |
.HandleFault(ErrorReceived); | |
} | |
private void OnErrorReceived(System.Exception ex) { | |
if (ErrorReceived == null) return; | |
ErrorReceived(ex); | |
} | |
public Action<System.Exception> ErrorReceived { get; set; } | |
protected override void Release() { | |
try { | |
if (_responseTask != null) | |
_responseTask.Dispose(); | |
} catch { } finally { | |
_running = false; | |
_responseTask = null; | |
_commandDelegates.Clear(); | |
_commandDelegates = null; | |
} | |
base.Release(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment