Skip to content

Instantly share code, notes, and snippets.

@rpgmaker
Created March 19, 2012 19:51
Show Gist options
  • Save rpgmaker/2125842 to your computer and use it in GitHub Desktop.
Save rpgmaker/2125842 to your computer and use it in GitHub Desktop.
Receiver Logic
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using ZMQ;
using System.Threading.Tasks;
using System.Runtime.ExceptionServices;
namespace Phoenix.ZeroMQConnectors {
/// <summary>
/// Reply-side (REP) ZeroMQ endpoint: binds to an address, receives serialized
/// <c>IZeroMQCommand</c> messages on a background task and dispatches each one to the
/// handler registered for the command's key.
/// </summary>
public sealed class ZeroMQReceiver : ZeroMQBase {
    // Request/response handlers, keyed by request type name + response type name.
    private readonly Dictionary<string, Delegate> _commandDelegates;
    // One-way handlers, keyed by command type name.
    private readonly Dictionary<string, Delegate> _commandActions;
    // Reply frame sent whenever there is no meaningful response payload.
    private readonly byte[] EMPTY_BUFFER = new byte[0];
    // Loop flag for HandleCommand; volatile because it is written by Start/Release
    // and read by the background task.
    private volatile bool _running;
    private Task _responseTask;

    protected override SocketType SocketType { get { return SocketType.REP; } }

    /// <summary>Creates a receiver for the given address with no handlers registered.</summary>
    /// <param name="address">Endpoint address forwarded to the base class.</param>
    public ZeroMQReceiver(string address)
        : base(address) {
        _commandDelegates = new Dictionary<string, Delegate>();
        _commandActions = new Dictionary<string, Delegate>();
    }

    /// <summary>Binds the socket to <paramref name="address"/>.</summary>
    protected override void Setup(string address) {
        Socket.Bind(address);
    }

    /// <summary>
    /// Registers a handler that produces a reply. The handler is stored under the
    /// concatenation of the request and response type names; registering the same
    /// pair again replaces the previous handler.
    /// </summary>
    /// <param name="command">Function invoked with the received command.</param>
    public void Register<TRequest, TResponse>(Func<TRequest, TResponse> command)
        where TRequest : IZeroMQCommand
        where TResponse : IZeroMQCommandReply
    {
        var key = String.Concat(typeof(TRequest).Name, typeof(TResponse).Name);
        _commandDelegates[key] = command;
    }

    /// <summary>
    /// Registers a handler that produces no reply. The handler is stored under the
    /// command type name; registering the same type again replaces the previous handler.
    /// </summary>
    /// <param name="command">Action invoked with the received command.</param>
    public void Register<TCommand>(Action<TCommand> command)
        where TCommand : IZeroMQCommand {
        var key = typeof(TCommand).Name;
        _commandActions[key] = command;
    }

    /// <summary>
    /// Receive/dispatch loop executed on the background task until <c>_running</c> is
    /// cleared or an <see cref="AccessViolationException"/> is raised by the socket.
    /// </summary>
    /// <remarks>
    /// A REP socket has to answer every request before it can receive the next one, so
    /// every path through an iteration - unknown key, unknown pattern, failing handler -
    /// ends with a reply being sent (an empty frame when there is nothing better).
    /// </remarks>
    [HandleProcessCorruptedStateExceptions]
    private void HandleCommand() {
        while (_running) {
            // True from the moment a request has been received until a reply was sent.
            var replyOwed = false;
            try {
                var payload = Socket.Recv();
                replyOwed = true;
                var command = payload.Deserialize<IZeroMQCommand>();
                var key = command.Key;

                // Request/response handlers take precedence over one-way handlers.
                Delegate handler;
                if (!_commandDelegates.TryGetValue(key, out handler) &&
                    !_commandActions.TryGetValue(key, out handler)) {
                    handler = null;
                }

                if (handler != null) {
                    if (command.Pattern == ZeroPatternType.RequestResponse) {
                        var response = handler.DynamicInvoke(command) as IZeroMQCommandReply;
                        Socket.Send(response.Serialize());
                        replyOwed = false;
                    } else if (command.Pattern == ZeroPatternType.OneWay) {
                        //Reply with empty buffer since we don't care about response
                        Socket.Send(EMPTY_BUFFER);
                        replyOwed = false;
                        if (command.IsAsync) {
                            Task.Factory.StartNew(o =>
                            {
                                var tuple = o as Tuple<Delegate, IZeroMQCommand>;
                                tuple.Item1.DynamicInvoke(tuple.Item2);
                            }, new Tuple<Delegate, IZeroMQCommand>(handler, command))
                            .HandleFault(ErrorReceived);
                        } else {
                            handler.DynamicInvoke(command);
                        }
                    }
                }

                // No handler or unrecognised pattern: still answer so the socket
                // is allowed to receive again.
                if (replyOwed) {
                    Socket.Send(EMPTY_BUFFER);
                    replyOwed = false;
                }
            } catch (AccessViolationException ex) {
                OnErrorReceived(ex);
                //Socket broke and cannot be recovered
                break;
            } catch (System.Exception ex) {
                OnErrorReceived(ex);
                // The failure happened between receive and send; answer with an empty
                // frame so the next receive is not rejected by the socket.
                if (replyOwed && !TrySendEmptyReply()) break;
            }
        }
    }

    /// <summary>
    /// Sends the empty reply frame, reporting (not propagating) any failure.
    /// </summary>
    /// <returns>
    /// <c>false</c> when the send raised an <see cref="AccessViolationException"/>,
    /// i.e. the socket is considered broken; otherwise <c>true</c>.
    /// </returns>
    [HandleProcessCorruptedStateExceptions]
    private bool TrySendEmptyReply() {
        try {
            Socket.Send(EMPTY_BUFFER);
            return true;
        } catch (AccessViolationException ex) {
            OnErrorReceived(ex);
            return false;
        } catch (System.Exception ex) {
            OnErrorReceived(ex);
            return true;
        }
    }

    /// <summary>
    /// Starts the receive loop on a long-running background task. Does nothing when a
    /// previously started loop is still running, so the socket is never read by two loops.
    /// </summary>
    public void Start() {
        var current = _responseTask;
        if (current != null && !current.IsCompleted) return;

        _running = true;
        _responseTask =
            Task.Factory.StartNew(HandleCommand, TaskCreationOptions.LongRunning)
            .HandleFault(ErrorReceived);
    }

    /// <summary>Forwards <paramref name="ex"/> to <see cref="ErrorReceived"/> when a callback is set.</summary>
    private void OnErrorReceived(System.Exception ex) {
        // Snapshot the property so it cannot become null between the check and the call.
        var callback = ErrorReceived;
        if (callback == null) return;
        callback(ex);
    }

    /// <summary>Callback invoked with every exception caught by the receive loop.</summary>
    public Action<System.Exception> ErrorReceived { get; set; }

    /// <summary>
    /// Stops the receive loop, drops all registered handlers and releases the base resources.
    /// Safe to call more than once.
    /// </summary>
    protected override void Release() {
        // Clear the flag first so the loop exits as soon as its current iteration ends.
        _running = false;
        var task = _responseTask;
        _responseTask = null;
        try {
            // Task.Dispose throws on a task that has not reached a final state, and the
            // loop may still be blocked in a receive here, so only dispose a finished task.
            if (task != null && task.IsCompleted)
                task.Dispose();
            // The maps are cleared but kept: the loop may still be inside an iteration
            // and must not observe a null reference.
            _commandDelegates.Clear();
            _commandActions.Clear();
        } catch (System.Exception ex) {
            OnErrorReceived(ex);
        } finally {
            base.Release();
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment