Skip to content

Instantly share code, notes, and snippets.

@to11mtm
Created July 18, 2022 16:48
Show Gist options
  • Select an option

  • Save to11mtm/9b4402656af4b60f9cbcc8fac4b9b347 to your computer and use it in GitHub Desktop.

Select an option

Save to11mtm/9b4402656af4b60f9cbcc8fac4b9b347 to your computer and use it in GitHub Desktop.
// Script entry point. The body is empty, so running it performs no work.
// NOTE(review): presumably a LINQPad-style script kept only for the type definitions that follow — confirm.
void Main()
{
}
/// <summary>
/// Factory and extension methods that compose <see cref="ChannelReader{T}"/> pipelines
/// out of the reader wrappers declared in this file.
/// </summary>
public static class ChannelOpDsl
{
    /// <summary>Creates a new <see cref="ChannelCommandQueue{TCommand}"/>.</summary>
    /// <param name="bufferSize">Forwarded to the queue as its maximum buffer size.</param>
    /// <param name="multiWrite">Forwarded to the queue; <c>true</c> when more than one writer will be used.</param>
    /// <returns>The newly constructed queue.</returns>
    public static ChannelCommandQueue<TCommand> BeginCommandQueue<TCommand>(int bufferSize, bool multiWrite)
    {
        return new ChannelCommandQueue<TCommand>(bufferSize, multiWrite);
    }

    /// <summary>
    /// Wraps <paramref name="reader"/> in a <see cref="TransformingChannelReader{TIn,TOut}"/>
    /// that applies <paramref name="func"/> to every element.
    /// </summary>
    /// <param name="reader">The upstream reader.</param>
    /// <param name="func">Asynchronous projection applied to each element.</param>
    /// <param name="parallelism">Forwarded to the wrapper as its maximum parallelism.</param>
    /// <returns>A reader over the projected elements.</returns>
    public static ChannelReader<TOut> SelectAsync<TIn, TOut>(
        this ChannelReader<TIn> reader,
        Func<TIn, CancellationToken, Task<TOut>> func,
        int parallelism = 1)
    {
        return new TransformingChannelReader<TIn, TOut>(reader, func, parallelism);
    }

    /// <summary>
    /// Wraps <paramref name="reader"/> in a <see cref="FilteringCompletingChannelReader{TElement}"/>
    /// whose filter reports <paramref name="shouldReturn"/>'s verdict and never signals "done".
    /// </summary>
    /// <param name="reader">The upstream reader.</param>
    /// <param name="shouldReturn">Predicate selecting the elements to keep.</param>
    /// <returns>A reader over the elements accepted by the predicate.</returns>
    public static ChannelReader<TElement> Where<TElement>(
        this ChannelReader<TElement> reader,
        Func<TElement, bool> shouldReturn)
    {
        return new FilteringCompletingChannelReader<TElement>(reader, el => (shouldReturn(el), false));
    }

    /// <summary>
    /// Wraps <paramref name="reader"/> in a <see cref="SyncUberSelector{TIn,TOut}"/> that maps an element to
    /// <see cref="SelectState.IsDone"/> when <paramref name="isDone"/> returns <c>true</c> for it and to
    /// <see cref="SelectState.ShouldReturn"/> otherwise.
    /// </summary>
    /// <param name="reader">The upstream reader.</param>
    /// <param name="isDone">Predicate identifying the terminating element.</param>
    /// <returns>A reader driven by the selector described above.</returns>
    public static ChannelReader<TElement> FilterUntil<TElement>(
        this ChannelReader<TElement> reader,
        Func<TElement, bool> isDone)
    {
        return new SyncUberSelector<TElement, TElement>(reader, el =>
        {
            // The original statement lacked its terminating semicolon and did not compile.
            return (isDone(el) ? SelectState.IsDone : SelectState.ShouldReturn, el);
        });
    }

    /// <summary>
    /// Wraps <paramref name="reader"/> in a <see cref="SyncUberSelector{TIn,TOut}"/> that maps an element to
    /// <see cref="SelectState.ShouldReturn"/> when <paramref name="predicate"/> accepts it and to
    /// <see cref="SelectState.ShouldSkip"/> otherwise.
    /// </summary>
    /// <param name="reader">The upstream reader.</param>
    /// <param name="predicate">Predicate selecting the elements to keep.</param>
    /// <returns>A reader driven by the selector described above.</returns>
    public static ChannelReader<TElement> WhereUber<TElement>(
        this ChannelReader<TElement> reader,
        Func<TElement, bool> predicate)
    {
        return new SyncUberSelector<TElement, TElement>(reader, el =>
        {
            return (predicate(el) ? SelectState.ShouldReturn : SelectState.ShouldSkip, el);
        });
    }

    /// <summary>
    /// Wraps <paramref name="reader"/> in a <see cref="BatchingChannelReader{TIn,TOut}"/>.
    /// </summary>
    /// <param name="reader">The upstream reader.</param>
    /// <param name="maxItems">Forwarded to the wrapper as its maximum batch size.</param>
    /// <param name="cost">Forwarded to the wrapper as the per-element size function.</param>
    /// <param name="seed">Creates an aggregate from the first element of a batch.</param>
    /// <param name="aggregate">Folds a further element into the running aggregate.</param>
    /// <returns>A reader over the aggregated batches.</returns>
    public static ChannelReader<TAgg> Batched<TIn, TAgg>(
        this ChannelReader<TIn> reader,
        int maxItems,
        Func<TIn, int> cost,
        Func<TIn, TAgg> seed,
        Func<TIn, TAgg, TAgg> aggregate)
    {
        return new BatchingChannelReader<TIn, TAgg>(reader, maxItems, cost, seed, aggregate);
    }
}
/// <summary>
/// Placeholder <see cref="ChannelWriter{T}"/>: neither member is implemented, so both throw.
/// </summary>
public class WatChannelWriter<TOut> : ChannelWriter<TOut>
{
    /// <summary>Not implemented.</summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public override bool TryWrite(TOut item) => throw new NotImplementedException();

    /// <summary>Not implemented.</summary>
    /// <exception cref="NotImplementedException">Always thrown, synchronously.</exception>
    public override ValueTask<bool> WaitToWriteAsync(CancellationToken cancellationToken = default)
        => throw new NotImplementedException();
}
/// <summary>
/// A <see cref="ChannelReader{T}"/> that runs each upstream element through an asynchronous transform
/// which also threads a state value from one element to the next. The transform may flag an output
/// as the last one, after which this reader reports no further data.
/// </summary>
/// <remarks>
/// At most one transform is pending at a time. The class performs no synchronization of its own.
/// </remarks>
public class StatefulTransformAsync<TIn,TOut,TState> : ChannelReader<TOut>
{
    private readonly ChannelReader<TIn> _reader;
    private readonly Func<Task<TState>> _startState;
    private readonly Func<(TIn input, TState inState),
        Task<(TOut output, TState state, bool complete)>> _transform;
    private readonly LinkedTaskCompletionSource _completion;

    private bool isClosed = false;
    // The transform currently in flight (or finished but not yet consumed); null when none.
    private Task<(TOut output, TState state, bool complete)> outputItem;
    private TState currentState;
    // Lazily started initial-state computation and whether its result has been stored in currentState.
    private Task<TState> stateInit;
    private bool stateReady;

    /// <summary>
    /// Completion task obtained from the linked completion source, which awaits the upstream
    /// reader's completion and then this reader's own completion signal.
    /// </summary>
    public override Task Completion => _completion.GetCompletion();

    /// <summary>Creates the reader.</summary>
    /// <param name="reader">Upstream reader supplying the inputs.</param>
    /// <param name="startState">
    /// Produces the initial state. Invoked lazily, once, before the first transform.
    /// When <c>null</c>, the default value of <typeparamref name="TState"/> is used.
    /// </param>
    /// <param name="transform">
    /// Maps (input, current state) to (output, next state, complete). When <c>complete</c> is true
    /// the output is still delivered and the reader closes afterwards.
    /// </param>
    public StatefulTransformAsync(ChannelReader<TIn> reader,
        Func<Task<TState>> startState,
        Func<(TIn input,TState inState),
            Task<(TOut output,TState state, bool complete)>> transform)
    {
        _completion = new LinkedTaskCompletionSource(reader.Completion);
        _reader = reader;
        _startState = startState;
        _transform = transform;
    }

    /// <summary>
    /// Returns the next transformed element if one has finished, starting a transform for the next
    /// upstream element first when nothing is pending.
    /// </summary>
    /// <param name="item">The transformed element when the method returns <c>true</c>.</param>
    /// <returns><c>true</c> when an element was produced; otherwise <c>false</c>.</returns>
    public override bool TryRead([MaybeNullWhen(false)] out TOut item)
    {
        if (!isClosed)
        {
            if (outputItem == null)
            {
                TryStartOutputItemFast();
            }
            if (outputItem != null && outputItem.IsCompleted)
            {
                // .Result throws for a faulted transform and leaves the faulted task in place.
                var finished = outputItem.Result;
                // Clear the slot so the same result is not handed out again on the next call.
                outputItem = null;
                currentState = finished.state;
                item = finished.output;
                if (finished.complete)
                {
                    isClosed = true;
                    _completion.TrySetComplete();
                }
                return true;
            }
        }
        item = default;
        return false;
    }

    /// <summary>
    /// Starts a transform for the next upstream element when none is pending, the reader is open,
    /// the initial state is available and the upstream has an element ready. Otherwise does nothing.
    /// </summary>
    public void TryStartOutputItemFast()
    {
        if (isClosed || outputItem != null || !TryEnsureState())
        {
            return;
        }
        if (_reader.TryRead(out var nextItem))
        {
            outputItem = _transform((nextItem, currentState));
        }
    }

    /// <summary>
    /// Waits until a transformed element is ready to be taken with <see cref="TryRead"/>.
    /// </summary>
    /// <param name="cancellationToken">Forwarded to the upstream wait.</param>
    /// <returns>
    /// <c>true</c> when an element is ready (including the final, completing one);
    /// <c>false</c> once the reader has closed or the upstream has no more data.
    /// </returns>
    public override async ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
    {
        if (isClosed)
        {
            return false;
        }
        if (!TryEnsureState())
        {
            await stateInit;
            TryEnsureState();
        }
        // Loop because the upstream can report data that another consumer takes before we do.
        while (outputItem == null)
        {
            if (await _reader.WaitToReadAsync(cancellationToken) == false)
            {
                _completion.TrySetComplete();
                return false;
            }
            TryStartOutputItemFast();
        }
        await outputItem;
        // TryRead delivers the element even when it is flagged complete, so report it as readable.
        return true;
    }

    // Stores the initial state in currentState once it is available; returns whether it is ready.
    private bool TryEnsureState()
    {
        if (stateReady)
        {
            return true;
        }
        if (_startState == null)
        {
            stateReady = true;
            return true;
        }
        stateInit ??= _startState();
        if (!stateInit.IsCompleted)
        {
            return false;
        }
        currentState = stateInit.GetAwaiter().GetResult();
        stateReady = true;
        return true;
    }
}
/// <summary>
/// A bounded command queue backed by a <see cref="Channel{T}"/> configured with
/// <see cref="BoundedChannelFullMode.Wait"/> and a single reader.
/// </summary>
public class ChannelCommandQueue<TCommand>
{
    private readonly Channel<TCommand> _channel;

    /// <summary>Creates the queue.</summary>
    /// <param name="maxBuffer">Capacity of the underlying bounded channel.</param>
    /// <param name="multiWrite">
    /// <c>true</c> when several writers may write concurrently; the channel's
    /// <c>SingleWriter</c> option is set to the negation of this value.
    /// </param>
    public ChannelCommandQueue(int maxBuffer, bool multiWrite)
    {
        var options = new BoundedChannelOptions(maxBuffer)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleWriter = !multiWrite,
            SingleReader = true,
        };
        // The original statement was missing its closing ");" and did not compile.
        _channel = Channel.CreateBounded<TCommand>(options);
    }

    /// <summary>The reading side of the underlying channel.</summary>
    public ChannelReader<TCommand> Reader => _channel.Reader;

    /// <summary>Attempts to enqueue <paramref name="command"/> without waiting.</summary>
    /// <returns><c>true</c> when the underlying writer accepted the command.</returns>
    public bool TryWrite(TCommand command)
    {
        return _channel.Writer.TryWrite(command);
    }

    /// <summary>Attempts to mark the underlying writer as complete.</summary>
    /// <returns>The result of the writer's <c>TryComplete</c> call.</returns>
    public bool TryComplete()
    {
        return _channel.Writer.TryComplete();
    }

    /// <summary>Attempts to mark the underlying writer as complete with an error.</summary>
    /// <param name="ex">The exception passed to the writer's <c>TryComplete</c>.</param>
    /// <returns>The result of the writer's <c>TryComplete</c> call.</returns>
    public bool TryFail(Exception ex)
    {
        return _channel.Writer.TryComplete(ex);
    }

    /// <summary>Enqueues <paramref name="command"/> through the writer's <c>WriteAsync</c>.</summary>
    /// <param name="command">The command to enqueue.</param>
    /// <param name="token">Forwarded to the writer.</param>
    public async ValueTask WriteAsync(TCommand command, CancellationToken token = default)
    {
        await _channel.Writer.WriteAsync(command, token);
    }

    /// <summary>Awaits the writer's <c>WaitToWriteAsync</c>.</summary>
    /// <param name="token">Forwarded to the writer.</param>
    /// <returns>The value produced by the writer's <c>WaitToWriteAsync</c>.</returns>
    public async ValueTask<bool> WaitForWriteOpenAsync(CancellationToken token = default)
    {
        return await _channel.Writer.WaitToWriteAsync(token);
    }
}
/// <summary>
/// Runs an asynchronous action for every element read from a <see cref="ChannelReader{T}"/>.
/// </summary>
public class ChannelReaderAction<TIn>
{
    // The original declaration was missing its terminating semicolon and did not compile.
    private readonly ChannelReader<TIn> _reader;
    private readonly Func<TIn,Task> _foreachAction;

    /// <summary>Creates the action runner.</summary>
    /// <param name="reader">The reader to consume.</param>
    /// <param name="foreachAction">Invoked, and awaited, once per element.</param>
    public ChannelReaderAction(ChannelReader<TIn> reader, Func<TIn,Task> foreachAction)
    {
        _reader = reader;
        _foreachAction = foreachAction;
    }

    /// <summary>
    /// Consumes the reader until its <c>WaitToReadAsync</c> returns <c>false</c>,
    /// awaiting the action for each element before reading the next.
    /// </summary>
    /// <param name="token">Forwarded to every <c>WaitToReadAsync</c> call.</param>
    public async Task ExecuteAsync(CancellationToken token)
    {
        while (await _reader.WaitToReadAsync(token))
        {
            // TryRead can still fail after a successful wait (e.g. another consumer took the item);
            // only run the action when an element was actually obtained.
            if (_reader.TryRead(out var toDo))
            {
                await _foreachAction(toDo);
            }
        }
    }
}
/// <summary>
/// Pairs a parent task with a locally controlled completion signal. The task returned by
/// <see cref="GetCompletion"/> awaits the parent first and the local signal second.
/// </summary>
public class LinkedTaskCompletionSource
{
    private readonly TaskCompletionSource _localSignal =
        new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    private readonly Task _upstream;

    /// <summary>Creates the source.</summary>
    /// <param name="parent">The task awaited before the local signal.</param>
    public LinkedTaskCompletionSource(Task parent) => _upstream = parent;

    /// <summary>
    /// Returns a task that finishes once the parent task and then the local signal have finished;
    /// a fault in either is propagated by the corresponding await.
    /// </summary>
    public async Task GetCompletion()
    {
        await _upstream;
        await _localSignal.Task;
    }

    /// <summary>Tries to mark the local signal as successfully completed.</summary>
    public void TrySetComplete() => _localSignal.TrySetResult();

    /// <summary>Tries to mark the local signal as faulted with <paramref name="ex"/>.</summary>
    public void TrySetFailed(Exception ex) => _localSignal.TrySetException(ex);
}
/// <summary>
/// A <see cref="ChannelReader{T}"/> that passes through only the elements a filter accepts and
/// that can be closed early by the filter.
/// </summary>
/// <remarks>
/// The filter returns <c>(shouldReturn, isDone)</c>. <c>shouldReturn</c> is checked first: an accepted
/// element is delivered and <c>isDone</c> is ignored for it. A rejected element with <c>isDone</c> set
/// closes this reader; no further upstream elements are read afterwards.
/// The class performs no synchronization of its own.
/// </remarks>
public class FilteringCompletingChannelReader<TElement> : ChannelReader<TElement>
{
    private readonly ChannelReader<TElement> _reader;
    private readonly Func<TElement,(bool shouldReturn, bool isDone)> _filterFunc;
    // Set when the filter closes this reader before the upstream completes.
    private readonly TaskCompletionSource _closedSignal =
        new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    private readonly Task _completion;
    private bool hasBuffered;
    private TElement element;
    private bool isClosed = false;

    /// <summary>
    /// Completes when the upstream reader completes (propagating its fault, if any) or when the
    /// filter closes this reader, whichever happens first.
    /// </summary>
    public override Task Completion => _completion;

    /// <summary>Creates the reader.</summary>
    /// <param name="reader">The upstream reader.</param>
    /// <param name="filterFunc">Returns <c>(shouldReturn, isDone)</c> for each upstream element.</param>
    public FilteringCompletingChannelReader(ChannelReader<TElement> reader, Func<TElement,(bool shouldReturn,bool isDone)> filterFunc)
    {
        _reader = reader;
        _filterFunc = filterFunc;
        // The inherited Completion never completes; link to the upstream and the local close signal instead.
        _completion = Task.WhenAny(reader.Completion, _closedSignal.Task).Unwrap();
    }

    /// <summary>
    /// Returns the element buffered by <see cref="WaitToReadAsync"/> if there is one; otherwise reads
    /// upstream elements synchronously until one is accepted, the filter closes the reader, or the
    /// upstream has nothing more available right now.
    /// </summary>
    /// <param name="item">The accepted element when the method returns <c>true</c>.</param>
    /// <returns><c>true</c> when an element was produced; otherwise <c>false</c>.</returns>
    public override bool TryRead([MaybeNullWhen(false)] out TElement item)
    {
        if (hasBuffered)
        {
            hasBuffered = false;
            item = element;
            element = default;
            return true;
        }
        return TryTakeAccepted(out item);
    }

    /// <summary>
    /// Waits until an accepted element is available. The element is buffered and returned by the
    /// next <see cref="TryRead"/>; calling this method again before reading does not consume more input.
    /// </summary>
    /// <param name="cancellationToken">Forwarded to the upstream wait.</param>
    /// <returns>
    /// <c>true</c> when an element is ready; <c>false</c> when the filter closed the reader or the
    /// upstream reported that no more data will arrive.
    /// </returns>
    public override async ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
    {
        if (hasBuffered)
        {
            return true;
        }
        while (!isClosed)
        {
            if (TryTakeAccepted(out var accepted))
            {
                element = accepted;
                hasBuffered = true;
                return true;
            }
            if (isClosed || await _reader.WaitToReadAsync(cancellationToken) == false)
            {
                return false;
            }
        }
        return false;
    }

    // Pulls upstream elements that are available right now until the filter accepts one.
    // Stops (and latches isClosed) as soon as the filter reports isDone for a rejected element.
    private bool TryTakeAccepted([MaybeNullWhen(false)] out TElement accepted)
    {
        while (!isClosed && _reader.TryRead(out var candidate))
        {
            var (shouldReturn, isDone) = _filterFunc(candidate);
            if (shouldReturn)
            {
                accepted = candidate;
                return true;
            }
            if (isDone)
            {
                isClosed = true;
                _closedSignal.TrySetResult();
            }
        }
        accepted = default;
        return false;
    }
}
/// <summary>
/// Distributes the elements of one <see cref="ChannelReader{T}"/> over several bounded output
/// channels (capacity 1 each). Every element is written to exactly one output: the first one, in
/// index order, that accepts it.
/// </summary>
public class ChannelSpreader<TElement>
{
    private readonly ChannelReader<TElement> _reader;
    private readonly Channel<TElement>[] _fanOutChannels;

    /// <summary>The reading sides of the output channels, in index order.</summary>
    public IReadOnlyList<ChannelReader<TElement>> Readers { get; }

    /// <summary>
    /// Creates the output channels and starts a background task that pumps
    /// <paramref name="reader"/> into them.
    /// </summary>
    /// <param name="reader">The source reader.</param>
    /// <param name="maxOut">Number of output channels; must be at least 1.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxOut"/> is less than 1.</exception>
    public ChannelSpreader(ChannelReader<TElement> reader, int maxOut)
    {
        if (maxOut < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxOut), maxOut, "At least one output channel is required.");
        }
        _reader = reader;
        // Enumerable.Repeat would repeat ONE channel instance maxOut times; build a distinct channel per slot.
        _fanOutChannels = Enumerable.Range(0, maxOut)
            .Select(_ => Channel.CreateBounded<TElement>(new BoundedChannelOptions(1) { SingleWriter = false, SingleReader = false }))
            .ToArray();
        Readers = _fanOutChannels.Select(ch => ch.Reader).ToArray();
        // Fire-and-forget: failures are reported through the output channels' completion.
        _ = Task.Run(() => PumpAsync());
    }

    // Copies every source element to one output; completes all outputs when the source ends or fails.
    private async Task PumpAsync()
    {
        try
        {
            await foreach (var element in _reader.ReadAllAsync())
            {
                while (TryWriteToAny(element) == false)
                {
                    // Every output is full: wait until any of them reports room, then retry.
                    await Task.WhenAny(_fanOutChannels.Select(ch => ch.Writer.WaitToWriteAsync().AsTask()));
                }
            }
        }
        catch (Exception ex)
        {
            foreach (var ch in _fanOutChannels)
            {
                ch.Writer.TryComplete(ex);
            }
            return;
        }
        foreach (var ch in _fanOutChannels)
        {
            ch.Writer.TryComplete();
        }
    }

    // Offers the element to each output in index order; true as soon as one accepts it.
    private bool TryWriteToAny(TElement element)
    {
        foreach (var output in _fanOutChannels)
        {
            if (output.Writer.TryWrite(element))
            {
                return true;
            }
        }
        return false;
    }
}
/// <summary>
/// A <see cref="ChannelReader{T}"/> that applies an asynchronous transform to each upstream element,
/// keeping up to a fixed number of transforms in flight and yielding results in upstream order.
/// </summary>
/// <remarks>
/// The transform is always invoked with the default <see cref="CancellationToken"/>; the token passed
/// to <see cref="WaitToReadAsync"/> only governs the wait on the upstream reader.
/// </remarks>
public class TransformingChannelReader<TIn, TOut> : ChannelReader<TOut>
{
    private readonly ChannelReader<TIn> _reader;
    private readonly Func<TIn, CancellationToken, Task<TOut>> _transform;
    private readonly int _maxParallelism;
    // Started transforms, oldest first. Results are only taken from the head.
    private readonly ConcurrentQueue<Task<TOut>> waitingOuts;
    // Guards the peek-then-dequeue pair in TryRead so the dequeued task is the one that was inspected.
    private readonly object _lockObj = new Object();
    // Number of transforms started and not yet consumed.
    private int currentOuts;
    // 1 while some caller is inside EnsureQueueState's pull loop, 0 otherwise.
    private int pullingCheck = 0;

    /// <summary>Creates the reader.</summary>
    /// <param name="reader">The upstream reader.</param>
    /// <param name="transform">The asynchronous projection.</param>
    /// <param name="maxParallelism">Maximum number of unconsumed transforms; must be at least 1.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxParallelism"/> is less than 1.</exception>
    public TransformingChannelReader(ChannelReader<TIn> reader,Func<TIn,CancellationToken,Task<TOut>> transform, int maxParallelism)
    {
        if (maxParallelism < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxParallelism), maxParallelism, "Parallelism must be at least 1.");
        }
        _reader = reader;
        _transform = transform;
        _maxParallelism = maxParallelism;
        waitingOuts = new ConcurrentQueue<System.Threading.Tasks.Task<TOut>>();
    }

    /// <summary>
    /// Returns the result of the oldest started transform if it has finished.
    /// </summary>
    /// <param name="item">The transformed element when the method returns <c>true</c>.</param>
    /// <returns><c>true</c> when a result was produced; otherwise <c>false</c>.</returns>
    /// <remarks>A faulted transform is removed from the queue and its exception surfaces via <c>Task.Result</c>.</remarks>
    public override bool TryRead([MaybeNullWhen(false)] out TOut item)
    {
        EnsureQueueState();
        Task<TOut> ready = null;
        lock (_lockObj)
        {
            if (waitingOuts.TryPeek(out var head) && head.IsCompleted)
            {
                waitingOuts.TryDequeue(out ready);
            }
        }
        if (ready != null)
        {
            Interlocked.Decrement(ref currentOuts);
            item = ready.Result;
            return true;
        }
        item = default;
        return false;
    }

    // Starts transforms for upstream elements that are available right now, up to the parallelism limit.
    // Only one caller at a time runs the loop; others return immediately.
    private void EnsureQueueState()
    {
        if (Interlocked.CompareExchange(ref pullingCheck, 1, 0) != 0)
        {
            return;
        }
        try
        {
            // Check capacity BEFORE taking an element so nothing is read that cannot be started.
            while (Volatile.Read(ref currentOuts) < _maxParallelism && _reader.TryRead(out var next))
            {
                Interlocked.Increment(ref currentOuts);
                Task<TOut> pending;
                try
                {
                    pending = _transform(next, default);
                }
                catch (Exception ex)
                {
                    // Surface a synchronous failure to the consumer in order, like an asynchronous one.
                    pending = Task.FromException<TOut>(ex);
                }
                waitingOuts.Enqueue(pending);
            }
        }
        finally
        {
            Volatile.Write(ref pullingCheck, 0);
        }
    }

    /// <summary>
    /// Waits until the oldest started transform has finished.
    /// </summary>
    /// <param name="cancellationToken">Forwarded to the upstream wait.</param>
    /// <returns>
    /// <c>true</c> when a result is ready; <c>false</c> when nothing is in flight and the upstream
    /// reported that no more data will arrive.
    /// </returns>
    public override async ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
    {
        while (true)
        {
            EnsureQueueState();
            if (waitingOuts.TryPeek(out var head))
            {
                await head;
                return true;
            }
            if (await _reader.WaitToReadAsync(cancellationToken) == false)
            {
                // Another caller may have started work between our peek and the upstream's completion.
                if (waitingOuts.TryPeek(out head))
                {
                    await head;
                    return true;
                }
                return false;
            }
        }
    }
}
/// <summary>
/// A <see cref="ChannelReader{T}"/> that flattens an upstream reader of sequences into a reader
/// of their elements, in order. Empty (and null) sequences are skipped.
/// </summary>
/// <remarks>The class performs no synchronization of its own.</remarks>
public class FlattenChannelReader<TElement> : ChannelReader<TElement>
{
    /// <summary>The upstream reader of sequences.</summary>
    public readonly ChannelReader<IEnumerable<TElement>> _reader;
    // Enumerator over the sequence currently being flattened; null when none is open.
    private IEnumerator<TElement> current;
    // True when current is already positioned on an element that has not been handed out yet.
    private bool hasBuffered;

    /// <summary>Creates the reader.</summary>
    /// <param name="reader">The upstream reader of sequences.</param>
    public FlattenChannelReader(ChannelReader<IEnumerable<TElement>> reader)
    {
        _reader = reader;
    }

    /// <summary>
    /// Returns the next element, moving on to further upstream sequences that are available
    /// right now when the current one is exhausted.
    /// </summary>
    /// <param name="item">The element when the method returns <c>true</c>.</param>
    /// <returns><c>true</c> when an element was produced; otherwise <c>false</c>.</returns>
    public override bool TryRead([MaybeNullWhen(false)] out TElement item)
    {
        if (hasBuffered)
        {
            hasBuffered = false;
            item = current.Current;
            return true;
        }
        if (TryAdvance())
        {
            item = current.Current;
            return true;
        }
        item = default;
        return false;
    }

    /// <summary>
    /// Waits until an element is available. The enumerator is left positioned on that element, which
    /// the next <see cref="TryRead"/> returns; calling this method again first does not skip it.
    /// </summary>
    /// <param name="cancellationToken">Forwarded to the upstream wait.</param>
    /// <returns>
    /// <c>true</c> when an element is ready; <c>false</c> when the current sequence is exhausted and
    /// the upstream reported that no more data will arrive.
    /// </returns>
    public override async ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
    {
        if (hasBuffered)
        {
            return true;
        }
        while (true)
        {
            if (TryAdvance())
            {
                hasBuffered = true;
                return true;
            }
            if (await _reader.WaitToReadAsync(cancellationToken) == false)
            {
                return false;
            }
        }
    }

    // Positions current on the next element, opening upstream sequences one at a time as needed.
    // Returns false when no element can be reached without waiting. Exhausted enumerators are
    // disposed and dropped so they are never touched again.
    private bool TryAdvance()
    {
        while (true)
        {
            if (current != null)
            {
                if (current.MoveNext())
                {
                    return true;
                }
                current.Dispose();
                current = null;
            }
            if (_reader.TryRead(out var nextSequence) == false)
            {
                return false;
            }
            // Take only ONE sequence here; the rest stay queued upstream until this one is exhausted.
            current = nextSequence?.GetEnumerator();
        }
    }
}
/// <summary>
/// Verdict a selector function returns, together with its output, for one input element
/// of a <c>SyncUberSelector</c>.
/// </summary>
public enum SelectState
{
/// <summary>The selector's output is handed to the consumer.</summary>
ShouldReturn,
/// <summary>The read in progress stops at this element; the output paired with it is not handed to the consumer.</summary>
IsDone,
/// <summary>No explicit handling in the selector's read loops: they simply move on to the next input.</summary>
ShouldSkip,
}
/// <summary>
/// A <see cref="ChannelReader{T}"/> driven by a synchronous selector that, for each upstream element,
/// returns a <see cref="SelectState"/> and an output value:
/// <see cref="SelectState.ShouldReturn"/> emits the output, <see cref="SelectState.IsDone"/> ends this
/// reader without emitting, and any other state skips the element.
/// </summary>
/// <remarks>The class performs no synchronization of its own.</remarks>
public class SyncUberSelector<TIn, TOut> : ChannelReader<TOut>
{
    private readonly ChannelReader<TIn> _reader;
    private readonly Func<TIn, (SelectState state, TOut next)> _selector;
    // Output found by WaitToReadAsync and not yet handed out by TryRead.
    private bool hasBufferedItem = false;
    private TOut bufferedItem;
    // Latched once the selector reports IsDone; no upstream element is read afterwards.
    private bool isDone = false;

    /// <summary>Creates the reader.</summary>
    /// <param name="reader">The upstream reader.</param>
    /// <param name="selector">Returns the state and output for each upstream element.</param>
    public SyncUberSelector(ChannelReader<TIn> reader, Func<TIn, (SelectState state, TOut next)> selector)
    {
        _reader = reader;
        _selector = selector;
    }

    /// <summary>
    /// Returns the output buffered by <see cref="WaitToReadAsync"/> if there is one; otherwise runs
    /// the selector over upstream elements that are available right now until one is emitted.
    /// </summary>
    /// <param name="item">The emitted output when the method returns <c>true</c>.</param>
    /// <returns><c>true</c> when an output was produced; otherwise <c>false</c>.</returns>
    public override bool TryRead([MaybeNullWhen(false)] out TOut item)
    {
        if (hasBufferedItem)
        {
            // Hand out the buffered output first; previously it was ignored and therefore lost.
            hasBufferedItem = false;
            item = bufferedItem;
            bufferedItem = default;
            return true;
        }
        return TryPull(out item);
    }

    /// <summary>
    /// Waits until an output is available. The output is buffered and returned by the next
    /// <see cref="TryRead"/>.
    /// </summary>
    /// <param name="cancellationToken">Forwarded to the upstream wait.</param>
    /// <returns>
    /// <c>true</c> when an output is ready; <c>false</c> when the selector reported
    /// <see cref="SelectState.IsDone"/> or the upstream reported that no more data will arrive.
    /// </returns>
    public override async ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
    {
        if (hasBufferedItem)
        {
            return true;
        }
        while (!isDone)
        {
            if (TryPull(out var produced))
            {
                bufferedItem = produced;
                hasBufferedItem = true;
                return true;
            }
            if (isDone || await _reader.WaitToReadAsync(cancellationToken) == false)
            {
                return false;
            }
        }
        return false;
    }

    // Runs the selector over upstream elements available right now until one is emitted,
    // the selector reports IsDone, or the upstream has nothing more to give at the moment.
    private bool TryPull([MaybeNullWhen(false)] out TOut produced)
    {
        while (!isDone && _reader.TryRead(out var nextItem))
        {
            var res = _selector(nextItem);
            if (res.state == SelectState.ShouldReturn)
            {
                produced = res.next;
                return true;
            }
            if (res.state == SelectState.IsDone)
            {
                isDone = true;
            }
        }
        produced = default;
        return false;
    }
}
/// <summary>
/// A <see cref="ChannelReader{T}"/> that folds consecutive upstream elements into batches whose
/// summed size does not exceed a limit. A single element larger than the limit forms a batch alone.
/// </summary>
/// <remarks>
/// A batch is built only from elements that are available synchronously at the time of the
/// <see cref="TryRead"/> call, so batches can be smaller than the limit.
/// The class performs no synchronization of its own.
/// </remarks>
public class BatchingChannelReader<TIn, TOut> : ChannelReader<TOut>
{
    private readonly ChannelReader<TIn> _reader;
    private readonly int _maxBatch;
    private readonly Func<TIn,int> _size;
    private readonly Func<TIn,TOut> _seed;
    private readonly Func<TIn,TOut,TOut> _agg;
    // The element that did not fit into the previous batch; it opens the next one.
    private bool hasBufferedItem = false;
    private TIn bufferedItem = default;
    private int bufferedSize = 0;

    /// <summary>Creates the reader.</summary>
    /// <param name="reader">The upstream reader.</param>
    /// <param name="maxBatch">Maximum summed size of a batch.</param>
    /// <param name="size">Returns the size of one element.</param>
    /// <param name="seed">Creates the aggregate from the first element of a batch.</param>
    /// <param name="agg">Folds a further element into the running aggregate.</param>
    public BatchingChannelReader(ChannelReader<TIn> reader, int maxBatch, Func<TIn,int> size, Func<TIn,TOut> seed, Func<TIn,TOut,TOut> agg)
    {
        _reader = reader;
        _maxBatch = maxBatch;
        _size = size;
        _seed = seed;
        _agg = agg;
    }

    /// <summary>
    /// Builds one batch from the carried-over element (if any) plus upstream elements that are
    /// available right now, stopping at the first element that would push the batch over the limit.
    /// </summary>
    /// <param name="item">The aggregated batch when the method returns <c>true</c>.</param>
    /// <returns><c>true</c> when the batch contains at least one element; otherwise <c>false</c>.</returns>
    public override bool TryRead([MaybeNullWhen(false)] out TOut item)
    {
        int currentSize = 0;
        TOut batch = default;
        bool hasOut = false;
        if (hasBufferedItem)
        {
            currentSize = bufferedSize;
            batch = _seed(bufferedItem);
            bufferedItem = default;
            hasBufferedItem = false;
            hasOut = true;
        }
        while (_reader.TryRead(out var nextItem))
        {
            int nextSize = _size(nextItem);
            if (hasOut && currentSize + nextSize > _maxBatch)
            {
                // Carry the overflowing element into the next batch and stop reading,
                // otherwise later elements would overwrite (and lose) it.
                bufferedItem = nextItem;
                bufferedSize = nextSize;
                hasBufferedItem = true;
                break;
            }
            batch = hasOut ? _agg(nextItem, batch) : _seed(nextItem);
            // Track the running size so the limit actually takes effect.
            currentSize += nextSize;
            hasOut = true;
        }
        item = batch;
        return hasOut;
    }

    /// <summary>
    /// Completes immediately with <c>true</c> when a carried-over element is waiting; otherwise
    /// defers to the upstream reader's <c>WaitToReadAsync</c>.
    /// </summary>
    /// <param name="cancellationToken">Forwarded to the upstream wait.</param>
    public override ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
    {
        if (hasBufferedItem)
        {
            return new ValueTask<bool>(true);
        }
        return _reader.WaitToReadAsync(cancellationToken);
    }
}
// You can define other methods, fields, classes and namespaces here
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment