Skip to content

Instantly share code, notes, and snippets.

@rpgmaker
Created August 17, 2012 14:31
Show Gist options
  • Save rpgmaker/3379157 to your computer and use it in GitHub Desktop.
Save rpgmaker/3379157 to your computer and use it in GitHub Desktop.
ZeroMQReceiver
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Runtime.ExceptionServices;
using System.Threading;
using ZeroMQ;
namespace Phoenix.ZeroMQConnectors {
/// <summary>
/// Reply-side connector. Binds a <c>SocketType.REP</c> socket to the given address, polls it from
/// the thread pool, and dispatches every received <see cref="IZeroMQCommand"/> to the handler
/// registered for the command's <c>Key</c>.
/// </summary>
/// <remarks>
/// Polling is driven by one-shot <see cref="ThreadPool.RegisterWaitForSingleObject(WaitHandle, WaitOrTimerCallback, object, int, bool)"/>
/// registrations on <c>_reset</c> with a 1 ms timeout: a timeout means "poll once and re-arm",
/// a signal (set by <see cref="Release"/>) means "stop".
/// </remarks>
public sealed class ZeroMQReceiver : ZeroMQBase {
    // Shared zero-length payload used as the acknowledgement for one-way commands.
    private static readonly byte[] EmptyBuffer = new byte[0];

    // Handlers for request/response commands, keyed by request type name + response type name.
    private readonly Dictionary<string, Delegate> _commandDelegates = new Dictionary<string, Delegate>();
    // Handlers for one-way commands, keyed by command type name.
    private readonly Dictionary<string, Delegate> _commandActions = new Dictionary<string, Delegate>();
    // Signalled by Release() to stop the polling loop; created unsignalled.
    private readonly ManualResetEvent _reset = new ManualResetEvent(false);
    // Guards _handle so a callback never observes a registration that has not been stored yet.
    private readonly object _handleGate = new object();
    // The currently armed one-shot wait registration, or null when none is armed.
    private RegisteredWaitHandle _handle;

    static ZeroMQReceiver() {
        ZeroMQCommandReply.Register();
    }

    /// <summary>The receiver always uses a REP socket.</summary>
    protected override SocketType SocketType { get { return SocketType.REP; } }

    /// <summary>Creates a receiver for <paramref name="address"/>; the base class receives the address.</summary>
    /// <param name="address">Endpoint passed to the base constructor.</param>
    public ZeroMQReceiver(string address)
        : base(address) {
    }

    /// <summary>Binds the socket to <paramref name="address"/>.</summary>
    protected override void Setup(string address) {
        Socket.Bind(address);
    }

    /// <summary>
    /// Registers (or replaces) the handler for a request/response command. The lookup key is the
    /// concatenation of the request and response type names.
    /// </summary>
    /// <param name="command">Handler invoked with the received request; its result is sent back.</param>
    public void Register<TRequest, TResponse>(Func<TRequest, TResponse> command)
        where TRequest : IZeroMQCommand
        where TResponse : IZeroMQCommandReply {
        var key = String.Concat(typeof(TRequest).Name, typeof(TResponse).Name);
        _commandDelegates[key] = command;
    }

    /// <summary>
    /// Registers (or replaces) the handler for a one-way command. The lookup key is the command type name.
    /// </summary>
    /// <param name="command">Handler invoked with the received command.</param>
    public void Register<TCommand>(Action<TCommand> command)
        where TCommand : IZeroMQCommand {
        var key = typeof(TCommand).Name;
        _commandActions[key] = command;
    }

    /// <summary>
    /// Sends the acknowledgement for a command that has no reply payload: an empty buffer on
    /// success, or a serialized <c>ZeroMQError</c> when <paramref name="ex"/> is supplied.
    /// </summary>
    /// <param name="ex">The failure to report to the requester, or null for a plain acknowledgement.</param>
    private void SendEmptyResponse(System.Exception ex = null) {
        try {
            var socket = Socket;
            if (socket == null) {
                return;
            }
            if (ex != null) {
                socket.Send(new ZeroMQError(ex).Serialize());
            } else {
                // The requester does not care about the payload when no exception occurred.
                socket.Send(EmptyBuffer);
            }
        } catch (ObjectDisposedException) {
            // The socket was disposed while replying; nothing left to acknowledge.
        }
    }

    /// <summary>
    /// Wait callback. On timeout it polls the socket once and arms the next one-shot wait;
    /// when <c>_reset</c> was signalled it only releases the registration and stops.
    /// </summary>
    /// <param name="state">Unused (always null).</param>
    /// <param name="timeout">True when the 1 ms wait elapsed; false when <c>_reset</c> was signalled.</param>
    [HandleProcessCorruptedStateExceptions]
    private void ProcessEvent(object state, bool timeout) {
        // Always release the registration that just fired, even though it is one-shot;
        // otherwise every poll tick leaves a registration behind.
        ReleaseWaitHandle();
        if (!timeout) {
            return;
        }
        try {
            HandleCommand();
        } catch (System.Exception ex) {
            OnErrorReceived(ex);
        }
        ArmWait();
    }

    /// <summary>Unregisters the stored wait registration, if any.</summary>
    private void ReleaseWaitHandle() {
        lock (_handleGate) {
            if (_handle != null) {
                _handle.Unregister(null);
                _handle = null;
            }
        }
    }

    /// <summary>Arms a new one-shot wait on <c>_reset</c> with a 1 ms timeout and stores its registration.</summary>
    private void ArmWait() {
        // Registering under the lock guarantees the callback (which may fire after 1 ms on
        // another thread) sees the stored handle when it calls ReleaseWaitHandle().
        lock (_handleGate) {
            _handle = ThreadPool.RegisterWaitForSingleObject(_reset, new WaitOrTimerCallback(ProcessEvent),
                null, 1, true);
        }
    }

    /// <summary>
    /// Receives at most one command, invokes its handler, and sends the reply. Any failure that
    /// happens before a reply was sent is reported back to the requester and to
    /// <see cref="ErrorReceived"/>.
    /// </summary>
    [HandleProcessCorruptedStateExceptions]
    private void HandleCommand() {
        var handshaked = false;
        var socket = Socket;
        if (socket == null) {
            return;
        }
        try {
            var buffer = socket.Recv(Timeout);
            // Nothing to process. NOTE(review): assumes Recv yields null or an empty buffer when
            // nothing arrives within Timeout - confirm against the Socket implementation.
            if (buffer == null || buffer.Length == 0) {
                return;
            }
            var command = buffer.Deserialize<IZeroMQCommand>();
            var key = command.Key;

            // Request/response handlers take precedence over one-way handlers for the same key.
            Delegate handler;
            if (!_commandDelegates.TryGetValue(key, out handler)) {
                _commandActions.TryGetValue(key, out handler);
            }
            if (handler == null) {
                // A request has been received, so a reply must still go out: a REP socket that
                // skips the reply leaves the requester waiting and cannot receive again.
                var missing = new InvalidOperationException(
                    String.Concat("No handler is registered for command '", key, "'."));
                SendEmptyResponse(missing);
                handshaked = true;
                OnErrorReceived(missing);
                return;
            }

            if (command.Pattern == ZeroPatternType.RequestResponse) {
                IZeroMQCommandReply response = null;
                try {
                    response = handler.DynamicInvoke(command) as IZeroMQCommandReply;
                } catch (System.Exception e) {
                    // Handler failed: ship the failure to the requester inside a reply object.
                    response = new ZeroMQCommandReply() {
                        Exception = new ZeroMQError(e)
                    } as IZeroMQCommandReply;
                    OnErrorReceived(e);
                }
                socket.Send(response.Serialize());
                handshaked = true;
            } else if (command.Pattern == ZeroPatternType.OneWay) {
                System.Exception ex = null;
                if (command.IsAsync) {
                    // Fire-and-forget: acknowledge immediately, faults go to ErrorReceived.
                    Task.Factory.StartNew(o => {
                        var tuple = o as Tuple<Delegate, IZeroMQCommand>;
                        tuple.Item1.DynamicInvoke(tuple.Item2);
                    }, new Tuple<Delegate, IZeroMQCommand>(handler, command))
                    .HandleFault(ErrorReceived);
                } else {
                    try {
                        handler.DynamicInvoke(command);
                    } catch (System.Exception e) {
                        ex = e;
                        OnErrorReceived(e);
                    }
                }
                SendEmptyResponse(ex);
                handshaked = true;
            }
        } catch (AccessViolationException) {
            // Socket broke and cannot be recovered.
            Reconnect();
        } catch (System.Exception ex) {
            if (!handshaked) {
                SendEmptyResponse(ex);
            }
            OnErrorReceived(ex);
        }
    }

    /// <summary>
    /// Starts polling: clears the stop signal and arms the first one-shot wait. A registration
    /// that is still pending from an earlier call is cancelled first.
    /// </summary>
    public void Start() {
        lock (_handleGate) {
            _reset.Reset();
            if (_handle != null) {
                _handle.Unregister(null);
            }
            _handle = ThreadPool.RegisterWaitForSingleObject(_reset, new WaitOrTimerCallback(ProcessEvent),
                null, 1, true);
        }
    }

    /// <summary>Forwards <paramref name="ex"/> to <see cref="ErrorReceived"/> when a callback is set.</summary>
    private void OnErrorReceived(System.Exception ex) {
        // Copy first: the property is publicly settable and this runs on thread-pool threads.
        var callback = ErrorReceived;
        if (callback == null) {
            return;
        }
        callback(ex);
    }

    /// <summary>Callback invoked with every exception observed while receiving or handling commands.</summary>
    public Action<System.Exception> ErrorReceived { get; set; }

    /// <summary>
    /// Signals the polling loop to stop, drops all registered handlers, and then lets the base
    /// class release its resources. Safe to call more than once.
    /// </summary>
    protected override void Release() {
        try {
            _reset.Set();
        } catch {
            // Best effort: failing to signal must not prevent the base class from releasing.
        } finally {
            // Clear instead of nulling the dictionaries so an in-flight poll or a repeated
            // Release() does not hit a NullReferenceException.
            _commandDelegates.Clear();
            _commandActions.Clear();
        }
        base.Release();
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment