Created
February 20, 2012 15:19
-
-
Save haf/1869638 to your computer and use it in GitHub Desktop.
Asynchronous Retries MassTransit
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Numerics; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using System.Linq; | |
using Magnum.Extensions; | |
using Magnum.Policies; | |
using MassTransit.Context; | |
using NUnit.Framework; | |
namespace MassTransit.Transports.Tests | |
{ | |
interface Cmd | |
{ | |
} | |
class NewId | |
{ | |
internal static int Generate() | |
{ | |
return 42; | |
} | |
} | |
class MTConfig | |
{ | |
internal static Uri Endpoint | |
{ | |
get { return new Uri("rabbitmq://ha-cluster-4.corp.local/ep42"); } | |
} | |
} | |
class Management | |
{ | |
readonly Redis _r; | |
readonly Uri _endpoint; | |
public Management(Redis r, Uri endpoint) | |
{ | |
_r = r; | |
_endpoint = endpoint; | |
} | |
public Management(Uri endpoint) | |
{ | |
_endpoint = endpoint; | |
} | |
internal void Bind() | |
{ | |
// IModel.BindExchange(_endpoint.Name, _endpoint.Name, etc); | |
} | |
public void SendUnacked(Bus b) | |
{ | |
foreach (var msg in _r.SyncPeekSet(_endpoint)) | |
{ | |
b.PublishContext((ISendContext<Cmd>) msg); | |
} | |
} | |
} | |
class Redis | |
{ | |
Dictionary<Uri, HashSet<object>> lookup = new Dictionary<Uri, HashSet<object>>(); | |
HashSet<object> Get(Uri sourceEndpointUri) | |
{ | |
// + locking etc | |
HashSet<object> sett; | |
if (!lookup.ContainsKey(sourceEndpointUri)) | |
sett = lookup[sourceEndpointUri] = new HashSet<object>(); | |
else | |
sett = lookup[sourceEndpointUri]; | |
return sett; | |
} | |
internal void SyncAppendSet<T>( | |
Uri sourceEndpointUri, | |
ISendContext<T> message) where T : class | |
{ | |
Get(sourceEndpointUri).Add(message); | |
} | |
internal void SyncRemoveSet<T>( | |
Uri sourceEndpointUri, | |
ISendContext<T> message) where T : class | |
{ | |
Get(sourceEndpointUri).Remove(message); | |
} | |
public IEnumerable<ISendContext> SyncPeekSet(Uri sourceEndpointUri) | |
{ | |
return Get(sourceEndpointUri).Cast<ISendContext>(); | |
} | |
} | |
/// <summary>simple unit () type of the ACK</summary> | |
class ACK | |
{ | |
} | |
/// <summary>Option type like discriminated unions are</summary> | |
class Cell<TEither, TOr> | |
{ | |
volatile bool set; | |
public Cell() | |
{ | |
} | |
public Cell(TEither either) | |
{ | |
// lock etc | |
if (set) return; | |
} | |
public Cell(TOr or) | |
{ | |
if (set) return; | |
} | |
public void Set(TEither either) | |
{ | |
} | |
public void Set(TOr or) | |
{ | |
} | |
} | |
static class AckBuilder | |
{ | |
// TPoll has the same semantics as Poll would with ZeroMQ - a poll is either | |
// for the implicit (non-reply) callback of the send of TMsg | |
// because of the lack of a failure from the transport | |
// OR the actual message we got back | |
internal static Action<Cell<ReceiveContext, ISendContext<TMsg>>> DefaultFor<TMsg, TPoll>( | |
// curry away the bus, and create a new AckBuilder with it curried away | |
Bus b, | |
// user-passed callback | |
Action<Cell<TPoll, ISendContext<TMsg>>> callback = null, | |
// a selector that e.g. checks the message id; similar to the CorrelateBy(T -> bool) in sagas | |
Func<TPoll, bool> selector = null) | |
where TMsg : class | |
{ | |
selector = selector ?? (tp => true /* always ACK on the type per default */); | |
var cell = new Cell<TPoll, ISendContext<TMsg>>(); | |
// must make sure either of these lambdas are called; not both, but that threading concern is outside my aim of | |
// this discussion-oriented code snippet | |
MassTransit.UnsubscribeAction unsub = null; | |
unsub = b.SubscribeHandler(selector, tpoll => | |
{ | |
cell.Set(tpoll); | |
callback(cell); | |
unsub(); | |
}); | |
return c => | |
{ | |
// cell.Set(c.Value) | |
callback(cell); | |
unsub(); | |
}; | |
} | |
} | |
/// <summary>This message is the event that the domain service publishes if the command passed</summary> | |
interface CmdEventActingAsACK | |
{ | |
} | |
class GuiBus // simply showing UI updates | |
{ | |
internal void Publish<T>() | |
{ | |
// etc... | |
} | |
} | |
interface DisplayErrorDialog | |
{ | |
} | |
class Usage | |
{ | |
GuiBus guiEventing = new GuiBus(); | |
Bus b; | |
void simple_publish_async() | |
{ | |
b.Publish<Cmd>(new | |
{ | |
Value = 3.4, | |
Other = "Rat" | |
}, | |
// pass a policy which can be curried away | |
// but specifies 1-2 times to just re-send in case msg got lost | |
PolicyBuilder.For<Exception /* delivery failed, MT specific ex */>( | |
ex => /*ex.Retries == 5*/ true) | |
.CircuitBreak(5.Seconds(), 2 /* times */), | |
// send the interaction pieces into the bus; this is the 'application logic' | |
// on ack, nack and failure logic | |
AckBuilder.DefaultFor<Cmd, CmdEventActingAsACK>(b), | |
// here we say that we're sending a command, so we're going to keep retrying until | |
// we succeed | |
(error, sendContext) => | |
{ | |
guiEventing.Publish<DisplayErrorDialog>(); | |
return TransportUnreachableProgrammerDecision.ContinueTryingTillAvailable; | |
}); | |
} | |
} | |
/// <summary> | |
/// in the case of RMQ we don't get ACKs for successes, but we are sure to get | |
/// an asynchronous exception if something goes wrong, within 10 seconds | |
/// </summary> | |
class Scheduler | |
{ | |
Timer t; | |
public Scheduler() | |
{ | |
t = new Timer(Tick); | |
} | |
// crude scheduler | |
// actually, I want a min-heap here, sorted by date time ascending! | |
// so lets | |
SortedList<DateTime, Action> callbacks = new SortedList<DateTime, Action>(); | |
void Tick(object state) | |
{ | |
Check(); | |
} | |
// perform all callbacks | |
void Check() | |
{ | |
// loop that calls back the Actions | |
KeyValuePair<DateTime, Action> keyValuePair; | |
while ((keyValuePair = callbacks.First()).Key < DateTime.UtcNow) | |
{ | |
callbacks.RemoveAt(0); | |
keyValuePair.Value(); | |
} | |
} | |
internal void Schedule(Action callback, TimeSpan dueTime) | |
{ | |
callbacks.Add(DateTime.UtcNow + dueTime, callback); | |
} | |
} | |
enum TransportUnreachableProgrammerDecision | |
{ | |
/// <summary> | |
/// Meaning: I can't actually do anything; this is a command that I need to send | |
/// and that command is not temporally tied in any way that I care about, so just | |
/// continue at it, and send that darn command when you get back up. This is the | |
/// 'occasionally connected' answer to the decision on whether to publish on publish | |
/// failed. This means we're storing the send context locally and then publishing it. | |
/// </summary> | |
ContinueTryingTillAvailable | |
} | |
class Bus | |
{ | |
Redis redis = new Redis(); | |
OutTransport t = new OutTransport(null); | |
Scheduler s = new Scheduler(); | |
internal void Publish<T>(object vals, | |
// this exception policy is only a decider on when to notify the error callback (4th param) | |
// but most often it shouldn't do very much, because connectivity errors and the like | |
// should be handled by the transports and their connection handlers | |
ExceptionPolicy exceptionPolicy, | |
Action<Cell<ReceiveContext, ISendContext<T>>> ackCallback, | |
Func<Exception, ISendContext<T>, TransportUnreachableProgrammerDecision> errorCallback) | |
where T : class | |
{ | |
var sendContext = new SendContext<T>((T) vals); | |
// need a message id, this is the thing we track in redis | |
sendContext.SetMessageId(NewId.Generate().ToString()); | |
// save book keeping data about the send context in case the publish is never ACKed | |
redis.SyncAppendSet(MTConfig.Endpoint, sendContext); | |
// perform the actual publish which writes to network buffers, in-proc buffers and so on | |
t.Publish(sendContext, errorCallback); | |
// make sure that we remove the send context from the list of outstanding | |
// contexts from redis when we can be sure we've sent it properly to the broker | |
// We closure-capture, because the incoming lambda wants just the 'semantic' ACK | |
// which is not necessarily the actual sending to be ACKed | |
// By doing it this way we may support ZMQ, event coming in over a SUB socket in the | |
// case of an event, or a ACK coming in over a PULL socket | |
ackCallback += _ => redis.SyncRemoveSet(MTConfig.Endpoint, sendContext); | |
// since we don't know about what RMQ does in the background, but | |
// we know it notifies us on failure in at least 10 seconds, schedule the removal | |
// of the "outstanding message" in ten seconds | |
s.Schedule(() => ackCallback(new Cell<ReceiveContext, ISendContext<T>>(sendContext)), 10.Seconds()); | |
} | |
// called when we are starting up, no subscriptions at all, but here we may let the programmer | |
// register a callback on application startup that allows the programmer to handle the outstanding | |
// messages that were not sent; e.g. by enqueueing them together with a batch of fresh messages | |
// that was received from the domain service; and in this case we may need to do event merging. | |
public void PublishContext<T>(ISendContext<T> sendContext) | |
where T : class | |
{ | |
// normal send here | |
t.Publish(sendContext, (e, ctx) => TransportUnreachableProgrammerDecision.ContinueTryingTillAvailable); | |
} | |
Management m; | |
void OnStartUp() | |
{ | |
// when we start we do the normal stuff | |
m = new Management(MTConfig.Endpoint); | |
// such as binding our queue to an exchange | |
m.Bind(); | |
// then we make sure we have an empty slate, by sending all | |
// messages that were never sent | |
m.SendUnacked(this); | |
} | |
public MassTransit.UnsubscribeAction SubscribeHandler<T>(Func<T, bool> selector, Action<T> action) | |
{ | |
// etc... | |
return () => true; | |
} | |
interface IModel | |
{ | |
event EventHandler TransportError; | |
// really, it doesn't look like this, but it's similar | |
void AsyncPublish<T>(ISendContext<T> context) where T : class; | |
} | |
class OutTransport | |
{ | |
readonly IModel _connectionHandler; | |
readonly List<Func<Exception, TransportUnreachableProgrammerDecision>> errorCallbacks | |
= new List<Func<Exception, TransportUnreachableProgrammerDecision>>(); | |
public OutTransport(IModel connectionHandler) | |
{ | |
_connectionHandler = connectionHandler; | |
_connectionHandler.TransportError += (object sender, EventArgs e) => | |
{ | |
lock (errorCallbacks) | |
{ | |
errorCallbacks.ForEach(cb => | |
{ | |
// var error = e.Error; | |
string error = ""; | |
var decision = cb(new Exception(error)); | |
// enqueue the corresponding action on an actor | |
// that requeues the send potentially | |
// or just gives up | |
}); | |
errorCallbacks.Clear(); | |
} | |
// somehow correlate all of the | |
}; | |
} | |
public void Publish<T>(ISendContext<T> sendContext, | |
Func<Exception, ISendContext<T>, TransportUnreachableProgrammerDecision> errorCallback) | |
where T : class | |
{ | |
errorCallbacks.Add((err) => errorCallback(err, sendContext)); | |
_connectionHandler.AsyncPublish(sendContext); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment