Created
March 12, 2021 06:43
-
-
Save altbodhi/1799af4fd5f767b240b20e5b5192e819 to your computer and use it in GitHub Desktop.
CSharp MailBoxProcessor inspired F#
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using static System.Console; | |
using System.Threading.Tasks; | |
using System.Threading.Tasks.Dataflow; | |
using System.Threading; | |
using System.Collections.Generic; | |
namespace CSharpMailBoxProcessor | |
{ | |
public class Reply | |
{ | |
readonly BufferBlock<List<string>> buffer = new BufferBlock<List<string>>(); | |
public async Task<bool> PostAndReply(List<string> message) | |
{ | |
return await buffer.SendAsync(message); | |
} | |
public async Task<List<string>> Answer() | |
{ | |
return await buffer.ReceiveAsync<List<string>>(); | |
} | |
} | |
public class MailBoxProccessor | |
{ | |
readonly BufferBlock<object> buffer = new BufferBlock<object>(); | |
readonly List<string> messages = new List<string>(); | |
public async Task Processing(CancellationToken ct) | |
{ | |
while (true) | |
{ | |
var received = await buffer.ReceiveAsync(); | |
if (received is string message) | |
{ | |
var data = message.Split(':'); | |
if (data[1] == "add") | |
{ | |
messages.Add(data[0]); | |
WriteLine("ADD =>"); | |
} | |
else if (messages.Count > 0) | |
{ | |
messages.RemoveAt(0); | |
WriteLine("DEL =>"); | |
} | |
continue; | |
} | |
if (received is Reply reply) | |
{ | |
WriteLine("REPL =>"); | |
await reply.Post(new List<string>(messages)); | |
} | |
} | |
} | |
public async Task<bool> Post(object message) | |
{ | |
return await buffer.SendAsync(message); | |
} | |
} | |
class Program | |
{ | |
static async Task Main(string[] args) | |
{ | |
var cts = new CancellationTokenSource(); | |
var mb = new MailBoxProccessor(); | |
Task.Run(async () => await mb.Processing(cts.Token)); | |
var start = DateTime.Now; | |
var count = 0; | |
Task.Run(async () => | |
{ | |
var random = new Random(Environment.TickCount); | |
while (true) | |
{ | |
await mb.Post($"Record {++count}:{(random.NextDouble() > 0.5 ? "add" : "del")}"); | |
await Task.Delay(TimeSpan.FromMilliseconds(100)); | |
} | |
}); | |
Task.Run(async () => | |
{ | |
while (true) | |
{ | |
try | |
{ | |
await Task.Delay(TimeSpan.FromMilliseconds(100)); | |
var repl = new Reply(); | |
await mb.Post(repl); | |
var items = await repl.Answer(); | |
if (items.Count == 0) | |
WriteLine("<= EMPTY"); | |
items.ForEach(WriteLine); | |
} | |
catch (Exception exc) | |
{ | |
WriteLine(exc.Message); | |
Environment.Exit(0); | |
} | |
} | |
}); | |
WriteLine("ReadLine..."); | |
ReadLine(); | |
WriteLine("Finish"); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment