Created
August 17, 2012 14:31
-
-
Save rpgmaker/3379157 to your computer and use it in GitHub Desktop.
ZeroMQReceiver
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Threading.Tasks; | |
using System.Runtime.ExceptionServices; | |
using System.Threading; | |
using ZeroMQ; | |
namespace Phoenix.ZeroMQConnectors { | |
public sealed class ZeroMQReceiver : ZeroMQBase { | |
private Dictionary<string, Delegate> _commandDelegates; | |
private Dictionary<string, Delegate> _commandActions; | |
private readonly byte[] EMPTY_BUFFER = new byte[0]; | |
private ManualResetEvent _reset = new ManualResetEvent(false); | |
private RegisteredWaitHandle _handle; | |
static ZeroMQReceiver() { | |
ZeroMQCommandReply.Register(); | |
} | |
protected override SocketType SocketType { get { return SocketType.REP; } } | |
public ZeroMQReceiver(string address) | |
: base(address) { | |
_commandDelegates = new Dictionary<string, Delegate>(); | |
_commandActions = new Dictionary<string, Delegate>(); | |
_reset.Reset(); | |
} | |
protected override void Setup(string address) { | |
Socket.Bind(address); | |
} | |
public void Register<TRequest, TResponse>(Func<TRequest, TResponse> command) | |
where TRequest : IZeroMQCommand | |
where TResponse : IZeroMQCommandReply | |
{ | |
var key = String.Concat(typeof(TRequest).Name, typeof(TResponse).Name); | |
_commandDelegates[key] = command; | |
} | |
public void Register<TCommand>(Action<TCommand> command) | |
where TCommand : IZeroMQCommand { | |
var key = typeof(TCommand).Name; | |
_commandActions[key] = command; | |
} | |
private void SendEmptyResponse(System.Exception ex = null) { | |
//Reply with empty buffer since we don't care about response if not exception occured | |
try { | |
if (Socket == null) return; | |
if (ex != null) | |
Socket.Send(new ZeroMQError(ex).Serialize()); | |
else | |
Socket.Send(EMPTY_BUFFER); | |
} catch(ObjectDisposedException) { /* Ignore exception */} | |
} | |
[HandleProcessCorruptedStateExceptions] | |
private void ProcessEvent(object state, bool timeout) { | |
if (timeout) { | |
try { | |
HandleCommand(); | |
} catch (System.Exception ex) { | |
OnErrorReceived(ex); | |
} | |
_handle = ThreadPool.RegisterWaitForSingleObject(_reset, new WaitOrTimerCallback(ProcessEvent), | |
null, 1, true); | |
} else { | |
_handle.Unregister(null); | |
} | |
} | |
[HandleProcessCorruptedStateExceptions] | |
private void HandleCommand() { | |
var handshaked = false; | |
var socket = Socket; | |
if (socket == null) return; | |
try { | |
var buffer = socket.Recv(Timeout); | |
if (buffer.Length == 0) return; | |
var command = buffer.Deserialize<IZeroMQCommand>(); | |
var key = command.Key; | |
var @delegate = _commandDelegates.ContainsKey(key) | |
? _commandDelegates[key] : _commandActions.ContainsKey(key) ? | |
_commandActions[key] : null; | |
if (@delegate == null) return; | |
if (command.Pattern == ZeroPatternType.RequestResponse) { | |
IZeroMQCommandReply response = null; | |
try { | |
response = @delegate.DynamicInvoke(command) as IZeroMQCommandReply; | |
} catch (System.Exception e) { | |
response = new ZeroMQCommandReply() { | |
Exception = new ZeroMQError(e) | |
} as IZeroMQCommandReply; | |
OnErrorReceived(e); | |
} | |
socket.Send(response.Serialize()); | |
} else if (command.Pattern == ZeroPatternType.OneWay) { | |
System.Exception ex = null; | |
if (command.IsAsync) | |
Task.Factory.StartNew(o => | |
{ | |
var tuple = o as Tuple<Delegate, IZeroMQCommand>; | |
tuple.Item1.DynamicInvoke(tuple.Item2); | |
}, new Tuple<Delegate, IZeroMQCommand>(@delegate, command)) | |
.HandleFault(ErrorReceived); | |
else { | |
try { | |
@delegate.DynamicInvoke(command); | |
} catch (System.Exception e) { ex = e; OnErrorReceived(e); } | |
} | |
SendEmptyResponse(ex); | |
handshaked = true; | |
} | |
} catch (AccessViolationException) { | |
//Socket broke and cannot be recovered | |
Reconnect(); | |
} catch (System.Exception ex) { | |
if (!handshaked) SendEmptyResponse(ex); | |
OnErrorReceived(ex); | |
} | |
} | |
public void Start() { | |
_reset.Reset(); | |
_handle = ThreadPool.RegisterWaitForSingleObject(_reset, new WaitOrTimerCallback(ProcessEvent), | |
null, 1, true); | |
} | |
private void OnErrorReceived(System.Exception ex) { | |
if (ErrorReceived == null) return; | |
ErrorReceived(ex); | |
} | |
public Action<System.Exception> ErrorReceived { get; set; } | |
protected override void Release() { | |
try { | |
_reset.Set(); | |
} catch { } finally { | |
_commandDelegates.Clear(); | |
_commandDelegates = null; | |
} | |
base.Release(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment