Skip to content

Instantly share code, notes, and snippets.

@Pzixel
Created September 12, 2019 09:33
Show Gist options
  • Save Pzixel/6300578be601520054e3c94b8618c16d to your computer and use it in GitHub Desktop.
Save Pzixel/6300578be601520054e3c94b8618c16d to your computer and use it in GitHub Desktop.
using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;
using Directive = Akka.Streams.Supervision.Directive;
namespace ConsoleApp9
{
/// <summary>
/// Console entry point that wires a RabbitMQ-backed Akka.Streams source through a
/// restartable processing flow into a sink that prints and acknowledges each message.
/// </summary>
class Program
{
    /// <summary>
    /// Builds the stream graph, materializes it with a custom supervision strategy and
    /// blocks until the materialized task of the sink completes.
    /// </summary>
    /// <param name="args">Command-line arguments (not used).</param>
    static void Main(string[] args)
    {
        var runnableGraph = Source.FromGraph(new RabbitSource<int>("queue",
                new MqFactory(new MqFactorySettings("url", "admin", "admin"))))
            .Via(RestartFlow.OnFailuresWithBackoff(
                // The processing step throws for payload 3; WrapAsync attaches the
                // message being processed to whatever exception escapes the delegate.
                () => Flow.Create<RabbitMessage<int>>().SelectAsync(1,
                    XFinalizableException.WrapAsync<RabbitMessage<int>, RabbitMessage<int>>(async x =>
                        x.Data == 3 ? throw new Exception() : await Task.FromResult(x))),
                TimeSpan.FromMilliseconds(100), TimeSpan.FromMilliseconds(500), 0.2))
            .ToMaterialized(Sink.ForEach<RabbitMessage<int>>(x =>
            {
                Console.WriteLine(x.Data);
                x.Ack();
            }), Keep.Right);

        using (var sys = ActorSystem.Create("Backup-Test"))
        using (var mat = ActorMaterializer.Create(sys,
            ActorMaterializerSettings.Create(sys).WithSupervisionStrategy(DecideOnFailure)))
        {
            // GetAwaiter().GetResult() rethrows the original failure instead of
            // wrapping it in an extra AggregateException the way Wait() does.
            runnableGraph.Run(mat).GetAwaiter().GetResult();
        }
    }

    /// <summary>
    /// Supervision decider: logs the failure and chooses how the failing stage continues.
    /// </summary>
    /// <param name="cause">The exception reported to the supervision strategy.</param>
    /// <returns>
    /// <c>Directive.Stop</c> when the failure is a <see cref="FinalizableException"/> that is
    /// not a <c>FinalizableException&lt;RabbitMessage&lt;int&gt;&gt;</c>; otherwise
    /// <c>Directive.Resume</c>.
    /// </returns>
    private static Directive DecideOnFailure(Exception cause)
    {
        if (cause is AggregateException ae)
        {
            // Flatten first so that nested AggregateExceptions are unwrapped as well;
            // InnerException of the flattened aggregate is its first non-aggregate inner exception.
            cause = ae.Flatten().InnerException;
        }

        if (cause is FinalizableException<RabbitMessage<int>> fe)
        {
            Console.WriteLine($"Error happened: {fe.InnerException}. Finalization data is {fe.FinalizationData}");
        }
        else if (cause is FinalizableException)
        {
            // A finalizable failure carrying a payload type this decider does not know about.
            Console.WriteLine("Unregistered finalization!");
            return Directive.Stop;
        }
        else
        {
            Console.WriteLine($"Regular error happened: {cause}. Nothing to finalize");
        }

        return Directive.Resume;
    }
}
/// <summary>
/// Immutable holder for the three connection values exposed through
/// <see cref="IMqFactorySettings"/>: host, user name and password.
/// </summary>
internal class MqFactorySettings : IMqFactorySettings
{
    /// <summary>
    /// Stores the supplied values as-is; no validation is performed.
    /// </summary>
    /// <param name="host">Value exposed through <see cref="Host"/>.</param>
    /// <param name="userName">Value exposed through <see cref="UserName"/>.</param>
    /// <param name="password">Value exposed through <see cref="Password"/>.</param>
    public MqFactorySettings(string host, string userName, string password)
    {
        this.Password = password;
        this.UserName = userName;
        this.Host = host;
    }

    /// <summary>Host value given to the constructor.</summary>
    public string Host { get; }

    /// <summary>User name given to the constructor.</summary>
    public string UserName { get; }

    /// <summary>Password given to the constructor.</summary>
    public string Password { get; }
}
/// <summary>
/// Non-generic base type for exceptions that carry finalization data, so that handlers
/// can test for "any finalizable exception" without knowing the payload type.
/// </summary>
public abstract class FinalizableException : Exception
{
    /// <summary>Creates the exception with the default message and no inner exception.</summary>
    protected FinalizableException()
        : base()
    { }

    /// <summary>Creates the exception with an explicit message and the exception that caused it.</summary>
    /// <param name="message">Message forwarded to <see cref="Exception"/>.</param>
    /// <param name="innerException">Underlying failure forwarded to <see cref="Exception"/>.</param>
    protected FinalizableException(string message, Exception innerException)
        : base(message, innerException)
    { }
}
/// <summary>
/// A <see cref="FinalizableException"/> that carries a strongly typed payload describing
/// what was being processed when the failure occurred.
/// </summary>
/// <typeparam name="T">Type of the attached finalization payload.</typeparam>
public class FinalizableException<T> : FinalizableException
{
    /// <summary>Creates the exception with a payload only (default message, no inner exception).</summary>
    /// <param name="finalizationData">Payload exposed through <see cref="FinalizationData"/>.</param>
    public FinalizableException(T finalizationData)
        => FinalizationData = finalizationData;

    /// <summary>
    /// Creates the exception with a payload and the underlying failure; the message is
    /// the fixed text "FinalizableException".
    /// </summary>
    /// <param name="finalizationData">Payload exposed through <see cref="FinalizationData"/>.</param>
    /// <param name="innerException">The exception being wrapped.</param>
    public FinalizableException(T finalizationData, Exception innerException)
        : base("FinalizableException", innerException)
        => FinalizationData = finalizationData;

    /// <summary>The payload supplied when the exception was created.</summary>
    public T FinalizationData { get; }
}
/// <summary>
/// Helpers for attaching finalization data to exceptions, either directly or by
/// decorating a delegate so that any exception it throws is wrapped automatically.
/// </summary>
public static class XFinalizableException
{
    /// <summary>Wraps <paramref name="ex"/> in a <see cref="FinalizableException{T}"/> carrying the given data.</summary>
    /// <typeparam name="T">Type of the finalization payload.</typeparam>
    /// <param name="ex">The exception to use as the inner exception.</param>
    /// <param name="finalizationData">The payload to attach.</param>
    /// <returns>A new wrapper exception; it is returned, not thrown.</returns>
    public static FinalizableException<T> Wrap<T>(this Exception ex, T finalizationData)
    {
        return new FinalizableException<T>(finalizationData, ex);
    }

    /// <summary>
    /// Decorates a synchronous function so that any exception it throws is rethrown as a
    /// <see cref="FinalizableException{T}"/> carrying the argument the function was called with.
    /// </summary>
    /// <typeparam name="T">Argument type, also used as the finalization payload type.</typeparam>
    /// <typeparam name="TResult">Result type of the function.</typeparam>
    /// <param name="action">The function to decorate.</param>
    /// <returns>A function with the same signature that wraps failures.</returns>
    public static Func<T, TResult> Wrap<T, TResult>(Func<T, TResult> action)
    {
        TResult Guarded(T input)
        {
            try
            {
                return action(input);
            }
            catch (Exception failure)
            {
                throw new FinalizableException<T>(input, failure);
            }
        }

        return Guarded;
    }

    /// <summary>
    /// Decorates an asynchronous function so that any exception observed while invoking or
    /// awaiting it is rethrown as a <see cref="FinalizableException{T}"/> carrying the
    /// argument the function was called with.
    /// </summary>
    /// <typeparam name="T">Argument type, also used as the finalization payload type.</typeparam>
    /// <typeparam name="TResult">Result type of the awaited task.</typeparam>
    /// <param name="action">The asynchronous function to decorate.</param>
    /// <returns>An asynchronous function with the same signature that wraps failures.</returns>
    public static Func<T, Task<TResult>> WrapAsync<T, TResult>(Func<T, Task<TResult>> action)
    {
        async Task<TResult> GuardedAsync(T input)
        {
            try
            {
                return await action(input);
            }
            catch (Exception failure)
            {
                throw new FinalizableException<T>(input, failure);
            }
        }

        return GuardedAsync;
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment