Skip to content

Instantly share code, notes, and snippets.

@altbodhi
Created March 12, 2021 06:43
Show Gist options
  • Save altbodhi/1799af4fd5f767b240b20e5b5192e819 to your computer and use it in GitHub Desktop.
Save altbodhi/1799af4fd5f767b240b20e5b5192e819 to your computer and use it in GitHub Desktop.
CSharp MailBoxProcessor inspired by F#
using System;
using static System.Console;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Threading;
using System.Collections.Generic;
namespace CSharpMailBoxProcessor
{
/// <summary>
/// Reply channel carried inside a request: one side writes a list of strings
/// into it with <see cref="PostAndReply"/>, the other side reads that list
/// back with <see cref="Answer"/>.
/// </summary>
public class Reply
{
    // Private queue that carries the answer from the writer to the reader.
    private readonly BufferBlock<List<string>> _channel = new BufferBlock<List<string>>();

    /// <summary>Sends <paramref name="message"/> into the reply channel.</summary>
    /// <param name="message">The list to hand over to whoever awaits <see cref="Answer"/>.</param>
    /// <returns>The result of the underlying <c>SendAsync</c> call.</returns>
    public async Task<bool> PostAndReply(List<string> message)
    {
        bool accepted = await _channel.SendAsync(message);
        return accepted;
    }

    /// <summary>Asynchronously receives the next list written to the reply channel.</summary>
    /// <returns>The list taken from the channel.</returns>
    public async Task<List<string>> Answer()
    {
        List<string> payload = await _channel.ReceiveAsync();
        return payload;
    }
}
/// <summary>
/// Message-processing agent in the spirit of F#'s MailboxProcessor: callers
/// enqueue messages with <see cref="Post"/> and the <see cref="Processing"/>
/// loop takes them out one at a time. The stored list is only read and
/// modified from within <see cref="Processing"/>.
/// </summary>
public class MailBoxProccessor
{
    readonly BufferBlock<object> buffer = new BufferBlock<object>();
    readonly List<string> messages = new List<string>();

    /// <summary>
    /// Runs the receive loop until <paramref name="ct"/> is cancelled.
    /// A <see cref="string"/> message is treated as a "record:command" pair;
    /// a <see cref="Reply"/> message is answered with a copy of the current list.
    /// Messages of any other type are ignored.
    /// </summary>
    /// <param name="ct">Token that stops the loop; cancellation ends the task normally.</param>
    public async Task Processing(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            object received;
            try
            {
                // Passing the token lets a pending receive be abandoned on shutdown.
                received = await buffer.ReceiveAsync(ct);
            }
            catch (OperationCanceledException) when (ct.IsCancellationRequested)
            {
                break;
            }

            if (received is string message)
            {
                Apply(message);
                continue;
            }

            if (received is Reply reply)
            {
                WriteLine("REPL =>");
                // Hand out a copy so the receiver never observes later mutations.
                await reply.PostAndReply(new List<string>(messages));
            }
        }
    }

    /// <summary>Enqueues <paramref name="message"/> for the processing loop.</summary>
    /// <param name="message">A command string or a <see cref="Reply"/> request.</param>
    /// <returns>The result of the underlying <c>SendAsync</c> call.</returns>
    public async Task<bool> Post(object message)
    {
        return await buffer.SendAsync(message);
    }

    // Applies one "record:command" text message to the stored list.
    private void Apply(string message)
    {
        var data = message.Split(':');
        if (data.Length < 2)
        {
            // No ':' separator means there is no command part; skip the message
            // instead of letting an IndexOutOfRangeException end the whole loop.
            WriteLine("SKIP =>");
            return;
        }

        if (data[1] == "add")
        {
            messages.Add(data[0]);
            WriteLine("ADD =>");
        }
        else if (messages.Count > 0)
        {
            // Any command other than "add" removes the oldest stored record.
            messages.RemoveAt(0);
            WriteLine("DEL =>");
        }
    }
}
/// <summary>
/// Console demo: starts the mailbox loop, a producer that posts random
/// add/del commands and a consumer that periodically requests and prints the
/// stored records, then waits for Enter and shuts down.
/// </summary>
class Program
{
    // Pause between two posts of the producer and between two queries of the consumer.
    static readonly TimeSpan Interval = TimeSpan.FromMilliseconds(100);

    static async Task Main(string[] args)
    {
        using (var cts = new CancellationTokenSource())
        {
            // Read the token once so the background lambdas never touch the source itself.
            var token = cts.Token;
            var mb = new MailBoxProccessor();

            // Deliberately not awaited: runs in the background until the process ends.
            _ = Task.Run(() => mb.Processing(token));

            var producer = Task.Run(async () =>
            {
                var random = new Random(Environment.TickCount);
                var count = 0;
                try
                {
                    while (!token.IsCancellationRequested)
                    {
                        await mb.Post($"Record {++count}:{(random.NextDouble() > 0.5 ? "add" : "del")}");
                        await Task.Delay(Interval, token);
                    }
                }
                catch (OperationCanceledException) when (token.IsCancellationRequested)
                {
                    // Normal shutdown: the delay was interrupted by cancellation.
                }
            });

            // Deliberately not awaited: a pending Answer() takes no token, so waiting
            // for this task at shutdown could block forever.
            _ = Task.Run(async () =>
            {
                while (!token.IsCancellationRequested)
                {
                    try
                    {
                        await Task.Delay(Interval, token);
                        var repl = new Reply();
                        await mb.Post(repl);
                        var items = await repl.Answer();
                        if (items.Count == 0)
                        {
                            WriteLine("<= EMPTY");
                        }
                        items.ForEach(WriteLine);
                    }
                    catch (OperationCanceledException) when (token.IsCancellationRequested)
                    {
                        break;
                    }
                    catch (Exception exc)
                    {
                        WriteLine(exc.Message);
                        // Non-zero exit code so the failure is visible to the caller.
                        Environment.Exit(1);
                    }
                }
            });

            WriteLine("ReadLine...");
            ReadLine();

            // Ask the background loops to stop; the producer ends promptly once cancelled.
            cts.Cancel();
            await producer;
            WriteLine("Finish");
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment