Last active
March 7, 2022 09:42
-
-
Save MoaidHathot/45900e9265018ac5a99daec32aa57c09 to your computer and use it in GitHub Desktop.
David Fowler's challenge to implement 'UberQueue<T>' // https://twitter.com/davidfowl/status/1426453915357175819
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Demo driver: builds four <see cref="AsyncQueue{T}"/> sources, merges them
/// through an <see cref="UberQueue{T}"/>, and dequeues nine items with pauses
/// between the first few so the interleaving can be observed in the output.
/// </summary>
async Task Main()
{
    // Seeds 100..400 with per-item delays of 1..4 seconds.
    AsyncQueue<int>[] queues =
    {
        new AsyncQueue<int>(100, TimeSpan.FromSeconds(1), Increment),
        new AsyncQueue<int>(200, TimeSpan.FromSeconds(2), Increment),
        new AsyncQueue<int>(300, TimeSpan.FromSeconds(3), Increment),
        new AsyncQueue<int>(400, TimeSpan.FromSeconds(4), Increment),
    };
    var uber = new UberQueue<int>(queues);

    int index = 0;

    // Phase 1: one dequeue followed by a pause, for each listed pause length.
    foreach (var pauseSeconds in new[] { 2, 2, 2, 5, 1 })
    {
        await Dequeu(index++, uber);
        await Wait(pauseSeconds);
    }

    // Phase 2: four back-to-back dequeues with no pause in between.
    for (var burst = 0; burst < 4; burst++)
    {
        await Dequeu(index++, uber);
    }
}
/// <summary>
/// Dumps a start marker, asynchronously pauses, then dumps an end marker.
/// </summary>
/// <param name="secodns">Length of the pause, in seconds.</param>
private async Task Wait(int secodns)
{
    var pause = TimeSpan.FromSeconds(secodns);

    $" Started delay '{secodns}'".Dump();
    await Task.Delay(pause);
    $" Finishe delay '{secodns}'".Dump();
}
/// <summary>
/// Dequeues a single item from <paramref name="uber"/> and dumps a line before
/// and after the wait, both tagged with <paramref name="index"/>.
/// </summary>
/// <typeparam name="T">Item type of the queue.</typeparam>
/// <param name="index">Label used to correlate the two log lines of this call.</param>
/// <param name="uber">Queue to dequeue from.</param>
private async Task Dequeu<T>(int index, UberQueue<T> uber)
{
    $"[{index}] getting...".Dump();
    var item = await uber.DequeueAsync();

    // 'index' is passed by value, so incrementing it here had no effect on the
    // caller; the misleading post-increment was dropped. The printed text is unchanged.
    $"[{index}] Got '{item}'".Dump();
}
/// <summary>
/// Presents several <see cref="IAsyncQueue{T}"/> sources as a single queue.
/// One dequeue is kept in flight per source; each call to
/// <see cref="DequeueAsync"/> returns the item of whichever in-flight dequeue
/// completes first.
/// </summary>
/// <typeparam name="T">Item type produced by the source queues.</typeparam>
class UberQueue<T> : IAsyncQueue<T>
{
    private readonly IAsyncQueue<T>[] _queues;
    private readonly IAsyncEnumerator<T> _enumerator;

    /// <summary>Creates a merged queue over <paramref name="queues"/>.</summary>
    /// <param name="queues">Source queues to merge.</param>
    /// <exception cref="ArgumentNullException"><paramref name="queues"/> is null.</exception>
    public UberQueue(IAsyncQueue<T>[] queues)
    {
        // Fail at construction rather than on the first dequeue.
        _queues = queues ?? throw new ArgumentNullException(nameof(queues));

        // The iterator body does not run until the first MoveNextAsync, so no
        // source is touched before the first DequeueAsync call.
        _enumerator = CreateEnumerator().GetAsyncEnumerator();
    }

    /// <summary>
    /// Endless stream of items from all sources, in completion order. Ends
    /// immediately when there are no source queues.
    /// </summary>
    private async IAsyncEnumerable<T> CreateEnumerator()
    {
        // Start one dequeue on every source and remember which source owns which task.
        var pending = _queues.Select(q => (queue: q, task: q.DequeueAsync())).ToList();

        while (pending.Count > 0)
        {
            var done = await Task.WhenAny(pending.Select(entry => entry.task));

            // Awaiting (instead of reading .Result) rethrows a source failure
            // as-is rather than wrapped in an AggregateException.
            var item = await done;

            $" Yielding '{item}'".Dump();
            yield return item;
            $" Fixing after yielding '{item}'".Dump();

            // Execution resumes here on the next MoveNextAsync: re-arm the source
            // that just produced an item and move it to the back of the list.
            var slot = pending.FindIndex(entry => entry.task == done);
            var source = pending[slot].queue;
            pending.RemoveAt(slot);
            pending.Add((source, source.DequeueAsync()));
        }
    }

    /// <summary>Returns the next item available from any source queue.</summary>
    /// <returns>The item produced by the first in-flight source dequeue to complete.</returns>
    /// <exception cref="InvalidOperationException">
    /// No more items can be produced: there are no source queues, or the
    /// stream already ended because an earlier source dequeue failed.
    /// </exception>
    /// <remarks>
    /// NOTE(review): overlapping calls would invoke MoveNextAsync concurrently on
    /// one enumerator, which is presumably unsupported; confirm callers await
    /// each call before issuing the next.
    /// </remarks>
    public async Task<T> DequeueAsync()
    {
        // Previously the result of MoveNextAsync was ignored and a finished
        // stream silently produced default(T).
        if (!await _enumerator.MoveNextAsync())
        {
            throw new InvalidOperationException("UberQueue has no source queue left to dequeue from.");
        }

        return _enumerator.Current;
    }
}
/// <summary>
/// A source of <typeparamref name="T"/> items that are handed out asynchronously.
/// </summary>
/// <typeparam name="T">Type of the items the queue produces.</typeparam>
interface IAsyncQueue<T>
{
    /// <summary>Requests the next item from the queue.</summary>
    /// <returns>A task whose result is the dequeued item.</returns>
    Task<T> DequeueAsync();
}
/// <summary>
/// Demo <see cref="IAsyncQueue{T}"/> that generates its items: every dequeue
/// applies the updater to the current item, waits for <see cref="Delay"/>,
/// and returns the updated item.
/// </summary>
/// <typeparam name="T">Type of the generated items.</typeparam>
public class AsyncQueue<T> : IAsyncQueue<T>
{
    /// <summary>How long each dequeue waits before it completes.</summary>
    public TimeSpan Delay { get; }

    // Most recently generated item. Left as a public field so existing
    // external readers of '_item' keep compiling.
    public T _item;

    private readonly Func<T, T> _updater;

    /// <summary>Creates a queue seeded with <paramref name="item"/>.</summary>
    /// <param name="item">Seed value; the first dequeue returns <c>updater(item)</c>.</param>
    /// <param name="delay">Time each dequeue takes to complete.</param>
    /// <param name="updater">Computes the next item from the current one.</param>
    /// <exception cref="ArgumentNullException"><paramref name="updater"/> is null.</exception>
    public AsyncQueue(T item, TimeSpan delay, Func<T, T> updater)
    {
        _item = item;
        Delay = delay;
        _updater = updater ?? throw new ArgumentNullException(nameof(updater));
    }

    /// <summary>
    /// Advances the stored item with the updater and returns it after <see cref="Delay"/>.
    /// </summary>
    /// <returns>The item generated by this call.</returns>
    public async Task<T> DequeueAsync()
    {
        // Capture this call's item in a local: the field may be advanced again by
        // another DequeueAsync while this one is still waiting, and re-reading it
        // after the delay would log and return the other call's value.
        var item = _updater(_item);
        _item = item;

        $" Started '{item}'".Dump();
        await Task.Delay(Delay);
        $" Finished '{item}'".Dump();

        return item;
    }
}
/// <summary>Returns <paramref name="item"/> plus one.</summary>
/// <param name="item">Value to advance.</param>
/// <returns><paramref name="item"/> + 1.</returns>
private int Increment(int item)
{
    return item + 1;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment