-
-
Save jermdavis/4dcf6568d5cd44f03d0ec503620ac177 to your computer and use it in GitHub Desktop.
using System; | |
namespace StronglyTypedPipelines | |
{ | |
/// <summary>
/// Non-generic marker interface implemented by every pipeline step.
/// It gives reflection-based code a type it can use to hold a step created from a type name
/// before that step's generic input and output types have been inspected.
/// </summary>
public interface IPipelineStep { }
/// <summary>
/// Contract for a single pipeline step: a mapping from an input value to an output value.
/// The two types are independent, so a step may change the type of the data flowing through it.
/// </summary>
/// <typeparam name="INPUT">Type of the value the step consumes.</typeparam>
/// <typeparam name="OUTPUT">Type of the value the step produces.</typeparam>
public interface IPipelineStep<INPUT, OUTPUT> : IPipelineStep
{
    /// <summary>Applies this step to <paramref name="input"/>.</summary>
    /// <param name="input">The value to transform.</param>
    /// <returns>The transformed value.</returns>
    OUTPUT Process(INPUT input);
}
/// <summary>
/// Hosts the fluent <c>Step</c> extension that chains pipeline steps into a data flow.
/// </summary>
public static class PipelineStepExtensions
{
    /// <summary>
    /// Passes <paramref name="input"/> to <paramref name="step"/> and returns what the step produces,
    /// so consecutive calls can be chained with each result feeding the next step.
    /// </summary>
    /// <typeparam name="INPUT">Type of the value being fed into the step.</typeparam>
    /// <typeparam name="OUTPUT">Type of the value the step returns.</typeparam>
    /// <param name="input">The value to hand to the step.</param>
    /// <param name="step">The step to run.</param>
    /// <returns>The value returned by <c>step.Process(input)</c>.</returns>
    public static OUTPUT Step<INPUT, OUTPUT>(this INPUT input, IPipelineStep<INPUT, OUTPUT> step)
        => step.Process(input);
}
/// <summary>
/// The base type for a complete pipeline.
/// Descendant types use their constructor to compose a set of pipeline steps with the
/// PipelineStepExtensions.Step() method, and assign the result to the PipelineSteps property.
/// The initial and final types of the set of steps must match the input and output types of this class,
/// but the intermediate types can vary.
/// Because a pipeline is itself an IPipelineStep, it can be used as a step inside another pipeline.
/// </summary>
/// <typeparam name="INPUT">Type of the value the pipeline accepts.</typeparam>
/// <typeparam name="OUTPUT">Type of the value the pipeline produces.</typeparam>
public abstract class Pipeline<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT>
{
    /// <summary>
    /// The composed steps as a single function from input to output.
    /// Descendants are expected to assign this before <see cref="Process"/> is called.
    /// </summary>
    public Func<INPUT, OUTPUT> PipelineSteps { get; protected set; }

    /// <summary>Runs <paramref name="input"/> through the composed steps.</summary>
    /// <param name="input">The value to feed into the first step.</param>
    /// <returns>The value produced by the last step.</returns>
    /// <exception cref="InvalidOperationException">
    /// <see cref="PipelineSteps"/> has not been assigned by the descendant type.
    /// </exception>
    public OUTPUT Process(INPUT input)
    {
        Func<INPUT, OUTPUT> steps = PipelineSteps;
        if (steps == null)
        {
            // Without this guard the call below fails with an uninformative NullReferenceException.
            throw new InvalidOperationException(
                $"{GetType().Name} has not assigned PipelineSteps; set it in the constructor before calling Process.");
        }

        return steps(input);
    }
}
} |
namespace StronglyTypedPipelines | |
{ | |
/// <summary>
/// Example pipeline from <c>int</c> to <c>float</c>: doubles the number, converts it to text,
/// joins the text to itself with a "." and parses the result as a float.
/// </summary>
public class BasicPipeline : Pipeline<int, float>
{
    /// <summary>Composes the four steps; fresh step instances are created on each run.</summary>
    public BasicPipeline()
    {
        PipelineSteps = number =>
        {
            int doubled = number.Step(new DoubleStep());
            string text = doubled.Step(new ToStringStep());
            string repeated = text.Step(new DuplicateStep());
            return repeated.Step(new ToFloatStep());
        };
    }
}
/// <summary>
/// Example pipeline from <c>int</c> to <c>int</c>: divides by three as a float,
/// then converts the result back to an integer.
/// </summary>
public class InnerPipeline : Pipeline<int, int>
{
    /// <summary>Composes the two steps; fresh step instances are created on each run.</summary>
    public InnerPipeline()
    {
        PipelineSteps = number =>
        {
            float third = number.Step(new ThirdStep());
            return third.Step(new RoundStep());
        };
    }
}
/// <summary>
/// Example pipeline from <c>int</c> to <c>string</c> that uses another pipeline
/// (<see cref="InnerPipeline"/>) as one of its steps.
/// </summary>
public class NestedPipeline : Pipeline<int, string>
{
    /// <summary>Composes the steps; fresh step instances are created on each run.</summary>
    public NestedPipeline()
    {
        PipelineSteps = number =>
        {
            int doubled = number.Step(new DoubleStep());
            int reduced = doubled.Step(new InnerPipeline());
            return reduced.Step(new ToStringStep());
        };
    }
}
/// <summary>
/// Example pipeline from <c>int</c> to <c>string</c> with conditional steps:
/// values above 50 first go through <see cref="InnerPipeline"/>; the result is then halved
/// when it is above 100 and doubled otherwise, and finally converted to text.
/// </summary>
public class BranchingPipeline : Pipeline<int, string>
{
    /// <summary>Composes the steps; fresh step instances are created on each run.</summary>
    public BranchingPipeline()
    {
        PipelineSteps = number =>
        {
            var reduceWhenLarge = new OptionalStep<int, int>(
                f => f > 50,
                new InnerPipeline());
            int afterOptional = number.Step(reduceWhenLarge);

            var halveOrDouble = new ChoiceStep<int, int>(
                f => f > 100,
                new HalfStep(),
                new DoubleStep());
            int afterChoice = afterOptional.Step(halveOrDouble);

            return afterChoice.Step(new ToStringStep());
        };
    }
}
} |
namespace StronglyTypedPipelines | |
{ | |
/// <summary>Step that multiplies an integer by two.</summary>
public class DoubleStep : IPipelineStep<int, int>
{
    /// <summary>Returns <paramref name="input"/> multiplied by 2.</summary>
    public int Process(int input) => input * 2;
}
/// <summary>Step that halves an integer using integer division.</summary>
public class HalfStep : IPipelineStep<int, int>
{
    /// <summary>
    /// Returns <paramref name="input"/> divided by 2; being integer division, any remainder is discarded.
    /// </summary>
    public int Process(int input) => input / 2;
}
/// <summary>Step that divides an integer by three, producing a float.</summary>
public class ThirdStep : IPipelineStep<int, float>
{
    /// <summary>Returns <paramref name="input"/> divided by 3 using floating-point division.</summary>
    public float Process(int input) => input / 3f;
}
/// <summary>
/// Step that converts a float to an integer with a cast.
/// Note that, despite the name, the cast drops the fractional part (truncation toward zero)
/// rather than rounding to the nearest integer.
/// </summary>
public class RoundStep : IPipelineStep<float, int>
{
    /// <summary>Returns <paramref name="input"/> cast to <c>int</c>.</summary>
    public int Process(float input) => (int)input;
}
/// <summary>Step that converts an integer to its string representation.</summary>
public class ToStringStep : IPipelineStep<int, string>
{
    /// <summary>Returns the result of calling <c>ToString()</c> on <paramref name="input"/>.</summary>
    public string Process(int input) => input.ToString();
}
/// <summary>Step that repeats a string, joining the two copies with a ".".</summary>
public class DuplicateStep : IPipelineStep<string, string>
{
    /// <summary>Returns <paramref name="input"/>, a ".", and <paramref name="input"/> again.</summary>
    public string Process(string input) => string.Concat(input, ".", input);
}
/// <summary>Step that parses a string into a float.</summary>
public class ToFloatStep : IPipelineStep<string, float>
{
    /// <summary>
    /// Parses <paramref name="input"/> as a float using the invariant culture, so that "."
    /// is always the decimal separator regardless of the machine's regional settings.
    /// </summary>
    /// <param name="input">Text such as "98.98".</param>
    /// <returns>The parsed value.</returns>
    /// <exception cref="System.ArgumentNullException"><paramref name="input"/> is null.</exception>
    /// <exception cref="System.FormatException"><paramref name="input"/> is not a valid number.</exception>
    public float Process(string input)
    {
        // The text this step receives in BasicPipeline is built with a literal "." (see DuplicateStep),
        // so parsing with the current culture would misread it where "." is a group separator.
        return float.Parse(input, System.Globalization.CultureInfo.InvariantCulture);
    }
}
} |
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Reflection; | |
using System.Xml.Linq; | |
namespace StronglyTypedPipelines | |
{ | |
/// <summary>
/// A pipeline whose steps are listed in XML instead of being chained in code.
/// The constructor creates one step instance per <c>step</c> child element (from the type named
/// in its <c>type</c> attribute), checks that the steps' input and output types line up with each
/// other and with <typeparamref name="INPUT"/> and <typeparamref name="OUTPUT"/>, and then
/// assigns a function that runs them in order to <c>PipelineSteps</c>.
/// </summary>
/// <remarks>
/// Steps are invoked through reflection, so an exception thrown inside a step's <c>Process</c>
/// method reaches the caller wrapped in a <see cref="TargetInvocationException"/>.
/// </remarks>
/// <typeparam name="INPUT">Type accepted by the first step.</typeparam>
/// <typeparam name="OUTPUT">Type produced by the last step.</typeparam>
public abstract class ConfigBasedPipeline<INPUT, OUTPUT> : Pipeline<INPUT, OUTPUT>
{
    // Reflection name of the generic IPipelineStep interface (two type parameters).
    private const string StepInterfaceName = "IPipelineStep`2";

    /// <summary>Builds and validates the pipeline described by <paramref name="pipelineXml"/>.</summary>
    /// <param name="pipelineXml">Element whose <c>step</c> children describe the steps, in order.</param>
    /// <exception cref="ArgumentNullException"><paramref name="pipelineXml"/> is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// A step element is malformed, a step type cannot be created, or the step types do not chain
    /// from <typeparamref name="INPUT"/> to <typeparamref name="OUTPUT"/>.
    /// </exception>
    public ConfigBasedPipeline(XElement pipelineXml)
    {
        if (pipelineXml == null)
        {
            throw new ArgumentNullException(nameof(pipelineXml));
        }

        IList<IPipelineStep> pipelineSteps = parsePipelineSteps(pipelineXml);
        validatePipelineSteps(pipelineSteps);

        // Resolve each step's Process method once here instead of on every run.
        IList<Func<object, object>> invokers = pipelineSteps.Select(step => createInvoker(step)).ToList();
        PipelineSteps = input => processPipelineSteps(invokers, input);
    }

    // Runs the input through every step in order, feeding each result to the next step.
    private OUTPUT processPipelineSteps(IList<Func<object, object>> invokers, INPUT input)
    {
        object output = input;
        foreach (Func<object, object> invoke in invokers)
        {
            output = invoke(output);
        }
        return (OUTPUT)output;
    }

    // Wraps a validated step in a delegate that calls its Process method via reflection.
    private static Func<object, object> createInvoker(IPipelineStep step)
    {
        // Looking the method up on the interface (rather than on the concrete type) also finds
        // explicit interface implementations and is not confused by other overloads named Process.
        MethodInfo process = step.GetType().GetInterface(StepInterfaceName).GetMethod("Process");
        return value => process.Invoke(step, new[] { value });
    }

    // Creates one step instance for each <step type="..."/> child element, in document order.
    private IList<IPipelineStep> parsePipelineSteps(XElement pipelineXml)
    {
        var pipeline = new List<IPipelineStep>();
        foreach (XElement xStep in pipelineXml.Elements("step"))
        {
            // The XAttribute-to-string conversion yields null when the attribute is absent.
            string typeName = (string)xStep.Attribute("type");
            if (string.IsNullOrWhiteSpace(typeName))
            {
                throw new InvalidOperationException(
                    $"Step #{pipeline.Count + 1} is missing its 'type' attribute.");
            }

            Type type = Type.GetType(typeName);
            if (type == null)
            {
                throw new InvalidOperationException(
                    $"Step #{pipeline.Count + 1} type '{typeName}' could not be found.");
            }

            if (!typeof(IPipelineStep).IsAssignableFrom(type))
            {
                throw new InvalidOperationException(
                    $"Step #{pipeline.Count + 1} type '{typeName}' does not implement IPipelineStep.");
            }

            ConstructorInfo ctr = type.IsAbstract ? null : type.GetConstructor(Type.EmptyTypes);
            if (ctr == null)
            {
                throw new InvalidOperationException(
                    $"Step #{pipeline.Count + 1} type '{typeName}' must be a concrete type with a public parameterless constructor.");
            }

            pipeline.Add((IPipelineStep)ctr.Invoke(null));
        }
        return pipeline;
    }

    // Checks that each step's input type equals the previous step's output type, starting from
    // INPUT and ending at OUTPUT. Types must match exactly; assignability is not considered.
    private void validatePipelineSteps(IList<IPipelineStep> pipelineSteps)
    {
        int stepNumber = 0;
        Type currentInputType = typeof(INPUT);
        Type outputType = typeof(OUTPUT);

        foreach (IPipelineStep step in pipelineSteps)
        {
            stepNumber += 1;

            Type stepBaseInterface = step.GetType().GetInterface(StepInterfaceName);
            if (stepBaseInterface == null)
            {
                throw new InvalidOperationException(
                    $"Step #{stepNumber} {step.GetType().Name} does not implement IPipelineStep<INPUT, OUTPUT>.");
            }

            Type stepInType = stepBaseInterface.GenericTypeArguments[0];
            Type stepOutType = stepBaseInterface.GenericTypeArguments[1];
            if (currentInputType != stepInType)
            {
                string msg = "Step #{0} {1} input type {2} does not match current type {3}.";
                throw new InvalidOperationException(string.Format(msg, stepNumber, step.GetType().Name, stepInType.Name, currentInputType.Name));
            }
            currentInputType = stepOutType;
        }

        if (currentInputType != outputType)
        {
            if (pipelineSteps.Count == 0)
            {
                // There is no "final step" to name, so report the empty pipeline explicitly.
                throw new InvalidOperationException(
                    $"Pipeline has no steps, so input type {currentInputType.Name} cannot produce output type {outputType.Name}.");
            }

            string msg = "Final step #{0} {1} output type {2} does not equal output of pipeline {3}.";
            throw new InvalidOperationException(string.Format(msg, stepNumber, pipelineSteps.Last().GetType().Name, currentInputType.Name, outputType.Name));
        }
    }
}
/// <summary>
/// Concrete config-driven pipeline from <c>int</c> to <c>string</c>; all behaviour comes from
/// <c>ConfigBasedPipeline</c>.
/// </summary>
public class ExampleConfigBasedPipeline : ConfigBasedPipeline<int, string>
{
    /// <summary>Passes <paramref name="pipelineXml"/> straight to the base constructor.</summary>
    /// <param name="pipelineXml">The element describing the pipeline's steps.</param>
    public ExampleConfigBasedPipeline(XElement pipelineXml)
        : base(pipelineXml)
    {
    }
}
} |
<?xml version="1.0" encoding="utf-8" ?>
<!--
  Step list for the example config-based pipeline (int in, string out).
  Each step's type attribute is "Namespace.TypeName, AssemblyName". Steps run in document
  order, and each step's output type must equal the next step's input type.
-->
<pipeline name="example">
  <step type="StronglyTypedPipelines.DoubleStep, StronglyTypedPipelines" />
  <!-- Will error (produces float, but the next step expects int): <step type="StronglyTypedPipelines.ThirdStep, StronglyTypedPipelines"/> -->
  <step type="StronglyTypedPipelines.ToStringStep, StronglyTypedPipelines" />
  <step type="StronglyTypedPipelines.DuplicateStep, StronglyTypedPipelines" />
  <!-- Will error (produces float, but the pipeline's output is string): <step type="StronglyTypedPipelines.ToFloatStep, StronglyTypedPipelines"/> -->
</pipeline>
using System; | |
namespace StronglyTypedPipelines | |
{ | |
/// <summary>
/// A step that runs an inner step only when a predicate holds; otherwise the input is passed
/// through unchanged. The pass-through is why <typeparamref name="INPUT"/> must be convertible
/// to <typeparamref name="OUTPUT"/>.
/// </summary>
/// <typeparam name="INPUT">Type of the incoming value.</typeparam>
/// <typeparam name="OUTPUT">Type of the outgoing value.</typeparam>
public class OptionalStep<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT> where INPUT : OUTPUT
{
    private readonly IPipelineStep<INPUT, OUTPUT> _step;
    private readonly Func<INPUT, bool> _choice;

    /// <summary>Creates the optional step.</summary>
    /// <param name="choice">Returns true when <paramref name="step"/> should run for a given input.</param>
    /// <param name="step">The step to run when <paramref name="choice"/> returns true.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="choice"/> or <paramref name="step"/> is null.
    /// </exception>
    public OptionalStep(Func<INPUT, bool> choice, IPipelineStep<INPUT, OUTPUT> step)
    {
        // Fail at construction rather than with a NullReferenceException on first use.
        if (choice == null)
        {
            throw new ArgumentNullException(nameof(choice));
        }
        if (step == null)
        {
            throw new ArgumentNullException(nameof(step));
        }

        _choice = choice;
        _step = step;
    }

    /// <summary>Runs the inner step if the predicate accepts <paramref name="input"/>.</summary>
    /// <param name="input">The value to test and possibly transform.</param>
    /// <returns>The inner step's result, or <paramref name="input"/> itself when the step is skipped.</returns>
    public OUTPUT Process(INPUT input)
    {
        if (!_choice(input))
        {
            return input;
        }

        return _step.Process(input);
    }
}
/// <summary>
/// A step that picks one of two inner steps per input, based on a predicate.
/// Since one of the two steps always runs, the input is never returned directly and
/// <typeparamref name="INPUT"/> does not need to be related to <typeparamref name="OUTPUT"/>.
/// </summary>
/// <typeparam name="INPUT">Type of the incoming value.</typeparam>
/// <typeparam name="OUTPUT">Type of the outgoing value.</typeparam>
public class ChoiceStep<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT>
{
    private readonly IPipelineStep<INPUT, OUTPUT> _first;
    private readonly IPipelineStep<INPUT, OUTPUT> _second;
    private readonly Func<INPUT, bool> _choice;

    /// <summary>Creates the choice step.</summary>
    /// <param name="choice">Returns true to run <paramref name="first"/>, false to run <paramref name="second"/>.</param>
    /// <param name="first">The step used when <paramref name="choice"/> returns true.</param>
    /// <param name="second">The step used when <paramref name="choice"/> returns false.</param>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    public ChoiceStep(Func<INPUT, bool> choice, IPipelineStep<INPUT, OUTPUT> first, IPipelineStep<INPUT, OUTPUT> second)
    {
        // Fail at construction rather than with a NullReferenceException on first use.
        if (choice == null)
        {
            throw new ArgumentNullException(nameof(choice));
        }
        if (first == null)
        {
            throw new ArgumentNullException(nameof(first));
        }
        if (second == null)
        {
            throw new ArgumentNullException(nameof(second));
        }

        _choice = choice;
        _first = first;
        _second = second;
    }

    /// <summary>Runs whichever inner step the predicate selects for <paramref name="input"/>.</summary>
    /// <param name="input">The value to test and transform.</param>
    /// <returns>The selected step's result.</returns>
    public OUTPUT Process(INPUT input)
    {
        IPipelineStep<INPUT, OUTPUT> selected = _choice(input) ? _first : _second;
        return selected.Process(input);
    }
}
} |
using System; | |
using System.Collections.Generic; | |
using System.Linq.Expressions; | |
using System.Xml.Linq; | |
namespace StronglyTypedPipelines | |
{ | |
/// <summary>
/// Console harness that runs each example pipeline and prints its input and output values
/// together with their runtime type names.
/// </summary>
class Program
{
    // Writes one line using a two-slot format: {0} is the value, {1} its runtime type name.
    private static void PrintValue(string format, object value)
    {
        Console.WriteLine(string.Format(format, value, value.GetType().Name));
    }

    // Runs BasicPipeline on a fixed input.
    private static void BasicPipelineTest()
    {
        Console.WriteLine("Basic Pipeline Test");

        int input = 49;
        PrintValue("Input Value: {0} [{1}]", input);

        float output = new BasicPipeline().Process(input);
        PrintValue("Output Value: {0} [{1}]", output);

        Console.WriteLine();
    }

    // Runs NestedPipeline on a fixed input.
    private static void NestedPipelineTest()
    {
        Console.WriteLine("Nested Pipeline Test");

        int input = 103;
        PrintValue("Input Value: {0} [{1}]", input);

        string output = new NestedPipeline().Process(input);
        PrintValue("Output Value: {0} [{1}]", output);

        Console.WriteLine();
    }

    // Runs BranchingPipeline over several inputs, building a new pipeline for each one.
    private static void BranchingPipelineTest()
    {
        Console.WriteLine("Branching Pipeline Test");

        int[] inputs = { 1, 10, 100, 1000 };
        foreach (int input in inputs)
        {
            PrintValue("Input Value: {0} [{1}]", input);

            string output = new BranchingPipeline().Process(input);
            PrintValue("Output Value: {0} [{1}]", output);
        }

        Console.WriteLine();
    }

    // Builds a pipeline from ConfigBasedPipeline.xml and runs it; construction and processing
    // errors are reported on the console instead of ending the program.
    private static void ModifiablePipelineTest()
    {
        Console.WriteLine("Configured Pipeline Test");

        int input = 13;
        PrintValue("Input Value: {0} [{1}]", input);

        XDocument config = XDocument.Load("ConfigBasedPipeline.xml");
        //
        // Patching the configuration data would go here
        //
        XElement pipelineXml = config.Element("pipeline");

        try
        {
            string output = new ExampleConfigBasedPipeline(pipelineXml).Process(input);
            PrintValue("Output Value: {0} [{1}]", output);
        }
        catch (Exception ex)
        {
            Console.WriteLine("ERROR: " + ex.Message);
        }

        Console.WriteLine();
    }

    // Entry point: runs the four demonstrations in sequence.
    static void Main(string[] args)
    {
        BasicPipelineTest();
        NestedPipelineTest();
        BranchingPipelineTest();
        ModifiablePipelineTest();
    }
}
} |
Hi Jeremy!
Sorry for the delay... I've not been feeling well the last few days (not Covid, though)
Yeah, you nailed it: it was not the order, but the async behaviour instead (wow, I'm really confused by all of this, lol...)
As I said, I'm new to C#/.NET so thanks again for your patience.
I included your latest suggestion for AsyncPipelineStepExtensions and now it works beautifully!
I think you really should include this Async approach in your gists... It helped me a lot, and can be of use to other devs too!
Amazing job, I really appreciate it! =D
I'm going to keep testing new scenarios, but I think most of the infrastructure is now solid enough to go on.
Thanks!!!! =)
Yeah totally agree the behaviour of async can be confusing. There are good books about this stuff though, if you want to learn a bit more - "C# In Depth" by Jon Skeet covers useful detail about the hows & whys of async. And it also covers lots of other helpful stuff for people new to the C# language overall. Maybe work will let you put a copy on expenses for learning?
Glad you're getting on top of all this though - happy to have been able to help. Certainly not going to waste all this interesting material - what I've worked out will end up on my blog and I'll add new gists for the changed code as well.
Actually - having thought about this harder, I think there's a better solution that makes all this simpler. We can move all the logic for handling a `Task<TIn>` into the extension method class, and simplify everything else... Instead of the second batch of code from the previous comment...
and the individual example steps end up looking like:
That gives the same overall behaviour, but doesn't require an abstract base for pipeline steps, and doesn't suffer from the "putting code before the await for the input" issue I mentioned before.