Last active
June 19, 2021 18:32
-
-
Save benfoster/4416655 to your computer and use it in GitHub Desktop.
A lightweight message bus using TPL Dataflow
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using System.Threading.Tasks.Dataflow; | |
namespace TDFDemo | |
{ | |
/// <summary>
/// Console demo that exercises <see cref="Bus"/>: it registers three subscribers,
/// sends a burst of messages, removes one subscriber, then sends a second burst
/// using a cancellation token.
/// </summary>
class Program
{
    static void Main(string[] args)
    {
        // Block until the demo has finished. Run() returns a Task (rather than being
        // "async void") so the process cannot exit while work is still pending and
        // any exception thrown by the demo surfaces here instead of being lost.
        new Program().Run().GetAwaiter().GetResult();
    }

    /// <summary>
    /// Runs the demo scenario end to end and waits for the user to press Enter.
    /// </summary>
    /// <returns>A task that completes when the demo has finished.</returns>
    private async Task Run()
    {
        var bus = new Bus();

        // Inline handler
        bus.Subscribe<Message>(message => Console.WriteLine("Inline Handler 1: {0}", message.Content));

        // Inline handler factory
        var s2 = bus.Subscribe<Message>(() => new MessageHandler().Handle);

        // Automatic handler subscription
        bus.Subscribe<Message, MessageHandler>();

        for (int i = 0; i < 10; i++)
        {
            await bus.SendAsync(new Message("Message " + i));
        }

        // Unsubscribe the second handler
        bus.Unsubscribe(s2);

        // Pause before the second burst (presumably so the first burst's output is
        // printed first). Task.Delay is used so the async method does not block a thread.
        await Task.Delay(1000);

        // Cancellation support
        Console.WriteLine("\nSecond Burst:");
        using (var tokenSource = new CancellationTokenSource())
        {
            var token = tokenSource.Token;
            for (int i = 0; i < 10; i++)
            {
                await bus.SendAsync(new Message("Message " + i), token);
                if (i == 5)
                {
                    // Cancel after the sixth message and stop sending.
                    tokenSource.Cancel();
                    break;
                }
            }
        }

        Console.ReadLine();
    }
}
/// <summary>
/// A simple text message stamped with its creation time.
/// </summary>
public class Message
{
    /// <summary>
    /// Creates a message with the given content and records the current UTC time.
    /// </summary>
    /// <param name="content">The text carried by the message.</param>
    public Message(string content)
    {
        this.TimeStamp = DateTime.UtcNow;
        this.Content = content;
    }

    /// <summary>The text supplied to the constructor.</summary>
    public string Content { get; private set; }

    /// <summary>The UTC time at which the message was constructed.</summary>
    public DateTime TimeStamp { get; private set; }
}
/// <summary>
/// Contract for a type that can handle messages of type <typeparamref name="TMessage"/>.
/// </summary>
/// <typeparam name="TMessage">The type of message the handler accepts.</typeparam>
public interface IHandle<TMessage>
{
    /// <summary>
    /// Handles a single message.
    /// </summary>
    /// <param name="message">The message to handle.</param>
    void Handle(TMessage message);
}
/// <summary>
/// Handler for <see cref="Message"/> that prints each message's content to the console.
/// </summary>
public class MessageHandler : IHandle<Message>
{
    /// <summary>
    /// Writes the content of <paramref name="message"/> to standard output.
    /// </summary>
    /// <param name="message">The message whose content is printed.</param>
    public void Handle(Message message)
    {
        string content = message.Content;
        Console.WriteLine("Message Handler Received: {0}", content);
    }
}
/// <summary>
/// An in-process message bus built on a single <see cref="BroadcastBlock{T}"/>.
/// Each subscription is an <see cref="ActionBlock{T}"/> linked to the broadcast
/// block with a filter on the message type.
/// </summary>
public class Bus
{
    // Every message is posted to this block. The cloning function returns the same
    // instance, so all subscribers are handed the same object reference.
    private readonly BroadcastBlock<object> broadcast =
        new BroadcastBlock<object>(message => message);

    // Link handles keyed by subscription id; disposing a handle unlinks its handler.
    private readonly ConcurrentDictionary<Guid, IDisposable> subscriptions
        = new ConcurrentDictionary<Guid, IDisposable>();

    /// <summary>
    /// Offers a message to the bus.
    /// </summary>
    /// <typeparam name="TMessage">The type of the message being sent.</typeparam>
    /// <param name="message">The message to publish.</param>
    /// <param name="cancellationToken">Token passed through to the underlying send.</param>
    /// <returns>The task returned by the broadcast block's <c>SendAsync</c>.</returns>
    public Task<bool> SendAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
    {
        return broadcast.SendAsync(message, cancellationToken);
    }

    /// <summary>
    /// Registers a handler that is invoked for every message assignable to
    /// <typeparamref name="TMessage"/>.
    /// </summary>
    /// <typeparam name="TMessage">The message type to subscribe to.</typeparam>
    /// <param name="handlerAction">
    /// The callback to run for each matching message. The handler block is configured
    /// with a maximum degree of parallelism of 2, so the callback may run concurrently
    /// with itself.
    /// </param>
    /// <returns>An id that can be passed to <see cref="Unsubscribe"/>.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="handlerAction"/> is <c>null</c>.
    /// </exception>
    public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
    {
        // Fail fast: without this check a null handler would only blow up later,
        // inside the ActionBlock, when the first matching message arrives.
        if (handlerAction == null)
        {
            throw new ArgumentNullException("handlerAction");
        }

        var handler = new ActionBlock<object>(
            message => handlerAction((TMessage)message),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 }
        );

        // The predicate ensures only messages of the subscribed type reach the
        // handler, which makes the cast above safe.
        var subscription = broadcast.LinkTo(
            handler,
            new DataflowLinkOptions { PropagateCompletion = true },
            message => message is TMessage
        );

        return AddSubscription(subscription);
    }

    /// <summary>
    /// Removes a subscription and unlinks its handler. Unknown ids are ignored.
    /// </summary>
    /// <param name="subscriptionId">The id returned by <c>Subscribe</c>.</param>
    public void Unsubscribe(Guid subscriptionId)
    {
        IDisposable subscription;
        if (subscriptions.TryRemove(subscriptionId, out subscription))
        {
            subscription.Dispose();
        }
    }

    // Stores the link handle under a freshly generated id and returns that id.
    private Guid AddSubscription(IDisposable subscription)
    {
        var subscriptionId = Guid.NewGuid();
        subscriptions.TryAdd(subscriptionId, subscription);
        return subscriptionId;
    }
}
/// <summary>
/// Convenience overloads for <see cref="Bus"/>.
/// </summary>
public static class BusExtensions
{
    /// <summary>
    /// Sends a message without cancellation support.
    /// </summary>
    /// <typeparam name="TMessage">The type of the message being sent.</typeparam>
    /// <param name="bus">The bus to send on.</param>
    /// <param name="message">The message to publish.</param>
    /// <returns>The task returned by the bus's <c>SendAsync</c>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="bus"/> is <c>null</c>.</exception>
    public static Task<bool> SendAsync<TMessage>(this Bus bus, TMessage message)
    {
        if (bus == null)
        {
            throw new ArgumentNullException("bus");
        }

        return bus.SendAsync<TMessage>(message, CancellationToken.None);
    }

    /// <summary>
    /// Subscribes using a factory that supplies the handler delegate. The factory is
    /// invoked once for every message delivered to the subscription.
    /// </summary>
    /// <typeparam name="TMessage">The message type to subscribe to.</typeparam>
    /// <param name="bus">The bus to subscribe on.</param>
    /// <param name="handlerActionFactory">Produces the delegate that handles a message.</param>
    /// <returns>The subscription id returned by the bus.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="bus"/> or <paramref name="handlerActionFactory"/> is <c>null</c>.
    /// </exception>
    public static Guid Subscribe<TMessage>(this Bus bus, Func<Action<TMessage>> handlerActionFactory)
    {
        if (bus == null)
        {
            throw new ArgumentNullException("bus");
        }

        // Fail fast: a null factory would otherwise only fail later, inside the
        // handler block, when the first message arrives.
        if (handlerActionFactory == null)
        {
            throw new ArgumentNullException("handlerActionFactory");
        }

        return bus.Subscribe<TMessage>(message => handlerActionFactory().Invoke(message));
    }

    /// <summary>
    /// Subscribes a handler type. A new <typeparamref name="THandler"/> instance is
    /// created for every message delivered to the subscription.
    /// </summary>
    /// <typeparam name="TMessage">The message type to subscribe to.</typeparam>
    /// <typeparam name="THandler">
    /// A handler type with a public parameterless constructor.
    /// </typeparam>
    /// <param name="bus">The bus to subscribe on.</param>
    /// <returns>The subscription id returned by the bus.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="bus"/> is <c>null</c>.</exception>
    public static Guid Subscribe<TMessage, THandler>(this Bus bus) where THandler : IHandle<TMessage>, new()
    {
        if (bus == null)
        {
            throw new ArgumentNullException("bus");
        }

        return bus.Subscribe<TMessage>(message => new THandler().Handle(message));
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment