Created
July 18, 2022 16:48
-
-
Save to11mtm/9b4402656af4b60f9cbcc8fac4b9b347 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Script entry point; intentionally empty because this file only declares reusable types.
// NOTE(review): presumably the LINQPad "C# Program" template entry point - confirm.
void Main() { }
/// <summary>
/// Factory and extension methods for building command queues and composing
/// <see cref="ChannelReader{T}"/> wrappers.
/// </summary>
public static class ChannelOpDsl
{
    /// <summary>Creates a new <see cref="ChannelCommandQueue{TCommand}"/>.</summary>
    /// <param name="bufferSize">Forwarded to the queue constructor as its buffer size.</param>
    /// <param name="multiWrite">Forwarded to the queue constructor as its multi-writer flag.</param>
    public static ChannelCommandQueue<TCommand> BeginCommandQueue<TCommand>(int bufferSize, bool multiWrite)
    {
        return new ChannelCommandQueue<TCommand>(bufferSize, multiWrite);
    }

    /// <summary>
    /// Wraps <paramref name="reader"/> in a <see cref="TransformingChannelReader{TIn,TOut}"/>
    /// that applies <paramref name="func"/> to each element.
    /// </summary>
    /// <param name="reader">Source reader.</param>
    /// <param name="func">Asynchronous projection applied to each element.</param>
    /// <param name="parallelism">Passed to the wrapper as its maximum parallelism (default 1).</param>
    public static ChannelReader<TOut> SelectAsync<TIn, TOut>(
        this ChannelReader<TIn> reader,
        Func<TIn, CancellationToken, Task<TOut>> func,
        int parallelism = 1)
    {
        return new TransformingChannelReader<TIn, TOut>(reader, func, parallelism);
    }

    /// <summary>
    /// Wraps <paramref name="reader"/> in a <see cref="FilteringCompletingChannelReader{TElement}"/>
    /// whose filter reports <paramref name="shouldReturn"/> for each element and never reports "done".
    /// </summary>
    public static ChannelReader<TElement> Where<TElement>(
        this ChannelReader<TElement> reader,
        Func<TElement, bool> shouldReturn)
    {
        return new FilteringCompletingChannelReader<TElement>(reader, el => (shouldReturn(el), false));
    }

    /// <summary>
    /// Wraps <paramref name="reader"/> in a <see cref="SyncUberSelector{TIn,TOut}"/> that maps an element to
    /// <see cref="SelectState.IsDone"/> when <paramref name="isDone"/> returns true and to
    /// <see cref="SelectState.ShouldReturn"/> otherwise.
    /// </summary>
    public static ChannelReader<TElement> FilterUntil<TElement>(
        this ChannelReader<TElement> reader,
        Func<TElement, bool> isDone)
    {
        // The original statement lacked its terminating semicolon and did not compile.
        return new SyncUberSelector<TElement, TElement>(
            reader,
            el => (isDone(el) ? SelectState.IsDone : SelectState.ShouldReturn, el));
    }

    /// <summary>
    /// Wraps <paramref name="reader"/> in a <see cref="SyncUberSelector{TIn,TOut}"/> that maps an element to
    /// <see cref="SelectState.ShouldReturn"/> when <paramref name="predicate"/> returns true and to
    /// <see cref="SelectState.ShouldSkip"/> otherwise.
    /// </summary>
    public static ChannelReader<TElement> WhereUber<TElement>(
        this ChannelReader<TElement> reader,
        Func<TElement, bool> predicate)
    {
        return new SyncUberSelector<TElement, TElement>(
            reader,
            el => (predicate(el) ? SelectState.ShouldReturn : SelectState.ShouldSkip, el));
    }

    /// <summary>
    /// Wraps <paramref name="reader"/> in a <see cref="BatchingChannelReader{TIn,TOut}"/>.
    /// </summary>
    /// <param name="reader">Source reader.</param>
    /// <param name="maxItems">Passed to the batching reader as its maximum batch size.</param>
    /// <param name="cost">Passed to the batching reader as the per-element size function.</param>
    /// <param name="seed">Passed to the batching reader as the function that starts an aggregate from an element.</param>
    /// <param name="aggregate">Passed to the batching reader as the function that folds an element into an aggregate.</param>
    public static ChannelReader<TAgg> Batched<TIn, TAgg>(
        this ChannelReader<TIn> reader,
        int maxItems,
        Func<TIn, int> cost,
        Func<TIn, TAgg> seed,
        Func<TIn, TAgg, TAgg> aggregate)
    {
        return new BatchingChannelReader<TIn, TAgg>(reader, maxItems, cost, seed, aggregate);
    }
}
/// <summary>
/// Placeholder <see cref="ChannelWriter{T}"/>: both overrides throw
/// <see cref="NotImplementedException"/>.
/// </summary>
public class WatChannelWriter<TOut> : ChannelWriter<TOut>
{
    /// <summary>Not implemented; always throws.</summary>
    public override bool TryWrite(TOut item)
        => throw new NotImplementedException();

    /// <summary>Not implemented; always throws (synchronously, before any task is returned).</summary>
    public override ValueTask<bool> WaitToWriteAsync(CancellationToken cancellationToken = default)
        => throw new NotImplementedException();
}
/// <summary>
/// A <see cref="ChannelReader{T}"/> that threads a state value through an asynchronous transform:
/// each upstream element is passed to the transform together with the current state, and the
/// transform yields an output, the next state, and a flag that ends the stream.
/// </summary>
/// <remarks>
/// At most one transform is in flight at a time. The type keeps plain mutable fields without
/// synchronization, so it is only suitable for a single consumer.
/// </remarks>
public class StatefulTransformAsync<TIn, TOut, TState> : ChannelReader<TOut>
{
    private readonly ChannelReader<TIn> _reader;
    private readonly Func<Task<TState>> _startState;
    private readonly Func<(TIn input, TState inState),
        Task<(TOut output, TState state, bool complete)>> _transform;
    private readonly LinkedTaskCompletionSource _completion;

    private bool isClosed = false;
    private bool stateInitialized = false;
    private Task<(TOut output, TState state, bool complete)> outputItem;
    private TState currentState;

    /// <summary>Completion as reported by the linked completion source created in the constructor.</summary>
    public override Task Completion => _completion.GetCompletion();

    /// <param name="reader">Upstream reader supplying inputs.</param>
    /// <param name="startState">Produces the initial state; invoked once, before the first transform.</param>
    /// <param name="transform">
    /// Maps (input, state) to (output, next state, complete). When <c>complete</c> is true the
    /// output is still delivered and the reader closes afterwards.
    /// </param>
    public StatefulTransformAsync(ChannelReader<TIn> reader,
        Func<Task<TState>> startState,
        Func<(TIn input, TState inState),
            Task<(TOut output, TState state, bool complete)>> transform)
    {
        _completion = new LinkedTaskCompletionSource(reader.Completion);
        _reader = reader;
        _startState = startState;
        _transform = transform;
    }

    /// <summary>
    /// Returns the result of the in-flight transform if it has finished, starting a new transform
    /// first when none is pending and upstream has an element available.
    /// </summary>
    public override bool TryRead([MaybeNullWhen(false)] out TOut item)
    {
        if (isClosed == false)
        {
            if (outputItem == null)
            {
                TryStartOutputItemFast();
            }
            if (outputItem != null && outputItem.IsCompleted)
            {
                // Clear the slot before reading the result: previously it was never cleared, so
                // the same output was handed out on every subsequent call.
                var finished = outputItem;
                outputItem = null;
                var newStuff = finished.Result;
                currentState = newStuff.state;
                item = newStuff.output;
                if (newStuff.complete)
                {
                    isClosed = true;
                    _completion.TrySetComplete();
                }
                return true;
            }
        }
        item = default;
        return false;
    }

    /// <summary>
    /// Starts a transform for the next upstream element if one is immediately available.
    /// Does nothing when the reader is closed or a transform is already pending.
    /// </summary>
    public void TryStartOutputItemFast()
    {
        if (isClosed || outputItem != null)
        {
            return;
        }
        if (_reader.TryRead(out var nextItem))
        {
            outputItem = stateInitialized
                ? _transform((nextItem, currentState))
                : StartWithInitialStateAsync(nextItem);
        }
    }

    // Obtains the initial state (once) and then runs the transform for the first element.
    private async Task<(TOut output, TState state, bool complete)> StartWithInitialStateAsync(TIn input)
    {
        if (_startState != null)
        {
            currentState = await _startState();
        }
        stateInitialized = true;
        return await _transform((input, currentState));
    }

    /// <summary>
    /// Waits until an output is ready for <see cref="TryRead"/>. Returns false once the reader has
    /// closed or upstream reports that no more data will arrive.
    /// </summary>
    public override async ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
    {
        while (isClosed == false)
        {
            if (outputItem != null)
            {
                // An output is (or will be) available; this includes the final "complete" item,
                // which TryRead still has to deliver.
                await outputItem;
                return true;
            }
            if (await _reader.WaitToReadAsync(cancellationToken) == false)
            {
                _completion.TrySetComplete();
                return false;
            }
            // May not start anything if the element was taken elsewhere in the meantime; loop and wait again.
            TryStartOutputItemFast();
        }
        return false;
    }
}
/// <summary>
/// Thin facade over a bounded <see cref="Channel{T}"/> that exposes the reader directly and the
/// writer through forwarding methods.
/// </summary>
public class ChannelCommandQueue<TCommand>
{
    private readonly Channel<TCommand> _channel;

    /// <summary>
    /// Creates the underlying bounded channel with <see cref="BoundedChannelFullMode.Wait"/>,
    /// a single reader, and a single writer unless <paramref name="multiWrite"/> is true.
    /// </summary>
    /// <param name="maxBuffer">Capacity of the bounded channel.</param>
    /// <param name="multiWrite">True when more than one writer may write concurrently.</param>
    public ChannelCommandQueue(int maxBuffer, bool multiWrite)
    {
        // The original statement was missing its closing ");" and did not compile.
        _channel = Channel.CreateBounded<TCommand>(new BoundedChannelOptions(maxBuffer)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleWriter = !multiWrite,
            SingleReader = true,
        });
    }

    /// <summary>Reader side of the underlying channel.</summary>
    public ChannelReader<TCommand> Reader => _channel.Reader;

    /// <summary>Forwards to the channel writer's <c>TryWrite</c> and returns its result.</summary>
    public bool TryWrite(TCommand command)
    {
        return _channel.Writer.TryWrite(command);
    }

    /// <summary>Forwards to the channel writer's <c>TryComplete()</c>.</summary>
    public bool TryComplete()
    {
        return _channel.Writer.TryComplete();
    }

    /// <summary>Forwards to the channel writer's <c>TryComplete(Exception)</c> with <paramref name="ex"/>.</summary>
    public bool TryFail(Exception ex)
    {
        return _channel.Writer.TryComplete(ex);
    }

    /// <summary>Awaits the channel writer's <c>WriteAsync</c> for <paramref name="command"/>.</summary>
    public async ValueTask WriteAsync(TCommand command, CancellationToken token = default)
    {
        await _channel.Writer.WriteAsync(command, token);
    }

    /// <summary>Awaits the channel writer's <c>WaitToWriteAsync</c> and returns its result.</summary>
    public async ValueTask<bool> WaitForWriteOpenAsync(CancellationToken token = default)
    {
        return await _channel.Writer.WaitToWriteAsync(token);
    }
}
/// <summary>
/// Runs an asynchronous callback for every element read from a <see cref="ChannelReader{T}"/>.
/// </summary>
public class ChannelReaderAction<TIn>
{
    // The original declaration was missing its terminating semicolon and did not compile.
    private readonly ChannelReader<TIn> _reader;
    private readonly Func<TIn, Task> _foreachAction;

    /// <param name="reader">Reader to consume.</param>
    /// <param name="foreachAction">Callback awaited once per element, in read order.</param>
    public ChannelReaderAction(ChannelReader<TIn> reader, Func<TIn, Task> foreachAction)
    {
        _reader = reader;
        _foreachAction = foreachAction;
    }

    /// <summary>
    /// Reads until <c>WaitToReadAsync</c> reports no more data, awaiting the callback for each element.
    /// </summary>
    /// <param name="token">Passed to every <c>WaitToReadAsync</c> call on the reader.</param>
    public async Task ExecuteAsync(CancellationToken token)
    {
        while (await _reader.WaitToReadAsync(token))
        {
            // WaitToReadAsync returning true does not guarantee the following TryRead succeeds;
            // previously a failed read still invoked the callback with default(TIn).
            if (_reader.TryRead(out var toDo))
            {
                await _foreachAction(toDo);
            }
        }
    }
}
/// <summary>
/// Pairs an externally supplied task with an internally owned completion source; the combined
/// completion finishes only after both have finished.
/// </summary>
public class LinkedTaskCompletionSource
{
    private readonly Task _upstream;
    private readonly TaskCompletionSource _own =
        new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

    /// <param name="parent">Task that is awaited first by <see cref="GetCompletion"/>.</param>
    public LinkedTaskCompletionSource(Task parent)
    {
        _upstream = parent;
    }

    /// <summary>
    /// Returns a new task that awaits the parent task and then the internal completion source,
    /// surfacing a failure from either.
    /// </summary>
    public async Task GetCompletion()
    {
        await _upstream;
        await _own.Task;
    }

    /// <summary>Attempts to mark the internal completion source as successfully completed.</summary>
    public void TrySetComplete() => _own.TrySetResult();

    /// <summary>Attempts to mark the internal completion source as faulted with <paramref name="ex"/>.</summary>
    public void TrySetFailed(Exception ex) => _own.TrySetException(ex);
}
/// <summary>
/// A <see cref="ChannelReader{T}"/> wrapper driven by a filter that reports, per element, whether
/// the element should be returned and whether the stream is done.
/// </summary>
/// <remarks>
/// Once the filter reports done, no further upstream elements are read. An element for which
/// the filter reports both flags is returned and then the reader closes. State is kept in plain
/// fields without synchronization, so use a single consumer.
/// </remarks>
public class FilteringCompletingChannelReader<TElement> : ChannelReader<TElement>
{
    /// <summary>Defers to the base implementation; this wrapper does not track completion itself.</summary>
    public override Task Completion => base.Completion;

    private readonly ChannelReader<TElement> _reader;
    private readonly Func<TElement, (bool shouldReturn, bool isDone)> _filterFunc;
    private bool hasBuffered;
    private TElement element;
    private bool isClosed = false;

    /// <param name="reader">Upstream reader.</param>
    /// <param name="filterFunc">Returns (shouldReturn, isDone) for each upstream element.</param>
    public FilteringCompletingChannelReader(ChannelReader<TElement> reader, Func<TElement, (bool shouldReturn, bool isDone)> filterFunc)
    {
        _reader = reader;
        _filterFunc = filterFunc;
    }

    /// <summary>
    /// Returns the element buffered by <see cref="WaitToReadAsync"/> if there is one, otherwise the
    /// next immediately available upstream element accepted by the filter.
    /// </summary>
    public override bool TryRead([MaybeNullWhen(false)] out TElement item)
    {
        // A buffered element was already accepted, so deliver it even if the reader has since closed.
        if (hasBuffered)
        {
            hasBuffered = false;
            item = element;
            element = default;
            return true;
        }
        // Stop pulling as soon as the filter has reported "done"; previously the loop kept
        // reading and could return elements that came after the terminating one.
        while (isClosed == false && _reader.TryRead(out var next))
        {
            var (sr, id) = _filterFunc(next);
            if (id)
            {
                isClosed = true;
            }
            if (sr)
            {
                item = next;
                return true;
            }
        }
        item = default;
        return false;
    }

    /// <summary>
    /// Waits until an accepted element is buffered for <see cref="TryRead"/>. Returns false when the
    /// filter has reported done or upstream reports that no more data will arrive.
    /// </summary>
    public override async ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
    {
        while (true)
        {
            // Checked first so a repeated call cannot overwrite an element that was not read yet.
            if (hasBuffered)
            {
                return true;
            }
            if (isClosed)
            {
                return false;
            }
            while (_reader.TryRead(out var nextItem))
            {
                var (sr, id) = _filterFunc(nextItem);
                if (id)
                {
                    isClosed = true;
                }
                if (sr)
                {
                    element = nextItem;
                    hasBuffered = true;
                    return true;
                }
                if (id)
                {
                    return false;
                }
            }
            // Only the upstream WaitToReadAsync result is consulted; upstream readers that do not
            // support the Completion property therefore work as sources too.
            if (await _reader.WaitToReadAsync(cancellationToken) == false)
            {
                return false;
            }
        }
    }
}
/// <summary>
/// Distributes the elements of one reader across several bounded (capacity 1) output channels.
/// Each element is written to the first output that accepts it; when none does, the pump waits
/// until an output becomes writable.
/// </summary>
public class ChannelSpreader<TElement>
{
    private readonly ChannelReader<TElement> _reader;
    private readonly Channel<TElement>[] _fanOutChannels;

    /// <summary>Reader ends of the output channels, one per requested output.</summary>
    public IReadOnlyList<ChannelReader<TElement>> Readers { get; }

    /// <summary>
    /// Creates <paramref name="maxOut"/> output channels and starts a background pump on the thread pool.
    /// </summary>
    /// <param name="reader">Source reader to drain.</param>
    /// <param name="maxOut">Number of output channels.</param>
    public ChannelSpreader(ChannelReader<TElement> reader, int maxOut)
    {
        _reader = reader;
        // Enumerable.Repeat would put the SAME channel instance into every slot; build a
        // distinct channel per output instead.
        _fanOutChannels = Enumerable.Range(0, maxOut)
            .Select(_ => Channel.CreateBounded<TElement>(
                new BoundedChannelOptions(1) { SingleWriter = false, SingleReader = false }))
            .ToArray();
        Readers = _fanOutChannels.Select(ch => ch.Reader).ToArray();
        // Fire-and-forget: failures are reported by completing the outputs with the exception.
        _ = Task.Run(PumpAsync);
    }

    // Drains the source, spreading elements over the outputs, then completes every output
    // (with the exception if the pump failed).
    private async Task PumpAsync()
    {
        try
        {
            await foreach (var element in _reader.ReadAllAsync())
            {
                while (TryWriteToAny(element) == false)
                {
                    await Task.WhenAny(_fanOutChannels.Select(r => r.Writer.WaitToWriteAsync().AsTask()));
                }
            }
        }
        catch (Exception ex)
        {
            foreach (var ch in _fanOutChannels)
            {
                ch.Writer.TryComplete(ex);
            }
            return;
        }
        foreach (var ch in _fanOutChannels)
        {
            ch.Writer.TryComplete();
        }
    }

    // Offers the element to each output in order; true as soon as one accepts it.
    private bool TryWriteToAny(TElement element)
    {
        foreach (var output in _fanOutChannels)
        {
            if (output.Writer.TryWrite(element))
            {
                return true;
            }
        }
        return false;
    }
}
/// <summary>
/// A <see cref="ChannelReader{T}"/> that applies an asynchronous transform to upstream elements,
/// keeping up to a fixed number of transforms in flight and yielding results in upstream order.
/// </summary>
/// <remarks>
/// Pulling from upstream is guarded by an interlocked flag so only one caller pulls at a time.
/// NOTE(review): the original author flagged multi-reader use as imprecise; treat this as
/// intended for a single consumer unless verified otherwise.
/// </remarks>
public class TransformingChannelReader<TIn, TOut> : ChannelReader<TOut>
{
    private readonly ChannelReader<TIn> _reader;
    private readonly Func<TIn, CancellationToken, Task<TOut>> _transform;
    private readonly int _maxParallelism;
    private readonly ConcurrentQueue<Task<TOut>> waitingOuts;
    private int currentOuts;
    private int pullingCheck = 0;

    /// <param name="reader">Upstream reader.</param>
    /// <param name="transform">Asynchronous projection; it is invoked with a default cancellation token.</param>
    /// <param name="maxParallelism">Maximum number of queued/in-flight transforms; values below 1 are treated as 1.</param>
    public TransformingChannelReader(ChannelReader<TIn> reader, Func<TIn, CancellationToken, Task<TOut>> transform, int maxParallelism)
    {
        _reader = reader;
        _transform = transform;
        // A limit below 1 could never start a transform, so nothing would ever be produced.
        _maxParallelism = maxParallelism < 1 ? 1 : maxParallelism;
        waitingOuts = new ConcurrentQueue<Task<TOut>>();
    }

    /// <summary>
    /// Returns the oldest queued result if its transform has finished; results are never returned
    /// out of order, so an unfinished head blocks later finished ones.
    /// </summary>
    public override bool TryRead([MaybeNullWhen(false)] out TOut item)
    {
        EnsureQueueState();
        while (waitingOuts.TryPeek(out var head) && head.IsCompleted)
        {
            // Another caller may have taken the head between peek and dequeue; re-check what we got.
            if (waitingOuts.TryDequeue(out var taken) && taken.IsCompleted)
            {
                Interlocked.Decrement(ref currentOuts);
                item = taken.Result;
                return true;
            }
        }
        item = default;
        return false;
    }

    // Tops the queue up to the parallelism limit from whatever upstream has immediately available.
    private void EnsureQueueState()
    {
        if (Interlocked.CompareExchange(ref pullingCheck, 1, 0) == 0)
        {
            try
            {
                // Strictly "<": the capacity check happens BEFORE an element is taken, so an
                // element is never read from upstream and then discarded for lack of room.
                // Only this guarded section increments the counter, so it cannot overshoot.
                while (Volatile.Read(ref currentOuts) < _maxParallelism && _reader.TryRead(out var next))
                {
                    var pending = _transform(next, default);
                    Interlocked.Increment(ref currentOuts);
                    waitingOuts.Enqueue(pending);
                }
            }
            finally
            {
                // Always release the guard, even if the transform throws synchronously.
                Volatile.Write(ref pullingCheck, 0);
            }
        }
    }

    /// <summary>
    /// Waits until the oldest queued transform has finished. Returns false when nothing is queued
    /// and upstream reports that no more data will arrive.
    /// </summary>
    public override async ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
    {
        while (true)
        {
            EnsureQueueState();
            if (waitingOuts.TryPeek(out var head))
            {
                await head;
                return true;
            }
            if (await _reader.WaitToReadAsync(cancellationToken) == false)
            {
                // Upstream is finished; pick up anything a concurrent pull queued meanwhile.
                if (waitingOuts.TryPeek(out head))
                {
                    await head;
                    return true;
                }
                return false;
            }
        }
    }
}
/// <summary>
/// A <see cref="ChannelReader{T}"/> that flattens a reader of sequences into a reader of their
/// elements, in order. Empty sequences are skipped.
/// </summary>
/// <remarks>
/// State is kept in plain fields without synchronization, so use a single consumer.
/// </remarks>
public class FlattenChannelReader<TElement> : ChannelReader<TElement>
{
    /// <summary>Upstream reader of sequences.</summary>
    public readonly ChannelReader<IEnumerable<TElement>> _reader;

    // Enumerator of the sequence currently being flattened; null when none is active.
    private IEnumerator<TElement> current;

    // True when current.Current has been advanced to by WaitToReadAsync but not yet handed out.
    private bool hasBuffered;

    /// <param name="reader">Upstream reader whose sequences are flattened.</param>
    public FlattenChannelReader(ChannelReader<IEnumerable<TElement>> reader)
    {
        _reader = reader;
    }

    /// <summary>
    /// Returns the element positioned by <see cref="WaitToReadAsync"/> if there is one, otherwise
    /// advances to the next immediately available element.
    /// </summary>
    public override bool TryRead([MaybeNullWhen(false)] out TElement item)
    {
        if (hasBuffered)
        {
            hasBuffered = false;
            item = current.Current;
            return true;
        }
        if (TryAdvance())
        {
            item = current.Current;
            return true;
        }
        item = default;
        return false;
    }

    /// <summary>
    /// Waits until an element is positioned for <see cref="TryRead"/>. Returns false when upstream
    /// reports that no more data will arrive.
    /// </summary>
    public override async ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
    {
        while (true)
        {
            // Checked first so that a repeated call does not advance past an unread element.
            if (hasBuffered)
            {
                return true;
            }
            if (TryAdvance())
            {
                hasBuffered = true;
                return true;
            }
            if (await _reader.WaitToReadAsync(cancellationToken) == false)
            {
                return false;
            }
        }
    }

    // Moves to the next element: first within the active sequence, then in the first non-empty
    // sequence immediately available upstream. Stops at the first element found, so sequences
    // that follow stay in the upstream channel until they are needed.
    private bool TryAdvance()
    {
        if (current != null)
        {
            if (current.MoveNext())
            {
                return true;
            }
            // Exhausted: dispose and forget it so it is never touched again.
            current.Dispose();
            current = null;
        }
        while (_reader.TryRead(out var eInst))
        {
            var enumerator = eInst.GetEnumerator();
            if (enumerator.MoveNext())
            {
                current = enumerator;
                return true;
            }
            enumerator.Dispose();
        }
        return false;
    }
}
/// <summary>Per-element decision returned by a selector function.</summary>
public enum SelectState
{
    /// <summary>The element should be handed to the consumer.</summary>
    ShouldReturn = 0,

    /// <summary>The selector signals that processing is finished.</summary>
    IsDone = 1,

    /// <summary>The element should be dropped and processing should continue.</summary>
    ShouldSkip = 2,
}
/// <summary>
/// A <see cref="ChannelReader{T}"/> wrapper driven by a synchronous selector that, for each upstream
/// element, produces a <see cref="SelectState"/> and a projected value.
/// </summary>
/// <remarks>
/// <see cref="SelectState.ShouldReturn"/> yields the projected value, <see cref="SelectState.ShouldSkip"/>
/// drops the element, and <see cref="SelectState.IsDone"/> ends the stream: the triggering element is
/// not returned and nothing further is read from upstream. State is kept in plain fields without
/// synchronization, so use a single consumer.
/// </remarks>
public class SyncUberSelector<TIn, TOut> : ChannelReader<TOut>
{
    private readonly ChannelReader<TIn> _reader;
    private readonly Func<TIn, (SelectState state, TOut next)> _selector;

    private bool hasBufferedItem = false;
    private TOut bufferedItem;
    private bool isDone = false;

    /// <param name="reader">Upstream reader.</param>
    /// <param name="selector">Returns the decision and the projected value for an element.</param>
    public SyncUberSelector(ChannelReader<TIn> reader, Func<TIn, (SelectState state, TOut next)> selector)
    {
        _reader = reader;
        _selector = selector;
    }

    /// <summary>
    /// Returns the value buffered by <see cref="WaitToReadAsync"/> if there is one, otherwise the next
    /// immediately available upstream element the selector marks as <see cref="SelectState.ShouldReturn"/>.
    /// </summary>
    public override bool TryRead([MaybeNullWhen(false)] out TOut item)
    {
        // Hand out the buffered value first; previously it was never consumed, which lost the
        // value and left WaitToReadAsync reporting "data available" forever.
        if (hasBufferedItem)
        {
            hasBufferedItem = false;
            item = bufferedItem;
            bufferedItem = default;
            return true;
        }
        while (isDone == false && _reader.TryRead(out var nextItem))
        {
            var res = _selector(nextItem);
            if (res.state == SelectState.ShouldReturn)
            {
                item = res.next;
                return true;
            }
            else if (res.state == SelectState.IsDone)
            {
                // Remember the terminal state so later calls do not resume reading.
                isDone = true;
            }
        }
        item = default;
        return false;
    }

    /// <summary>
    /// Waits until a value is buffered for <see cref="TryRead"/>. Returns false once the selector has
    /// reported <see cref="SelectState.IsDone"/> or upstream reports that no more data will arrive.
    /// </summary>
    public override async ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
    {
        if (hasBufferedItem)
        {
            return true;
        }
        while (isDone == false && await _reader.WaitToReadAsync(cancellationToken))
        {
            while (_reader.TryRead(out var nextItem))
            {
                var res = _selector(nextItem);
                if (res.state == SelectState.ShouldReturn)
                {
                    hasBufferedItem = true;
                    bufferedItem = res.next;
                    return true;
                }
                else if (res.state == SelectState.IsDone)
                {
                    isDone = true;
                    return false;
                }
            }
        }
        return false;
    }
}
/// <summary>
/// A <see cref="ChannelReader{T}"/> that folds immediately available upstream elements into
/// aggregates whose total size does not exceed a maximum.
/// </summary>
/// <remarks>
/// A batch ends when the next element would push the accumulated size over the maximum (that
/// element is kept and starts the next batch) or when upstream has nothing immediately
/// available. A single element larger than the maximum still forms a batch of its own.
/// State is kept in plain fields without synchronization, so use a single consumer.
/// </remarks>
public class BatchingChannelReader<TIn, TOut> : ChannelReader<TOut>
{
    private readonly ChannelReader<TIn> _reader;
    private readonly int _maxBatch;
    private readonly Func<TIn, int> _size;
    private readonly Func<TIn, TOut> _seed;
    private readonly Func<TIn, TOut, TOut> _agg;

    // Element that did not fit into the previous batch, together with its size.
    private bool hasBufferedItem = false;
    private TIn bufferedItem = default;
    private int bufferedSize = 0;

    /// <param name="reader">Upstream reader.</param>
    /// <param name="maxBatch">Maximum accumulated size of one batch.</param>
    /// <param name="size">Returns the size contribution of an element.</param>
    /// <param name="seed">Creates a new aggregate from the first element of a batch.</param>
    /// <param name="agg">Folds a further element into an existing aggregate.</param>
    public BatchingChannelReader(ChannelReader<TIn> reader, int maxBatch, Func<TIn, int> size, Func<TIn, TOut> seed, Func<TIn, TOut, TOut> agg)
    {
        _reader = reader;
        _maxBatch = maxBatch;
        _size = size;
        _seed = seed;
        _agg = agg;
    }

    /// <summary>
    /// Builds one batch from the carried-over element (if any) plus whatever upstream has
    /// immediately available. Returns false when no element was available at all.
    /// </summary>
    public override bool TryRead([MaybeNullWhen(false)] out TOut item)
    {
        int currentSize = 0;
        TOut returnVal = default;
        bool hasOut = false;
        if (hasBufferedItem)
        {
            currentSize = bufferedSize;
            returnVal = _seed(bufferedItem);
            bufferedItem = default;
            hasBufferedItem = false;
            hasOut = true;
        }
        while (_reader.TryRead(out var nextItem))
        {
            int nextSize = _size(nextItem);
            if (hasOut && currentSize + nextSize > _maxBatch)
            {
                // Does not fit: carry it over and close this batch. Without the break the loop
                // kept reading, overwriting the carried element or adding later ones to the batch.
                hasBufferedItem = true;
                bufferedItem = nextItem;
                bufferedSize = nextSize;
                break;
            }
            returnVal = hasOut ? _agg(nextItem, returnVal) : _seed(nextItem);
            // Track the accumulated size; previously it was never updated, so the limit had no effect.
            currentSize += nextSize;
            hasOut = true;
        }
        item = returnVal;
        return hasOut;
    }

    /// <summary>
    /// Completes immediately with true when an element is carried over; otherwise defers to upstream.
    /// </summary>
    public override ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
    {
        if (hasBufferedItem)
        {
            return new ValueTask<bool>(true);
        }
        return _reader.WaitToReadAsync(cancellationToken);
    }
}
| // You can define other methods, fields, classes and namespaces here |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment