Last active
August 8, 2021 02:20
-
-
Save celsojr/fd5b020ef26e183aa05c0d0ba99e63f1 to your computer and use it in GitHub Desktop.
Small experiment with the parallel programming model and the thread-safe collection BlockingCollection
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.IO; | |
using System.Linq; | |
using System.Net.Http; | |
using System.Net.Http.Json; | |
using System.Text.Json; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using static System.Console; | |
const int offset = 5; | |
using var client = new HttpClient(); | |
Profile(() => LoadPokemonsInParallel(client, offset), "In parallel loop time elapsed:"); | |
Profile(() => LoadPokemonsInner(client, offset), "Inner/linear loop time elapsed:"); | |
static void LoadPokemonsInParallel(HttpClient client, int offset) | |
{ | |
using var collection = new BlockingCollection<Pokemon>(offset); | |
Parallel.For(0, offset, async i => | |
{ | |
WriteLine($"On thread {Thread.CurrentThread.ManagedThreadId}"); | |
var pokemon = await LoadPokemonAsync(client, i); | |
collection.Add(pokemon); | |
}); | |
// Wait for all items to be added | |
while (collection.Count < collection.BoundedCapacity) { } | |
// No longer accepting additions and avoiding | |
// blocking when empty (not a concern in this example) | |
collection.CompleteAdding(); | |
// This will throw an unhandled System.InvalidOperationException exception at this point | |
// collection.Add(new Pokemon("", "")); | |
// This GetConsumingEnumerable() enumerator modifies the source collection by removing items | |
foreach (var pokemon in collection.GetConsumingEnumerable()) | |
{ | |
WriteLine($"{pokemon.Name} \t (remaining: {collection.Count:D2})"); | |
} | |
WriteLine($"Look, the collection is now empty: {collection.Count}"); | |
} | |
static void LoadPokemonsInner(HttpClient client, int offset) | |
{ | |
var collection = Enumerable.Empty<Pokemon>().ToList(); | |
for (int i = 0; i < offset; i++) | |
{ | |
WriteLine($"On thread {Thread.CurrentThread.ManagedThreadId}"); | |
var pokemon = LoadPokemon(client, i); | |
collection.Add(pokemon); | |
} | |
WriteLine(string.Join(", ", collection.Select(p => p.Name))); | |
} | |
static Pokemon LoadPokemon(HttpClient client, int offset) | |
{ | |
var url = $"https://pokeapi.co/api/v2/pokemon?limit=1&offset={offset}"; | |
using var request = new HttpRequestMessage(HttpMethod.Get, url); | |
using var response = client.Send(request); | |
using var reader = new StreamReader(response.Content.ReadAsStream()); | |
var opts = new JsonSerializerOptions() { PropertyNameCaseInsensitive = true }; | |
var items = JsonSerializer.Deserialize<Items>(reader.ReadToEnd(), opts); | |
var pokemon = items.Results.FirstOrDefault(); | |
return pokemon; | |
} | |
static async Task<Pokemon> LoadPokemonAsync(HttpClient client, int offset) | |
{ | |
var url = $"https://pokeapi.co/api/v2/pokemon?limit=1&offset={offset}"; | |
var res = await client.GetFromJsonAsync<Items>(url); | |
return res.Results.FirstOrDefault(); | |
} | |
/// <summary> | |
/// Simple profile helper method | |
/// Pro tip: In general, BenchmarkDotNet will provide more realistic results. | |
/// </summary> | |
/// <param name="action">The current action to be performed</param> | |
/// <param name="message">Usually the operation identifier</param> | |
static void Profile(Action action, string message = default) | |
{ | |
// Storing the current standard output for later use | |
var standardOutput = Out; | |
// We don't want output anything during the profiling. | |
// Note: When debugging this will also omit debug messages. | |
SetOut(TextWriter.Null); | |
SetError(TextWriter.Null); | |
// Warm up by letting csharp compiler and the JIT compiler do their | |
// work at least once, so we don't measure this along with the actual operation. | |
action.Invoke(); | |
// We don't want anything else on the machine to | |
// affect this, because it's gonna affect the results. | |
Process.GetCurrentProcess().PriorityClass = ProcessPriorityClass.High; | |
Thread.CurrentThread.Priority = ThreadPriority.Highest; | |
// We're using stopwatch here just because it's a good practice. | |
// Stopwatch value changes over time, whereas DateTime doesn't. | |
// Therefore, DateTime might, for some accidental reason, | |
// add some nanoseconds gaps into our time measurement. | |
// The Stopwatch isn't really a timer, it just pulls a high performance counter. | |
Stopwatch sw = new(); | |
// Our payload is the only thing that matters. | |
// Garbage collections takes time, so we don't want the GC | |
// collecting something else before we go to our actual operation. | |
GC.Collect(); | |
GC.WaitForPendingFinalizers(); | |
GC.Collect(); | |
// It's probably better to have a loop here | |
// and get the average execution time of the operation. | |
// The more interactions we have, the more accurate the results. | |
const int interations = 100; | |
sw.Start(); | |
for (int i = 0; i < interations; i++) | |
{ | |
action.Invoke(); | |
} | |
sw.Stop(); | |
// Redirect output to standard output again. | |
SetOut(standardOutput); | |
SetError(standardOutput); | |
// Now it's time to make an actual method call. | |
action.Invoke(); | |
WriteLine($"{message} {sw.Elapsed.TotalSeconds / interations} s\n"); | |
} | |
internal record Items(IList<Pokemon> Results); | |
internal record Pokemon(string Name, string Url); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment