Skip to content

Instantly share code, notes, and snippets.

@jermdavis
Last active November 23, 2021 13:28
Show Gist options
  • Save jermdavis/49ecd692a16b10899eb2ee2b50770499 to your computer and use it in GitHub Desktop.
Save jermdavis/49ecd692a16b10899eb2ee2b50770499 to your computer and use it in GitHub Desktop.
An improved example for asynchronous pipeline code. See blog post for more info: https://blog.jermdavis.dev/posts/2021/a-second-pass-at-async-pipelines
async Task Main()
{
var pipeline = new ExampleAsyncPipeline();
var uri = new Uri("https://news.bbc.co.uk/");
var tempFile = await pipeline.ProcessAsync(uri);
Console.WriteLine($"{uri} saved to {tempFile}");
}
public interface IAsyncPipelineStep<TIn, TOut>
{
Task<TOut> ProcessAsync(TIn Input);
}
public abstract class AsyncPipeline<TIn, TOut> : IAsyncPipelineStep<TIn, TOut>
{
public Func<TIn, Task<TOut>> _pipelineSteps { get; protected set; }
public Task<TOut> ProcessAsync(TIn Input)
{
return _pipelineSteps(Input);
}
}
public static class AsyncPipelineStepExtensions
{
public async static Task<TOut> Step<TIn, TOut>(this Task<TIn> Input, IAsyncPipelineStep<TIn, TOut> Step)
{
Console.WriteLine($"Awaiting input for {Step.GetType().Name}");
var input = await Input;
Console.WriteLine($"Got input for {Step.GetType().Name}");
return await Step.ProcessAsync(input);
}
public async static Task<TOut> Step<TIn, TOut>(this TIn Input, IAsyncPipelineStep<TIn, TOut> Step)
{
Console.WriteLine($"Got input for {Step.GetType().Name}");
return await Step.ProcessAsync(Input);
}
}
public class HttpFetchAsyncStep : IAsyncPipelineStep<Uri, string>
{
private static readonly HttpClient _client = new HttpClient();
public async Task<string> ProcessAsync(Uri Input)
{
Console.WriteLine("Entering HttpFetchAsyncStep #1");
Console.WriteLine("Leaving HttpFetchAsyncStep #1");
return await _client.GetStringAsync(Input);
}
}
public class ModifyTextAsyncStep : IAsyncPipelineStep<string, string>
{
public async Task<string> ProcessAsync(string Input)
{
Console.WriteLine("Entering ModifyTextAsyncStep #2");
var output = Input.Replace("BBC", "Not the BBC");
Console.WriteLine("Leaving ModifyTextAsyncStep #2");
return output;
}
}
public class DiskWriteAsyncStep : IAsyncPipelineStep<string, string>
{
public async Task<string> ProcessAsync(string Input)
{
Console.WriteLine("Entering DiskWriteAsyncStep #3");
var desktopFolder = System.Environment.GetFolderPath(Environment.SpecialFolder.Desktop);
var fileName = System.IO.Path.Combine(desktopFolder, "test.txt");
await System.IO.File.WriteAllTextAsync(fileName, Input);
Console.WriteLine("Leaving DiskWriteAsyncStep #3");
return fileName;
}
}
public class ExampleAsyncPipeline : AsyncPipeline<Uri, string>
{
public ExampleAsyncPipeline()
{
_pipelineSteps = input => input
.Step(new HttpFetchAsyncStep())
.Step(new ModifyTextAsyncStep())
.Step(new DiskWriteAsyncStep());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment