Created
September 12, 2019 09:33
-
-
Save Pzixel/6300578be601520054e3c94b8618c16d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Linq; | |
using System.Threading.Tasks; | |
using Akka.Actor; | |
using Akka.Streams; | |
using Akka.Streams.Dsl; | |
using Directive = Akka.Streams.Supervision.Directive; | |
namespace ConsoleApp9 | |
{ | |
class Program | |
{ | |
static void Main(string[] args) | |
{ | |
var runnableGraph = Source.FromGraph(new RabbitSource<int>("queue", | |
new MqFactory(new MqFactorySettings("url", "admin", "admin")))) | |
.Via(RestartFlow.OnFailuresWithBackoff( | |
() => Flow.Create<RabbitMessage<int>>().SelectAsync(1, | |
XFinalizableException.WrapAsync<RabbitMessage<int>, RabbitMessage<int>>(async x => | |
x.Data == 3 ? throw new Exception() : await Task.FromResult(x))), | |
TimeSpan.FromMilliseconds(100), TimeSpan.FromMilliseconds(500), 0.2)) | |
.ToMaterialized(Sink.ForEach<RabbitMessage<int>>(x => | |
{ | |
Console.WriteLine(x.Data); | |
x.Ack(); | |
}), Keep.Right); | |
using (var sys = ActorSystem.Create("Backup-Test")) | |
using (var mat = ActorMaterializer.Create(sys, ActorMaterializerSettings.Create(sys).WithSupervisionStrategy( | |
cause => | |
{ | |
if (cause is AggregateException ae) | |
{ | |
cause = ae.InnerException; | |
} | |
if (cause is FinalizableException<RabbitMessage<int>> fe) | |
{ | |
Console.WriteLine($"Error happened: {fe.InnerException}. Finalization data is {fe.FinalizationData}"); | |
} | |
else if (cause is FinalizableException) | |
{ | |
Console.WriteLine("Unregistered finalization!"); | |
return Directive.Stop; | |
} | |
else | |
{ | |
Console.WriteLine($"Regular error happened: {cause}. Nothing to finalize"); | |
} | |
return Directive.Resume; | |
}))) | |
{ | |
runnableGraph.Run(mat).Wait(); | |
} | |
} | |
} | |
internal class MqFactorySettings : IMqFactorySettings | |
{ | |
public string Host { get; } | |
public string UserName { get; } | |
public string Password { get; } | |
public MqFactorySettings(string host, string userName, string password) | |
{ | |
Host = host; | |
UserName = userName; | |
Password = password; | |
} | |
} | |
public abstract class FinalizableException : Exception | |
{ | |
protected FinalizableException() | |
{ | |
} | |
protected FinalizableException(string message, Exception innerException) : base(message, innerException) | |
{ | |
} | |
} | |
public class FinalizableException<T> : FinalizableException | |
{ | |
public T FinalizationData { get; } | |
public FinalizableException(T finalizationData) | |
{ | |
FinalizationData = finalizationData; | |
} | |
public FinalizableException(T finalizationData, Exception innerException) : base("FinalizableException", innerException) | |
{ | |
FinalizationData = finalizationData; | |
} | |
} | |
public static class XFinalizableException | |
{ | |
public static FinalizableException<T> Wrap<T>(this Exception ex, T finalizationData) => | |
new FinalizableException<T>(finalizationData, ex); | |
public static Func<T, TResult> Wrap<T, TResult>(Func<T, TResult> action) => data => | |
{ | |
try | |
{ | |
return action(data); | |
} | |
catch (Exception ex) | |
{ | |
throw ex.Wrap(data); | |
} | |
}; | |
public static Func<T, Task<TResult>> WrapAsync<T, TResult>(Func<T, Task<TResult>> action) => async data => | |
{ | |
try | |
{ | |
return await action(data); | |
} | |
catch (Exception ex) | |
{ | |
throw ex.Wrap(data); | |
} | |
}; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment