Last active
December 11, 2017 07:34
-
-
Save SteveBate/a3d4a7a48f156f9ea8eeb1bc43acef14 to your computer and use it in GitHub Desktop.
Latest implementation of the in-process pipe and filter model
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
void Main() | |
{ | |
var msg = new BritvicImportMessage | |
{ | |
File = @"c:\test.txt", | |
OnStart = () => Console.WriteLine("Starting"), | |
OnSuccess = () => Console.WriteLine("Success!"), | |
OnComplete = () => Console.WriteLine("Done!"), | |
OnError = (ex) => Console.WriteLine("Logging Exception: " + ex), | |
OnStep = Console.WriteLine | |
}; | |
var pl = new Pipeline<BritvicImportMessage>(); | |
pl.AddAspect(new BackgroundAspect<BritvicImportMessage>()); | |
pl.AddAspect(new ExceptionAspect<BritvicImportMessage>()); | |
pl.AddAspect(new TransactionAspect<BritvicImportMessage>()); | |
pl.AddAspect(new AuditAspect<BritvicImportMessage>()); | |
pl.Register(new Step1()); | |
pl.Register(new Step2()); | |
pl.Register(new Step3()); | |
pl.Invoke(msg); | |
"test...".Dump(); | |
} | |
// Define other methods and classes here | |
public class BritvicImportMessage : BaseMessage | |
{ | |
public string File { get; set; } | |
} | |
public class Step1 : Filter<BritvicImportMessage> | |
{ | |
public void Execute(BritvicImportMessage msg) | |
{ | |
msg.OnStep($"Loading file: {msg.File}"); | |
} | |
} | |
public class Step2 : Filter<BritvicImportMessage> | |
{ | |
public void Execute(BritvicImportMessage msg) | |
{ | |
//throw new Exception("oh oh!"); | |
msg.OnStep("Parsing budgets - 20 found"); | |
} | |
} | |
public class Step3 : Filter<BritvicImportMessage> | |
{ | |
public void Execute(BritvicImportMessage msg) | |
{ | |
msg.OnStep("inserting budgets into staging table"); | |
} | |
} | |
// aspects | |
public class BackgroundAspect<T> : Aspect<T>, Filter<T> where T : BaseMessage | |
{ | |
public void Execute(T msg) | |
{ | |
ThreadPool.QueueUserWorkItem(cb => | |
{ | |
Console.WriteLine("starting work on background thread"); | |
Next.Execute(msg); | |
Console.WriteLine("finished work on background thread"); | |
}); | |
} | |
public Aspect<T> Next { get; set; } | |
} | |
public class ExceptionAspect<T> : Aspect<T>, Filter<T> where T : BaseMessage | |
{ | |
public void Execute(T msg) | |
{ | |
Console.WriteLine("logging started for msg - " + msg.ToString()); | |
try | |
{ | |
Next.Execute(msg); | |
} | |
catch (Exception ex) | |
{ | |
msg.OnError(ex); | |
} | |
} | |
public Aspect<T> Next { get; set; } | |
} | |
public class TransactionAspect<T> : Aspect<T>, Filter<T> where T : BaseMessage | |
{ | |
public void Execute(T msg) | |
{ | |
using (var scope = new TransactionScope()) | |
{ | |
Console.WriteLine("start transaction"); | |
Next.Execute(msg); | |
scope.Complete(); | |
Console.WriteLine("end transaction"); | |
} | |
} | |
public Aspect<T> Next { get; set; } | |
} | |
public class AuditAspect<T> : Aspect<T>, Filter<T> where T : BaseMessage | |
{ | |
public void Execute(T msg) | |
{ | |
Next.Execute(msg); | |
Console.WriteLine("sending audit message to WPKIT04"); | |
} | |
public Aspect<T> Next { get; set; } | |
} | |
// core | |
public class CancellationToken | |
{ | |
public bool Stop { get; set; } | |
} | |
public abstract class BaseMessage | |
{ | |
public CancellationToken CancellationToken { get; } = new CancellationToken(); | |
public Action<Exception> OnError = delegate { }; | |
public Action OnStart = delegate {}; | |
public Action OnComplete = delegate { }; | |
public Action OnSuccess = delegate { }; | |
public Action<string> OnStep = delegate {}; | |
} | |
public interface Aspect<T> where T : BaseMessage | |
{ | |
void Execute(T msg); | |
Aspect<T> Next { get; set; } | |
} | |
public interface Filter<T> where T : BaseMessage | |
{ | |
void Execute(T msg); | |
} | |
public class Pipeline<T> where T : BaseMessage | |
{ | |
/// <summary> | |
/// Constructor initializes the pipeline creating a private aspect as the context in which registered filters are executed | |
/// </summary> | |
public Pipeline() | |
{ | |
_aspects.Add(new ExecutionContext(this.Execute)); | |
} | |
/// Provides the means to supply cross cutting concerns around the actual unit of work. | |
public void AddAspect(Aspect<T> aspect) | |
{ | |
_aspects.Insert(_aspects.Count-1, aspect); | |
_aspects.Aggregate((a,b) => a.Next = b); | |
} | |
/// Register keeps track of the individual steps that make up the pipeline | |
public void Register(Filter<T> filter) | |
{ | |
_filters.Add(filter); | |
} | |
/// Finally registers filters that must run even when an error occurs | |
public void Finally(Filter<T> filter) | |
{ | |
_finallyFilters.Add(filter); | |
} | |
/// Invoke kicks of the unit of work including all registered aspects | |
public void Invoke(T msg) | |
{ | |
_aspects.First().Execute(msg); | |
} | |
List<Filter<T>> _filters = new List<Filter<T>>(); | |
List<Filter<T>> _finallyFilters = new List<Filter<T>>(); | |
List<Aspect<T>> _aspects = new List<Aspect<T>>(); | |
/// Execute is where the actual steps/filters are iterated through | |
void Execute(T msg) | |
{ | |
if (msg.OnStart != null) { msg.OnStart(); } | |
try | |
{ | |
foreach (var f in _filters) | |
{ | |
if (msg.CancellationToken.Stop) return; | |
f.Execute(msg); | |
} | |
if (msg.OnSuccess != null) { msg.OnSuccess(); } | |
} | |
finally | |
{ | |
foreach (var f in _finallyFilters) | |
{ | |
f.Execute(msg); | |
} | |
if (msg.OnComplete != null) { msg.OnComplete(); } | |
} | |
} | |
/// ExecutionContext is an aspect that the pipeline's unit of work runs under | |
class ExecutionContext : Aspect<T>, Filter<T> | |
{ | |
public ExecutionContext(Action<T> action) | |
{ | |
inner = action; | |
} | |
public void Execute(T msg) | |
{ | |
inner(msg); | |
} | |
public Aspect<T> Next { get; set; } | |
Action<T> inner; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment