-
-
Save jermdavis/4dcf6568d5cd44f03d0ec503620ac177 to your computer and use it in GitHub Desktop.
using System; | |
namespace StronglyTypedPipelines | |
{ | |
/// <summary>
/// Non-generic marker shared by every pipeline step. It lets reflection-driven code
/// create and store steps knowing only their type name, before it has worked out
/// (or needs to care about) their generic input and output types.
/// </summary>
public interface IPipelineStep { }
/// <summary>
/// One strongly typed stage of a pipeline: it turns a single <typeparamref name="INPUT"/>
/// value into a single <typeparamref name="OUTPUT"/> value. The two types may differ.
/// </summary>
/// <typeparam name="INPUT">The type this step consumes.</typeparam>
/// <typeparam name="OUTPUT">The type this step produces.</typeparam>
public interface IPipelineStep<INPUT, OUTPUT> : IPipelineStep
{
    /// <summary>Transforms <paramref name="input"/> into this step's result.</summary>
    /// <param name="input">The value handed on by the previous step or by the caller.</param>
    /// <returns>The value to hand on to the next step.</returns>
    OUTPUT Process(INPUT input);
}
/// <summary>
/// An extension method for combining pipeline steps together into a data flow,
/// e.g. <c>input.Step(new A()).Step(new B())</c>. The compiler checks that each
/// step's input type matches the previous step's output type.
/// </summary>
public static class PipelineStepExtensions
{
    /// <summary>Applies <paramref name="step"/> to <paramref name="input"/>.</summary>
    /// <param name="input">The value to process.</param>
    /// <param name="step">The step to run; must not be null.</param>
    /// <returns>The result of <c>step.Process(input)</c>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="step"/> is null.</exception>
    public static OUTPUT Step<INPUT, OUTPUT>(this INPUT input, IPipelineStep<INPUT, OUTPUT> step)
    {
        // Report a missing step as an argument error instead of a NullReferenceException
        // from deep inside a chain. The input is deliberately not checked: a step may
        // legitimately accept null.
        if (step == null)
        {
            throw new ArgumentNullException(nameof(step));
        }

        return step.Process(input);
    }
}
/// <summary>
/// The base type for a complete pipeline.
/// Descendant types can use their constructor to compile a set of pipeline steps together
/// using the <c>PipelineStepExtensions.Step()</c> method, and assign the result to the
/// <see cref="PipelineSteps"/> property here.
/// The initial and final types of the set of steps must match the input and output types
/// of this class, but the intermediate types can vary.
/// Because a pipeline is itself an <c>IPipelineStep&lt;INPUT, OUTPUT&gt;</c>, it can be
/// used as a step inside another pipeline.
/// </summary>
public abstract class Pipeline<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT>
{
    /// <summary>
    /// The composed function that runs every step in order. Descendants are expected to
    /// assign it in their constructor.
    /// </summary>
    public Func<INPUT, OUTPUT> PipelineSteps { get; protected set; }

    /// <summary>Runs <paramref name="input"/> through <see cref="PipelineSteps"/>.</summary>
    /// <param name="input">The value to feed into the first step.</param>
    /// <returns>The value produced by the last step.</returns>
    /// <exception cref="InvalidOperationException">
    /// <see cref="PipelineSteps"/> was never assigned.
    /// </exception>
    public OUTPUT Process(INPUT input)
    {
        var steps = PipelineSteps;
        if (steps == null)
        {
            // A descendant forgot to wire up its steps: say so, rather than failing
            // with a NullReferenceException.
            throw new InvalidOperationException(string.Format(
                "Pipeline {0} has no steps: its constructor must assign the PipelineSteps property.",
                GetType().Name));
        }

        return steps(input);
    }
}
} |
namespace StronglyTypedPipelines | |
{ | |
/// <summary>
/// Example int-to-float pipeline: doubles the input, converts it to a string,
/// duplicates that string around a '.', and finally parses the result as a float.
/// </summary>
public class BasicPipeline : Pipeline<int, float>
{
    /// <summary>Wires up the four steps; a fresh set of step objects is created per call.</summary>
    public BasicPipeline()
    {
        PipelineSteps = value =>
        {
            int doubled = value.Step(new DoubleStep());
            string text = doubled.Step(new ToStringStep());
            string duplicated = text.Step(new DuplicateStep());
            return duplicated.Step(new ToFloatStep());
        };
    }
}
/// <summary>
/// Small int-to-int pipeline that other pipelines embed as a single step: it divides
/// the input by three as a float, then converts back to int via RoundStep's
/// <c>(int)</c> cast (which truncates toward zero).
/// </summary>
public class InnerPipeline : Pipeline<int, int>
{
    /// <summary>Wires up ThirdStep followed by RoundStep.</summary>
    public InnerPipeline()
    {
        PipelineSteps = value =>
        {
            float third = value.Step(new ThirdStep());
            return third.Step(new RoundStep());
        };
    }
}
/// <summary>
/// Demonstrates nesting: an <see cref="InnerPipeline"/> is used as an ordinary step
/// between doubling the input and converting the result to a string.
/// </summary>
public class NestedPipeline : Pipeline<int, string>
{
    /// <summary>Wires up DoubleStep, then InnerPipeline, then ToStringStep.</summary>
    public NestedPipeline()
    {
        PipelineSteps = value =>
        {
            int doubled = value.Step(new DoubleStep());
            int reduced = doubled.Step(new InnerPipeline());
            return reduced.Step(new ToStringStep());
        };
    }
}
/// <summary>
/// Demonstrates conditional flow:
/// values above 50 first go through an <see cref="InnerPipeline"/>;
/// the result is then halved if it is above 100, or doubled otherwise;
/// finally it is converted to a string.
/// </summary>
public class BranchingPipeline : Pipeline<int, string>
{
    /// <summary>Wires up the optional step, the two-way choice, and the final conversion.</summary>
    public BranchingPipeline()
    {
        PipelineSteps = value =>
        {
            int afterOptional = value.Step(
                new OptionalStep<int, int>(n => n > 50, new InnerPipeline()));
            int afterChoice = afterOptional.Step(
                new ChoiceStep<int, int>(n => n > 100, new HalfStep(), new DoubleStep()));
            return afterChoice.Step(new ToStringStep());
        };
    }
}
} |
namespace StronglyTypedPipelines | |
{ | |
/// <summary>Multiplies an int by two (unchecked int arithmetic, as written).</summary>
public class DoubleStep : IPipelineStep<int, int>
{
    public int Process(int input) => 2 * input;
}
/// <summary>Halves an int using integer division, so any remainder is discarded.</summary>
public class HalfStep : IPipelineStep<int, int>
{
    public int Process(int input) => input / 2;
}
/// <summary>Divides an int by three, producing a float so the fraction is kept.</summary>
public class ThirdStep : IPipelineStep<int, float>
{
    public float Process(int input) => input / 3.0f;
}
/// <summary>
/// Converts a float to an int with a plain <c>(int)</c> cast. Despite the name, this
/// truncates toward zero rather than rounding to the nearest integer (e.g. 2.9 becomes 2).
/// </summary>
public class RoundStep : IPipelineStep<float, int>
{
    public int Process(float input) => (int)input;
}
/// <summary>Converts an int to its decimal text form.</summary>
public class ToStringStep : IPipelineStep<int, string>
{
    /// <summary>Formats <paramref name="input"/> as text.</summary>
    /// <param name="input">The number to format.</param>
    /// <returns>The number's decimal representation.</returns>
    public string Process(int input)
    {
        // The text may be parsed again by later steps, so format it with the invariant
        // culture. That way the result does not depend on the machine's regional settings.
        return input.ToString(System.Globalization.CultureInfo.InvariantCulture);
    }
}
/// <summary>
/// Repeats a string twice with a '.' between the copies, e.g. "98" becomes "98.98".
/// </summary>
public class DuplicateStep : IPipelineStep<string, string>
{
    public string Process(string input) => string.Concat(input, ".", input);
}
/// <summary>Parses text as a float.</summary>
public class ToFloatStep : IPipelineStep<string, float>
{
    /// <summary>Parses <paramref name="input"/> using the invariant culture.</summary>
    /// <param name="input">Text such as "98.98".</param>
    /// <returns>The parsed number.</returns>
    /// <exception cref="System.FormatException"><paramref name="input"/> is not a number.</exception>
    /// <exception cref="System.ArgumentNullException"><paramref name="input"/> is null.</exception>
    public float Process(string input)
    {
        // The incoming text is machine-generated with '.' as the decimal point (for example,
        // DuplicateStep joins with a literal "."). Parsing with the current culture would
        // fail or misread it on machines that use ',' as the decimal separator.
        return float.Parse(input, System.Globalization.CultureInfo.InvariantCulture);
    }
}
} |
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Reflection; | |
using System.Xml.Linq; | |
namespace StronglyTypedPipelines | |
{ | |
/// <summary>
/// A pipeline whose steps are read from XML at runtime instead of being compiled in.
/// Each <c>step</c> child element carries a <c>type</c> attribute naming a step type that
/// <see cref="Type.GetType(string)"/> can resolve. Every such type must be concrete, have a
/// public parameterless constructor, and implement <c>IPipelineStep&lt;,&gt;</c> exactly once.
/// Because the compiler cannot check that the step types chain together, that check is
/// done here when the pipeline is constructed.
/// </summary>
public abstract class ConfigBasedPipeline<INPUT, OUTPUT> : Pipeline<INPUT, OUTPUT>
{
    /// <summary>Builds and validates the pipeline described by <paramref name="pipelineXml"/>.</summary>
    /// <param name="pipelineXml">Element whose <c>step</c> children define the steps, in order.</param>
    /// <exception cref="ArgumentNullException"><paramref name="pipelineXml"/> is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// A step cannot be resolved or created, or the step types do not chain from
    /// <typeparamref name="INPUT"/> to <typeparamref name="OUTPUT"/>.
    /// </exception>
    public ConfigBasedPipeline(XElement pipelineXml)
    {
        if (pipelineXml == null)
        {
            throw new ArgumentNullException(nameof(pipelineXml));
        }

        var pipelineSteps = parsePipelineSteps(pipelineXml);
        validatePipelineSteps(pipelineSteps);

        // Resolve each step's Process method once, through its IPipelineStep<,> interface.
        // Looking it up on the interface also works for explicit interface implementations
        // and for step types that declare other Process overloads. Invoking the interface
        // method dispatches to the step's implementation.
        var processMethods = pipelineSteps
            .Select(step => getStepInterface(step.GetType()).GetMethod("Process"))
            .ToArray();

        PipelineSteps = input => processPipelineSteps(pipelineSteps, processMethods, input);
    }

    /// <summary>Feeds the value through each step in turn, using the pre-resolved Process methods.</summary>
    private static OUTPUT processPipelineSteps(IList<IPipelineStep> pipelineSteps, IList<MethodInfo> processMethods, INPUT input)
    {
        object output = input;
        for (int i = 0; i < pipelineSteps.Count; i++)
        {
            // Exceptions thrown by a step arrive wrapped in TargetInvocationException.
            output = processMethods[i].Invoke(pipelineSteps[i], new[] { output });
        }
        return (OUTPUT)output;
    }

    /// <summary>Creates one step instance per <c>step</c> element, in document order.</summary>
    private IList<IPipelineStep> parsePipelineSteps(XElement pipelineXml)
    {
        var pipeline = new List<IPipelineStep>();
        int stepNumber = 0;
        foreach (var xStep in pipelineXml.Elements("step"))
        {
            stepNumber += 1;

            // The explicit string conversion yields null (rather than throwing) when the
            // attribute is missing.
            string typeName = (string)xStep.Attribute("type");
            if (string.IsNullOrWhiteSpace(typeName))
            {
                string msg = "Step #{0} has no 'type' attribute.";
                throw new InvalidOperationException(string.Format(msg, stepNumber));
            }

            Type type = Type.GetType(typeName);
            if (type == null)
            {
                string msg = "Step #{0} type '{1}' could not be found.";
                throw new InvalidOperationException(string.Format(msg, stepNumber, typeName));
            }

            if (!typeof(IPipelineStep).IsAssignableFrom(type))
            {
                string msg = "Step #{0} type {1} does not implement IPipelineStep.";
                throw new InvalidOperationException(string.Format(msg, stepNumber, type.Name));
            }

            ConstructorInfo ctr = type.GetConstructor(Type.EmptyTypes);
            if (type.IsAbstract || type.ContainsGenericParameters || ctr == null)
            {
                string msg = "Step #{0} type {1} cannot be created: it must be a concrete, fully constructed type with a public parameterless constructor.";
                throw new InvalidOperationException(string.Format(msg, stepNumber, type.Name));
            }

            pipeline.Add((IPipelineStep)ctr.Invoke(new object[0]));
        }
        return pipeline;
    }

    /// <summary>
    /// Checks that the steps chain from INPUT to OUTPUT, with each step's input type
    /// exactly equal to the previous step's output type.
    /// </summary>
    private void validatePipelineSteps(IList<IPipelineStep> pipelineSteps)
    {
        int stepNumber = 0;
        Type currentInputType = typeof(INPUT);
        Type outputType = typeof(OUTPUT);
        foreach (var step in pipelineSteps)
        {
            stepNumber += 1;
            Type stepBaseInterface = getStepInterface(step.GetType());
            Type stepInType = stepBaseInterface.GenericTypeArguments[0];
            Type stepOutType = stepBaseInterface.GenericTypeArguments[1];
            if (currentInputType != stepInType)
            {
                string msg = "Step #{0} {1} input type {2} does not match current type {3}.";
                throw new InvalidOperationException(string.Format(msg, stepNumber, step.GetType().Name, stepInType.Name, currentInputType.Name));
            }
            currentInputType = stepOutType;
        }
        if (currentInputType != outputType)
        {
            if (pipelineSteps.Count == 0)
            {
                string emptyMsg = "Pipeline has no steps, so its input type {0} cannot become its output type {1}.";
                throw new InvalidOperationException(string.Format(emptyMsg, currentInputType.Name, outputType.Name));
            }
            string msg = "Final step #{0} {1} output type {2} does not equal output of pipeline {3}.";
            throw new InvalidOperationException(string.Format(msg, stepNumber, pipelineSteps.Last().GetType().Name, currentInputType.Name, outputType.Name));
        }
    }

    /// <summary>
    /// Returns the single closed <c>IPipelineStep&lt;,&gt;</c> interface implemented by
    /// <paramref name="stepType"/>, matched by generic definition rather than by name.
    /// </summary>
    private static Type getStepInterface(Type stepType)
    {
        Type[] matches = stepType.GetInterfaces()
            .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IPipelineStep<,>))
            .ToArray();
        if (matches.Length != 1)
        {
            string msg = "Step type {0} must implement IPipelineStep<,> exactly once, but implements it {1} time(s).";
            throw new InvalidOperationException(string.Format(msg, stepType.Name, matches.Length));
        }
        return matches[0];
    }
}
/// <summary>
/// Concrete config-driven pipeline from int to string. It adds no behaviour of its own:
/// the step list comes entirely from the XML passed to the constructor.
/// </summary>
public class ExampleConfigBasedPipeline : ConfigBasedPipeline<int, string>
{
    /// <param name="pipelineXml">Element whose <c>step</c> children describe the pipeline.</param>
    public ExampleConfigBasedPipeline(XElement pipelineXml)
        : base(pipelineXml) { }
}
} |
<?xml version="1.0" encoding="utf-8" ?> | |
<pipeline name="example"> | |
<step type="StronglyTypedPipelines.DoubleStep, StronglyTypedPipelines" /> | |
<!-- Will error if uncommented: ThirdStep outputs float, but the next step (ToStringStep) expects int. <step type="StronglyTypedPipelines.ThirdStep, StronglyTypedPipelines"/> -->
<step type="StronglyTypedPipelines.ToStringStep, StronglyTypedPipelines" /> | |
<step type="StronglyTypedPipelines.DuplicateStep, StronglyTypedPipelines" /> | |
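<!-- Will error if uncommented: ToFloatStep outputs float, but this pipeline (ExampleConfigBasedPipeline) must output string. <step type="StronglyTypedPipelines.ToFloatStep, StronglyTypedPipelines"/> -->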
</pipeline> |
using System; | |
namespace StronglyTypedPipelines | |
{ | |
/// <summary>
/// A step that runs an inner step only when a predicate holds; otherwise the input
/// passes through unchanged. The <c>INPUT : OUTPUT</c> constraint is what allows the
/// unchanged input to be returned as the output.
/// </summary>
public class OptionalStep<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT> where INPUT : OUTPUT
{
    private readonly IPipelineStep<INPUT, OUTPUT> _step;
    private readonly Func<INPUT, bool> _choice;

    /// <param name="choice">Predicate deciding whether <paramref name="step"/> runs for a given input.</param>
    /// <param name="step">The step to run when <paramref name="choice"/> returns true.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public OptionalStep(Func<INPUT, bool> choice, IPipelineStep<INPUT, OUTPUT> step)
    {
        // Fail at construction rather than later, inside Process.
        if (choice == null)
        {
            throw new ArgumentNullException(nameof(choice));
        }
        if (step == null)
        {
            throw new ArgumentNullException(nameof(step));
        }

        _choice = choice;
        _step = step;
    }

    /// <summary>Runs the inner step if the predicate accepts <paramref name="input"/>; otherwise returns it as-is.</summary>
    public OUTPUT Process(INPUT input)
    {
        if (_choice(input))
        {
            return _step.Process(input);
        }

        return input;
    }
}
/// <summary>
/// A step that chooses one of two inner steps for each input: <c>first</c> when the
/// predicate returns true, otherwise <c>second</c>. The input is never passed straight
/// through, so (unlike an optional step) no relationship between INPUT and OUTPUT is
/// required. For example, a ChoiceStep&lt;int, string&gt; is allowed.
/// </summary>
public class ChoiceStep<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT>
{
    private readonly IPipelineStep<INPUT, OUTPUT> _first;
    private readonly IPipelineStep<INPUT, OUTPUT> _second;
    private readonly Func<INPUT, bool> _choice;

    /// <param name="choice">Predicate selecting which step runs.</param>
    /// <param name="first">Step run when <paramref name="choice"/> returns true.</param>
    /// <param name="second">Step run when <paramref name="choice"/> returns false.</param>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    public ChoiceStep(Func<INPUT, bool> choice, IPipelineStep<INPUT, OUTPUT> first, IPipelineStep<INPUT, OUTPUT> second)
    {
        // Fail at construction rather than later, inside Process.
        if (choice == null)
        {
            throw new ArgumentNullException(nameof(choice));
        }
        if (first == null)
        {
            throw new ArgumentNullException(nameof(first));
        }
        if (second == null)
        {
            throw new ArgumentNullException(nameof(second));
        }

        _choice = choice;
        _first = first;
        _second = second;
    }

    /// <summary>Runs whichever inner step the predicate selects for <paramref name="input"/>.</summary>
    public OUTPUT Process(INPUT input)
    {
        return _choice(input) ? _first.Process(input) : _second.Process(input);
    }
}
} |
using System; | |
using System.Collections.Generic; | |
using System.Linq.Expressions; | |
using System.Xml.Linq; | |
namespace StronglyTypedPipelines | |
{ | |
class Program
{
    /// <summary>
    /// Prints a labelled value together with its runtime type name,
    /// e.g. "Input Value: 49 [Int32]".
    /// </summary>
    private static void WriteValue(string label, object value)
    {
        Console.WriteLine(string.Format("{0} Value: {1} [{2}]", label, value, value.GetType().Name));
    }

    /// <summary>Runs the compiled-in int-to-float BasicPipeline on a sample input.</summary>
    private static void BasicPipelineTest()
    {
        Console.WriteLine("Basic Pipeline Test");
        var input = 49;
        WriteValue("Input", input);
        var pipeline = new BasicPipeline();
        var output = pipeline.Process(input);
        WriteValue("Output", output);
        Console.WriteLine();
    }

    /// <summary>Runs NestedPipeline, which embeds another pipeline as a step.</summary>
    private static void NestedPipelineTest()
    {
        Console.WriteLine("Nested Pipeline Test");
        var input = 103;
        WriteValue("Input", input);
        var pipeline = new NestedPipeline();
        var output = pipeline.Process(input);
        WriteValue("Output", output);
        Console.WriteLine();
    }

    /// <summary>Runs BranchingPipeline over inputs that exercise both sides of each branch.</summary>
    private static void BranchingPipelineTest()
    {
        Console.WriteLine("Branching Pipeline Test");
        // The pipeline only holds the composed step function set in its constructor,
        // so one instance can serve every input.
        var pipeline = new BranchingPipeline();
        foreach (int input in new int[] { 1, 10, 100, 1000 })
        {
            WriteValue("Input", input);
            var output = pipeline.Process(input);
            WriteValue("Output", output);
        }
        Console.WriteLine();
    }

    /// <summary>Builds a pipeline from ConfigBasedPipeline.xml and runs it, reporting any error.</summary>
    private static void ModifiablePipelineTest()
    {
        Console.WriteLine("Configured Pipeline Test");
        var input = 13;
        WriteValue("Input", input);
        try
        {
            // Loading inside the try means a missing or malformed config file is reported as
            // an ERROR line, like any other pipeline failure, instead of ending the program.
            // NOTE: the relative path is resolved against the current working directory.
            XDocument xd = XDocument.Load("ConfigBasedPipeline.xml");
            //
            // Patching the configuration data would go here
            //
            XElement pipelineXml = xd.Element("pipeline");
            var pipeline = new ExampleConfigBasedPipeline(pipelineXml);
            var output = pipeline.Process(input);
            WriteValue("Output", output);
        }
        catch (Exception ex)
        {
            Console.WriteLine("ERROR: " + ex.Message);
        }
        Console.WriteLine();
    }

    static void Main(string[] args)
    {
        BasicPipelineTest();
        NestedPipelineTest();
        BranchingPipelineTest();
        ModifiablePipelineTest();
    }
}
} |
Actually, having thought about this harder, I think there's a better solution that makes all this simpler. We can move all the logic for handling a `Task<TIn>` into the extension method class, and simplify everything else.
Use this instead of the second batch of code from the previous comment:
/// <summary>
/// Asynchronous pipeline step: maps a <typeparamref name="TIn"/> to a task that
/// eventually yields a <typeparamref name="TOut"/>.
/// </summary>
/// <typeparam name="TIn">The type this step consumes.</typeparam>
/// <typeparam name="TOut">The type the returned task produces.</typeparam>
public interface IAsyncPipelineStep<TIn, TOut>
{
    /// <summary>Starts processing <paramref name="Input"/>.</summary>
    /// <returns>A task that completes with this step's output.</returns>
    Task<TOut> ProcessAsync(TIn Input);
}
/// <summary>
/// Base type for an asynchronous pipeline. Descendants assign <see cref="_pipelineSteps"/>
/// in their constructor (e.g. by chaining AsyncPipelineStepExtensions.Step calls).
/// A pipeline is itself an async step, so it can be nested inside another pipeline.
/// </summary>
public abstract class AsyncPipeline<TIn, TOut> : IAsyncPipelineStep<TIn, TOut>
{
    /// <summary>The composed asynchronous function that runs every step in order.</summary>
    // NOTE(review): a leading underscore is unconventional for a public property; the name
    // is kept as-is because existing subclasses assign it.
    public Func<TIn, Task<TOut>> _pipelineSteps { get; protected set; }

    /// <summary>Runs <paramref name="Input"/> through <see cref="_pipelineSteps"/>.</summary>
    /// <returns>A task that completes with the last step's output.</returns>
    /// <exception cref="InvalidOperationException"><see cref="_pipelineSteps"/> was never assigned.</exception>
    public Task<TOut> ProcessAsync(TIn Input)
    {
        var steps = _pipelineSteps;
        if (steps == null)
        {
            // A descendant forgot to wire up its steps: say so, rather than failing
            // with a NullReferenceException.
            throw new InvalidOperationException(string.Format(
                "Pipeline {0} has no steps: its constructor must assign the _pipelineSteps property.",
                GetType().Name));
        }

        return steps(Input);
    }
}
/// <summary>
/// Fluent chaining for async steps.
/// One overload continues a chain from the task produced by the previous step: it awaits
/// that task before handing the result on.
/// The other overload starts a chain from a plain value.
/// Both write trace lines to the console.
/// </summary>
public static class AsyncPipelineStepExtensions
{
    /// <summary>Awaits <paramref name="Input"/>, then runs <paramref name="Step"/> on its result.</summary>
    public static async Task<TOut> Step<TIn, TOut>(this Task<TIn> Input, IAsyncPipelineStep<TIn, TOut> Step)
    {
        string stepName = Step.GetType().Name;
        Console.WriteLine($"Awaiting input for {stepName}");
        TIn resolved = await Input;
        Console.WriteLine($"Got input for {stepName}");
        return await Step.ProcessAsync(resolved);
    }

    /// <summary>Runs <paramref name="Step"/> on an already-available value.</summary>
    public static async Task<TOut> Step<TIn, TOut>(this TIn Input, IAsyncPipelineStep<TIn, TOut> Step)
    {
        string stepName = Step.GetType().Name;
        Console.WriteLine($"Got input for {stepName}");
        return await Step.ProcessAsync(Input);
    }
}
And the individual example steps end up looking like this:
/// <summary>Downloads the resource at the input URI and returns its body as a string.</summary>
public class HttpFetchAsyncStep : IAsyncPipelineStep<Uri, string>
{
    // One HttpClient shared by all instances, rather than a new client per call.
    private static readonly HttpClient _client = new HttpClient();

    /// <summary>Fetches <paramref name="Input"/> over HTTP.</summary>
    /// <returns>A task that completes with the response body.</returns>
    public async Task<string> ProcessAsync(Uri Input)
    {
        Console.WriteLine("Entering HttpFetchAsyncStep #1");
        // Await the download before logging "Leaving". Previously the message was printed
        // before the request had even completed, which made the trace misleading.
        string content = await _client.GetStringAsync(Input);
        Console.WriteLine("Leaving HttpFetchAsyncStep #1");
        return content;
    }
}
/// <summary>Replaces every occurrence of "BBC" in the input with "Not the BBC".</summary>
public class ModifyTextAsyncStep : IAsyncPipelineStep<string, string>
{
    private const string Target = "BBC";
    private const string Replacement = "Not the BBC";

    // NOTE(review): marked async but contains no await, so it completes synchronously
    // (compiler warning CS1998). Keeping 'async' means any exception (e.g. a null Input)
    // is delivered through the returned task rather than thrown directly.
    public async Task<string> ProcessAsync(string Input)
    {
        Console.WriteLine("Entering ModifyTextAsyncStep #2");
        string rewritten = Input.Replace(Target, Replacement);
        Console.WriteLine("Leaving ModifyTextAsyncStep #2");
        return rewritten;
    }
}
/// <summary>
/// Writes the input text to test.txt in the current user's Desktop folder, replacing
/// any existing file, and returns the path that was written.
/// </summary>
public class DiskWriteAsyncStep : IAsyncPipelineStep<string, string>
{
    public async Task<string> ProcessAsync(string Input)
    {
        Console.WriteLine("Entering DiskWriteAsyncStep #3");
        string targetPath = System.IO.Path.Combine(
            Environment.GetFolderPath(Environment.SpecialFolder.Desktop),
            "test.txt");
        await System.IO.File.WriteAllTextAsync(targetPath, Input);
        Console.WriteLine("Leaving DiskWriteAsyncStep #3");
        return targetPath;
    }
}
That gives the same overall behaviour, but doesn't require an abstract base for pipeline steps, and doesn't suffer from the "putting code before the await for the input" issue I mentioned before.
Hi Jeremy!
Sorry for the delay... I've not been feeling well the last few days (not Covid, though)
Yeah, you nailed it: it was not the order, but the async behaviour instead (wow, I'm really confused by all of this, lol...)
As I said, I'm new to C#/.NET so thanks again for your patience.
I included your latest suggestion for AsyncPipelineStepExtensions, and now it works beautifully!
I think you really should add this async approach to your gists... It helped me a lot, and it could be useful to other devs too!
Amazing job, I really appreciate it! =D
I'm going to keep testing new scenarios, but I think most of the infrastructure is now solid enough to build on.
Thanks!!!! =)
Yeah totally agree the behaviour of async can be confusing. There are good books about this stuff though, if you want to learn a bit more - "C# In Depth" by Jon Skeet covers useful detail about the hows & whys of async. And it also covers lots of other helpful stuff for people new to the C# language overall. Maybe work will let you put a copy on expenses for learning?
Glad you're getting on top of all this, though - happy to have been able to help. I'm certainly not going to waste all this interesting material - what I've worked out will end up on my blog, and I'll add new gists for the changed code as well.
Ok, so two things:
Firstly, making the `AsyncPipelineStepExtensions` methods `async`: I'm not sure you need that? Given the step interface still has task-in-task-out, I think you just end up awaiting the same task twice there? Once in the extension method, and once in the next step? I don't think that matters, because the first `await` will resolve the `Task`'s value, and the second will then effectively do nothing because the `Task` already has its value. But I think the code works without it? Methods with a return type of `Task<T>` but no `async` modifier just return the task - they don't do any complicated compiler business to implement asynchronous behaviour. And in this case I think that's fine?

Secondly, ordering: are the steps really happening out of order, or are you seeing the effects of how the whole `async` thing works? The internals of what the compiler does with `async` code are pretty complex, but it boils down to "while an `await` statement is waiting for the future value of a `Task`, let's try to run some other code if we can". Internally this is done with complicated state machines that allow you to write linear-looking code, while behind the scenes the compiler generates wrappers that allow that flow of execution to work despite there being the old pattern of "BeginSomething()" / "EndSomething()" asynchronicity under the hood? (BTW I don't consider myself too knowledgeable on how this stuff works - so it's possible I'm misunderstanding stuff here - but this is how I think it works...)
And this has turned into a bit of an epic answer - sorry ;-)
Anyway, I think we can see the outline of that behaviour in a simple example. If I create three steps which look like this:
The first step does an HTTP fetch for whatever URL comes in, and it outputs some text at three key points: on entry, after the input await, and before it returns.
The second step does some text replacement, and outputs similar messages.
And then step three writes the changed data to disk, and does another set of `WriteLine`s in the same pattern. I don't think the particular bits of code are important - I just wanted something simple-ish to demonstrate the behaviour. I suspect you'd see something similar in your code...
Now, if I make a pipeline from those:
It will run a pipeline with the steps in that order, but the output looks like:
So you can see the steps don't happen completely linearly:
[…] `await` for step two's input task completes and provides its value. That allows step two to complete the rest of its processing and return its output.
[…] `await` to complete, and hence it can do the rest of its processing.

So I think that's why you see apparently out-of-order execution? The runtime behind the `async`/`await` calls is making sure that, where necessary, the flow of your program is preserved, but it tries to fill the time during the "waiting for a future value" parts of the long-running operations with other bits of work where it can?

Now, thinking about it, I suspect this allows for some odd behaviour in the code here if anything "important" happens before awaiting the input in an individual step. (The delay between that code and the
`await` completing could lead to deadlocks or other strangeness in some code.) That makes me want to abstract that `await` out of the step itself and hide it away in some base code, but that's trickier to achieve, as it needs more change. Quick attempt...

First, the pipeline definition needs to allow explicitly for both `Task<TIn>` and `TIn`:

We'd already added that signature to `AsyncPipeline<TIn, TOut>`, so that doesn't change. And we can leave `AsyncPipelineStepExtensions` alone too, as it already has all its required methods.

To hide that input `await`, we can have an abstract base for each pipeline step:

That handles the `Task<TIn>` method by awaiting the input, and it delegates the `TIn` method to the concrete class. So we can tweak the individual step definitions from my example above:

They now override that abstract `TIn` method, and never have to `await` their input.

If I run that, the output becomes:
Which makes it look all nice and synchronous. But actually, all we did was hide the ordering oddity from above. If I add some extra debug output to the abstract step type:
Then you can see that the ordering still has the whole "`await` will wander off and do other stuff" behaviour:
The difference here is that it's harder for someone writing a concrete step to accidentally mess up because of that...
Does that make sense?