Skip to content

Instantly share code, notes, and snippets.

@MrSmoke
Last active September 7, 2018 02:36
Show Gist options
  • Save MrSmoke/7e2de2cf990770190f18b4d8b6e98cd9 to your computer and use it in GitHub Desktop.
Save MrSmoke/7e2de2cf990770190f18b4d8b6e98cd9 to your computer and use it in GitHub Desktop.
Pipline Builder
public static void Demo()
{
var stringToInt = new TransformBlock<string, int>(str =>
{
Console.WriteLine("StringToInt");
return int.Parse(str);
});
var intToString = new TransformBlock<int, string>(i =>
{
Console.WriteLine("IntToString");
return i.ToString();
});
var endPrint = new ActionBlock<string>(s =>
{
Console.WriteLine("Finished: " + s);
});
PipelineBuilder.New(stringToInt)
.LinkTo(intToString)
.LinkTo(endPrint)
.Start("66666")
.Wait();
PipelineBuilder.New(stringToInt)
.LinkTo(intToString)
.End()
.Start("66666")
.Wait();
}
public static class PipelineBuilder
{
public static PiplineBuilderNode<TIn, TOut> New<TBlock, TIn, TOut>(TBlock block)
where TBlock : ISourceBlock<TOut>, ITargetBlock<TIn>
{
if (block == null)
throw new ArgumentNullException(nameof(block));
return new PiplineBuilderNode<TIn, TOut>(block, block, new DataflowLinkOptions
{
PropagateCompletion = true
});
}
public static PiplineBuilderNode<TIn, TOut> New<TIn, TOut>(IPropagatorBlock<TIn, TOut> block)
{
return New<IPropagatorBlock<TIn, TOut>, TIn, TOut>(block);
}
public class PipelineStart<T>
{
private readonly ITargetBlock<T> _startBlock;
private readonly IDataflowBlock _endBlock;
internal PipelineStart(ITargetBlock<T> startBlock, IDataflowBlock endBlock)
{
_startBlock = startBlock;
_endBlock = endBlock;
}
public Task Start(T data)
{
_startBlock.Post(data);
_startBlock.Complete();
return _endBlock.Completion;
}
}
public class PiplineBuilderNode<TStart, TIn>
{
private readonly ITargetBlock<TStart> _startBlock;
private readonly ISourceBlock<TIn> _lastBlock;
private readonly DataflowLinkOptions _linkOptions;
internal PiplineBuilderNode(ITargetBlock<TStart> startBlock, ISourceBlock<TIn> lastBlock,
DataflowLinkOptions linkOptions)
{
_startBlock = startBlock;
_lastBlock = lastBlock;
_linkOptions = linkOptions;
}
public PipelineStart<TStart> LinkTo(ITargetBlock<TIn> block)
{
if (block == null)
throw new ArgumentNullException(nameof(block));
_lastBlock.LinkTo(block, _linkOptions);
return new PipelineStart<TStart>(_startBlock, _lastBlock);
}
public PipelineStart<TStart> End()
{
return new PipelineStart<TStart>(_startBlock, _lastBlock);
}
public PiplineBuilderNode<TStart, TOut> LinkTo<TOut>(IPropagatorBlock<TIn, TOut> block)
{
if (block == null)
throw new ArgumentNullException(nameof(block));
_lastBlock.LinkTo(block, _linkOptions);
return new PiplineBuilderNode<TStart, TOut>(_startBlock, block, _linkOptions);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment