Created
September 5, 2020 09:02
-
-
Save rogeralsing/3f6001462824ae3d74081e783f591466 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Threading.Channels; | |
using System.Threading.Tasks; | |
namespace ConsoleApp3 | |
{ | |
class Program | |
{ | |
static void Main(string[] args) | |
{ | |
var channelA = Channel.CreateBounded<string>(1000); | |
var channelB = Channel.CreateBounded<Task>(1000); | |
_ = ProduceA(channelA.Writer); | |
_ = ReadA(channelA.Reader,channelB.Writer); | |
_ = BatchCompleted(channelB.Reader); | |
Console.ReadLine(); | |
} | |
private static async Task BatchCompleted(ChannelReader<Task> channelBReader) | |
{ | |
await Task.Yield(); | |
var buffer = new List<Task>(); | |
while (true) | |
{ | |
var read = channelBReader.TryRead(out var task); | |
if (!read) | |
{ | |
if (buffer.Count > 0) | |
{ | |
await WaitForBatchBuffer(buffer); | |
} | |
} | |
else | |
{ | |
buffer.Add(task); | |
//MAX BATCH COMMIT SIZE | |
if (buffer.Count == 100) | |
{ | |
await WaitForBatchBuffer(buffer); | |
} | |
} | |
} | |
} | |
private static int messageCount = 0; | |
private static async Task WaitForBatchBuffer(List<Task> buffer) | |
{ | |
messageCount += buffer.Count; | |
Console.WriteLine("Waiting for batch buffer"); | |
Console.WriteLine("Total messages handled " + messageCount); | |
//We could commit the read offset here. | |
//(We would ofc need more than just the tasks, some info on the message batch itself) | |
await Task.WhenAll(buffer); | |
buffer.Clear(); | |
} | |
//CONSUMER: | |
//this is basically what we have right now in Gjöll | |
private static async Task ReadA(ChannelReader<string> channelAReader, ChannelWriter<Task> channelBWriter) | |
{ | |
await Task.Yield(); | |
while (true) | |
{ | |
var message = await channelAReader.ReadAsync(); | |
var task = HandleMessage(message); | |
await channelBWriter.WriteAsync(task); | |
} | |
} | |
//PROCESSOR: | |
//this is basically what we have right now in Gjöll | |
private static async Task HandleMessage(string message) | |
{ | |
//This would be the Processor message handler | |
//simulate slow handler | |
await Task.Delay(1000); | |
} | |
//PRODUCER: | |
//this is basically what we have right now in Gjöll | |
private static async Task ProduceA(ChannelWriter<string> channelWriter) | |
{ | |
await Task.Yield(); | |
while (true) | |
{ | |
//this would be the Kafka reader | |
await channelWriter.WriteAsync(DateTime.Now.ToString()); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
https://gist.github.com/rogeralsing/3f6001462824ae3d74081e783f591466#file-foo-cs-L60
Once this task channel reaches a certain point, we wait for those tasks to complete.
freeing up space in the batch channel, allowing the entire pipeline to push more tasks