Created
October 16, 2019 18:22
-
-
Save NickLarsen/d7286c3aa631e5f1eaf459976631e765 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// LINQPad entry point: generates 1,000 simulated task durations, dumps their total, and then
/// times each pipelining strategy (PipeContinue, PipeContinueBuffer, Pipe) running JustWait
/// over the same durations.
/// </summary>
async Task Main()
{
    //Environment.Version.Dump();
    Util.CreateSynchronizationContext();

    // 3ms to 35ms
    // Each duration is a draw from RandomNormal(10, 10), floored at 3 ms.
    double[] times = new double[1_000];
    for (int slot = 0; slot < times.Length; slot++)
    {
        times[slot] = Math.Max(RandomNormal(10d, 10d), 3d);
    }
    times.Sum().Dump();

    // the timing function appears to be good to within about 0.25 ms on average
    // var actuals = times
    //     .AsParallel()
    //     .Select(async expected => new { expected, actual = await JustWait(expected).ConfigureAwait(false) })
    //     .ToArray();
    // await Task.WhenAll(actuals);
    //
    // actuals.Select(m => m.Result.actual - m.Result.expected).Average().Dump("Average deviance");
    // actuals.Dump();

    const int bufferSize = 10;

    // Runs one strategy to completion and dumps its wall-clock time twice under the same label:
    // first as a TimeSpan, then as total milliseconds.
    async Task Measure(string label, Func<Task> strategy)
    {
        Stopwatch clock = Stopwatch.StartNew();
        await strategy();
        clock.Stop();
        clock.Elapsed.Dump(label);
        clock.Elapsed.TotalMilliseconds.Dump(label);
    }

    await Measure(nameof(PipeContinue), () => PipeContinue(times, bufferSize, JustWait));
    await Measure(nameof(PipeContinueBuffer), () => PipeContinueBuffer(times, bufferSize, JustWait));
    await Measure(nameof(Pipe), () => Pipe(times, bufferSize, JustWait));
}
/// <summary>
/// Asynchronously waits for about <paramref name="ms"/> milliseconds and reports how long the
/// wait actually took.
/// </summary>
/// <param name="ms">Requested delay in milliseconds; the cast to <c>int</c> drops the fractional part.</param>
/// <returns>Elapsed time in milliseconds as measured by a <see cref="Stopwatch"/>.</returns>
async Task<double> JustWait(double ms)
{
    Stopwatch clock = new Stopwatch();
    clock.Start();

    int wholeMilliseconds = (int)ms;
    await Task.Delay(wholeMilliseconds);

    TimeSpan waited = clock.Elapsed;
    return waited.TotalMilliseconds;

    // Busy-wait alternative left disabled by the author:
    // while (true)
    // {
    //     var elapsed = sw.Elapsed.TotalMilliseconds;
    //     if (elapsed >= ms) return Task.FromResult(elapsed);
    // }
}
// Single generator with a fixed seed, shared by every RandomNormal call
// (reuse one instance when generating many values).
Random rand = new Random(20191016);

/// <summary>
/// Draws one normally distributed value using the Box-Muller transform (sine branch).
/// </summary>
/// <param name="mean">Mean of the target distribution.</param>
/// <param name="stdDev">Standard deviation of the target distribution.</param>
/// <returns>A sample from normal(mean, stdDev^2).</returns>
double RandomNormal(double mean, double stdDev)
{
    // https://stackoverflow.com/a/218600
    // NextDouble() is in [0,1); subtracting it from 1 gives (0,1], so Math.Log never receives 0.
    double radiusSeed = 1.0 - rand.NextDouble();
    double angleSeed = 1.0 - rand.NextDouble();

    double radius = Math.Sqrt(-2.0 * Math.Log(radiusSeed));
    double angle = 2.0 * Math.PI * angleSeed;
    double standardNormal = radius * Math.Sin(angle); // random normal(0,1)

    // Shift and scale to normal(mean, stdDev^2).
    return mean + stdDev * standardNormal;
}
// Define other methods and classes here | |
/// <summary>
/// Invokes <paramref name="action"/> for every item of <paramref name="source"/>, tracking the
/// unfinished tasks in a ring buffer of <paramref name="pipeLength"/> slots.
/// </summary>
/// <remarks>
/// The action for the next item is started before the task occupying its slot is awaited, so
/// up to <paramref name="pipeLength"/> + 1 tasks can be in flight at the same time.
/// If an awaited task fails, its exception propagates immediately and the tasks still held in
/// the buffer are not awaited.
/// </remarks>
/// <typeparam name="T">Item type of the source sequence.</typeparam>
/// <param name="source">Items to process, enumerated once in order.</param>
/// <param name="pipeLength">Number of slots in the ring buffer; must be at least 1.</param>
/// <param name="action">Asynchronous operation to start for each item.</param>
/// <returns>A task that completes once every started task has been awaited.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="action"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="pipeLength"/> is less than 1.</exception>
static async Task Pipe<T>(IEnumerable<T> source, int pipeLength, Func<T, Task> action)
{
    if (source == null)
    {
        throw new ArgumentNullException(nameof(source));
    }
    if (action == null)
    {
        throw new ArgumentNullException(nameof(action));
    }
    if (pipeLength < 1)
    {
        // Without this guard a length of 0 fails later with DivideByZeroException on the modulo.
        throw new ArgumentOutOfRangeException(nameof(pipeLength), pipeLength, "The pipe needs at least one slot.");
    }

    var buffer = new Task[pipeLength];
    uint index = 0;

    foreach (var item in source)
    {
        var newTask = action(item);
        if (newTask.IsCompleted)
        {
            // Completed synchronously: awaiting does not yield, but it observes a failure and
            // rethrows the original exception, exactly like the awaits below. The previous
            // newTask.Wait() blocked and wrapped the failure in an AggregateException.
            await newTask;
        }
        else
        {
            // Only tasks that are still running consume a slot.
            var thisIndex = index++ % pipeLength;
            var oldTask = buffer[thisIndex];
            if (oldTask != null)
            {
                // Slot is occupied: wait for its previous occupant before reusing it.
                await oldTask;
            }
            buffer[thisIndex] = newTask;
        }
    }

    // Drain what is left, starting at the slot that would be reused next (the oldest entry).
    for (int i = 0; i < pipeLength; i++)
    {
        var oldTask = buffer[(i + index) % pipeLength];
        if (oldTask != null)
        {
            await oldTask;
        }
    }
}
/// <summary>
/// Processes <paramref name="source"/> with <paramref name="maxUncompletedTasks"/> concurrent
/// workers. Each worker repeatedly takes the next item from a shared enumerator and awaits
/// <paramref name="action"/> on it before taking another.
/// </summary>
/// <remarks>
/// When a worker runs out of items it dumps the managed thread id it finished on
/// (diagnostic output via the <c>Dump</c> extension).
/// </remarks>
/// <typeparam name="T">Item type of the source sequence.</typeparam>
/// <param name="source">Items to process; enumerated exactly once.</param>
/// <param name="maxUncompletedTasks">Number of workers, i.e. the maximum number of actions in flight; must be at least 1.</param>
/// <param name="action">Asynchronous operation to run for each item.</param>
/// <returns>A task that completes when every worker has finished.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="action"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="maxUncompletedTasks"/> is less than 1.</exception>
static async Task PipeContinue<T>(IEnumerable<T> source, int maxUncompletedTasks, Func<T, Task> action)
{
    if (source == null)
    {
        throw new ArgumentNullException(nameof(source));
    }
    if (action == null)
    {
        throw new ArgumentNullException(nameof(action));
    }
    if (maxUncompletedTasks < 1)
    {
        // Zero workers would return a completed task without processing a single item.
        throw new ArgumentOutOfRangeException(nameof(maxUncompletedTasks), maxUncompletedTasks, "At least one worker is required.");
    }

    // Private lock object: the enumerator itself is reachable by other code and must not be the monitor.
    var gate = new object();

    // The enumerator is disposed only after every worker has finished with it.
    using (var queue = source.GetEnumerator())
    {
        async Task TakeWork()
        {
            while (true)
            {
                T item;
                lock (gate)
                {
                    // Advancing and reading Current must be atomic with respect to the other workers.
                    if (!queue.MoveNext()) break;
                    item = queue.Current;
                }
                // The lock is released before awaiting, so workers only contend while dequeuing.
                await action(item).ConfigureAwait(false);
            }
            Thread.CurrentThread.ManagedThreadId.Dump();
        }

        var workers = Enumerable.Range(0, maxUncompletedTasks)
            .Select(_ => TakeWork())
            .ToArray();

        await Task.WhenAll(workers).ConfigureAwait(false);
    }
}
/// <summary>
/// Processes <paramref name="source"/> with <paramref name="maxUncompletedTasks"/> concurrent
/// workers that claim items from a shared enumerator in batches. A worker fills its private
/// buffer while holding the lock, then awaits <paramref name="action"/> on each buffered item
/// in turn outside the lock.
/// </summary>
/// <remarks>
/// A batch shorter than <paramref name="batchSize"/> means the source ran dry, so the worker
/// stops after processing it. On exit each worker dumps the managed thread id it finished on
/// (diagnostic output via the <c>Dump</c> extension).
/// </remarks>
/// <typeparam name="T">Item type of the source sequence.</typeparam>
/// <param name="source">Items to process; enumerated exactly once.</param>
/// <param name="maxUncompletedTasks">Number of workers, i.e. the maximum number of actions in flight; must be at least 1.</param>
/// <param name="action">Asynchronous operation to run for each item.</param>
/// <param name="batchSize">Maximum number of items a worker claims per lock acquisition; defaults to 100, the value that used to be hard-coded.</param>
/// <returns>A task that completes when every worker has finished.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="action"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="maxUncompletedTasks"/> or <paramref name="batchSize"/> is less than 1.</exception>
static async Task PipeContinueBuffer<T>(IEnumerable<T> source, int maxUncompletedTasks, Func<T, Task> action, int batchSize = 100)
{
    if (source == null)
    {
        throw new ArgumentNullException(nameof(source));
    }
    if (action == null)
    {
        throw new ArgumentNullException(nameof(action));
    }
    if (maxUncompletedTasks < 1)
    {
        // Zero workers would return a completed task without processing a single item.
        throw new ArgumentOutOfRangeException(nameof(maxUncompletedTasks), maxUncompletedTasks, "At least one worker is required.");
    }
    if (batchSize < 1)
    {
        throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, "A batch must hold at least one item.");
    }

    // Private lock object: the enumerator itself is reachable by other code and must not be the monitor.
    var gate = new object();

    // The enumerator is disposed only after every worker has finished with it.
    using (var queue = source.GetEnumerator())
    {
        async Task TakeWork()
        {
            var buffer = new T[batchSize];
            while (true)
            {
                int i;
                lock (gate)
                {
                    // Claim up to batchSize items in one lock acquisition.
                    for (i = 0; i < batchSize; i++)
                    {
                        if (!queue.MoveNext()) break;
                        buffer[i] = queue.Current;
                    }
                }

                // Process the claimed items one after another, outside the lock.
                for (int j = 0; j < i; j++)
                {
                    await action(buffer[j]).ConfigureAwait(false);
                }

                // A short batch means MoveNext returned false: nothing is left to claim.
                if (i < batchSize) break;
            }
            Thread.CurrentThread.ManagedThreadId.Dump();
        }

        var workers = Enumerable.Range(0, maxUncompletedTasks)
            .Select(_ => TakeWork())
            .ToArray();

        await Task.WhenAll(workers).ConfigureAwait(false);
    }
}
/// <summary>
/// A flag-plus-value pair that can mark a value of any type <typeparamref name="T"/> as present
/// or absent; the type parameter carries no constraint.
/// </summary>
struct AnyNullable<T>
{
    private bool _hasValue;
    private T _value;

    /// <summary>Whether <see cref="Value"/> should be treated as set. False for a default instance.</summary>
    public bool HasValue
    {
        get => _hasValue;
        set => _hasValue = value;
    }

    /// <summary>The stored value; <c>default(T)</c> for a default instance.</summary>
    public T Value
    {
        get => _value;
        set => _value = value;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment