Skip to content

Instantly share code, notes, and snippets.

@SteveBate
Last active December 11, 2017 07:34
Show Gist options
  • Save SteveBate/a3d4a7a48f156f9ea8eeb1bc43acef14 to your computer and use it in GitHub Desktop.
Save SteveBate/a3d4a7a48f156f9ea8eeb1bc43acef14 to your computer and use it in GitHub Desktop.
Latest implementation of the in-process pipe and filter model
void Main()
{
var msg = new BritvicImportMessage
{
File = @"c:\test.txt",
OnStart = () => Console.WriteLine("Starting"),
OnSuccess = () => Console.WriteLine("Success!"),
OnComplete = () => Console.WriteLine("Done!"),
OnError = (ex) => Console.WriteLine("Logging Exception: " + ex),
OnStep = Console.WriteLine
};
var pl = new Pipeline<BritvicImportMessage>();
pl.AddAspect(new BackgroundAspect<BritvicImportMessage>());
pl.AddAspect(new ExceptionAspect<BritvicImportMessage>());
pl.AddAspect(new TransactionAspect<BritvicImportMessage>());
pl.AddAspect(new AuditAspect<BritvicImportMessage>());
pl.Register(new Step1());
pl.Register(new Step2());
pl.Register(new Step3());
pl.Invoke(msg);
"test...".Dump();
}
// Define other methods and classes here
public class BritvicImportMessage : BaseMessage
{
public string File { get; set; }
}
public class Step1 : Filter<BritvicImportMessage>
{
public void Execute(BritvicImportMessage msg)
{
msg.OnStep($"Loading file: {msg.File}");
}
}
public class Step2 : Filter<BritvicImportMessage>
{
public void Execute(BritvicImportMessage msg)
{
//throw new Exception("oh oh!");
msg.OnStep("Parsing budgets - 20 found");
}
}
public class Step3 : Filter<BritvicImportMessage>
{
public void Execute(BritvicImportMessage msg)
{
msg.OnStep("inserting budgets into staging table");
}
}
// aspects
public class BackgroundAspect<T> : Aspect<T>, Filter<T> where T : BaseMessage
{
public void Execute(T msg)
{
ThreadPool.QueueUserWorkItem(cb =>
{
Console.WriteLine("starting work on background thread");
Next.Execute(msg);
Console.WriteLine("finished work on background thread");
});
}
public Aspect<T> Next { get; set; }
}
public class ExceptionAspect<T> : Aspect<T>, Filter<T> where T : BaseMessage
{
public void Execute(T msg)
{
Console.WriteLine("logging started for msg - " + msg.ToString());
try
{
Next.Execute(msg);
}
catch (Exception ex)
{
msg.OnError(ex);
}
}
public Aspect<T> Next { get; set; }
}
public class TransactionAspect<T> : Aspect<T>, Filter<T> where T : BaseMessage
{
public void Execute(T msg)
{
using (var scope = new TransactionScope())
{
Console.WriteLine("start transaction");
Next.Execute(msg);
scope.Complete();
Console.WriteLine("end transaction");
}
}
public Aspect<T> Next { get; set; }
}
public class AuditAspect<T> : Aspect<T>, Filter<T> where T : BaseMessage
{
public void Execute(T msg)
{
Next.Execute(msg);
Console.WriteLine("sending audit message to WPKIT04");
}
public Aspect<T> Next { get; set; }
}
// core
public class CancellationToken
{
public bool Stop { get; set; }
}
public abstract class BaseMessage
{
public CancellationToken CancellationToken { get; } = new CancellationToken();
public Action<Exception> OnError = delegate { };
public Action OnStart = delegate {};
public Action OnComplete = delegate { };
public Action OnSuccess = delegate { };
public Action<string> OnStep = delegate {};
}
public interface Aspect<T> where T : BaseMessage
{
void Execute(T msg);
Aspect<T> Next { get; set; }
}
public interface Filter<T> where T : BaseMessage
{
void Execute(T msg);
}
public class Pipeline<T> where T : BaseMessage
{
/// <summary>
/// Constructor initializes the pipeline creating a private aspect as the context in which registered filters are executed
/// </summary>
public Pipeline()
{
_aspects.Add(new ExecutionContext(this.Execute));
}
/// Provides the means to supply cross cutting concerns around the actual unit of work.
public void AddAspect(Aspect<T> aspect)
{
_aspects.Insert(_aspects.Count-1, aspect);
_aspects.Aggregate((a,b) => a.Next = b);
}
/// Register keeps track of the individual steps that make up the pipeline
public void Register(Filter<T> filter)
{
_filters.Add(filter);
}
/// Finally registers filters that must run even when an error occurs
public void Finally(Filter<T> filter)
{
_finallyFilters.Add(filter);
}
/// Invoke kicks of the unit of work including all registered aspects
public void Invoke(T msg)
{
_aspects.First().Execute(msg);
}
List<Filter<T>> _filters = new List<Filter<T>>();
List<Filter<T>> _finallyFilters = new List<Filter<T>>();
List<Aspect<T>> _aspects = new List<Aspect<T>>();
/// Execute is where the actual steps/filters are iterated through
void Execute(T msg)
{
if (msg.OnStart != null) { msg.OnStart(); }
try
{
foreach (var f in _filters)
{
if (msg.CancellationToken.Stop) return;
f.Execute(msg);
}
if (msg.OnSuccess != null) { msg.OnSuccess(); }
}
finally
{
foreach (var f in _finallyFilters)
{
f.Execute(msg);
}
if (msg.OnComplete != null) { msg.OnComplete(); }
}
}
/// ExecutionContext is an aspect that the pipeline's unit of work runs under
class ExecutionContext : Aspect<T>, Filter<T>
{
public ExecutionContext(Action<T> action)
{
inner = action;
}
public void Execute(T msg)
{
inner(msg);
}
public Aspect<T> Next { get; set; }
Action<T> inner;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment