Skip to content

Instantly share code, notes, and snippets.

@AlgorithmsAreCool
Created March 28, 2025 02:00
Show Gist options
  • Save AlgorithmsAreCool/21eb859ef4a5c24f2ab3044d96a95010 to your computer and use it in GitHub Desktop.
Save AlgorithmsAreCool/21eb859ef4a5c24f2ab3044d96a95010 to your computer and use it in GitHub Desktop.
This is a sketch of a pub/sub message broker that coordinates communication between one producer (publisher) and zero or more consumers (subscribers).
using System.Threading.Channels;
using CommunityToolkit.Diagnostics;
using Orleans.Concurrency;
namespace Orleans.Grains.Utility;
/// <summary>
/// Pub/sub broker grain: a publisher broadcasts messages and every subscriber receives them
/// through its own bounded, in-memory channel.
/// </summary>
/// <remarks>
/// Buffered messages and the completion flag are only written to storage in
/// <see cref="OnDeactivateAsync"/> and restored in <see cref="OnActivateAsync"/>.
/// </remarks>
/// <typeparam name="T">The message type carried by the channel.</typeparam>
public sealed class MessageChannelGrain<T> : Grain, IMessageChannelGrain<T>
{
    // NOTE(review): Orleans usually resolves IPersistentState<T> constructor parameters through a
    // [PersistentState(...)] attribute; none is present here — confirm how this is registered.
    public MessageChannelGrain(IPersistentState<SerializableState> persistentState)
    {
        _persistentState = persistentState;
    }

    private readonly IPersistentState<SerializableState> _persistentState;

    /// <summary>Live subscriptions, keyed by the handle returned from <see cref="SubscribeAsync"/>.</summary>
    private Dictionary<SubscriptionHandle, SubscriptionQueue> SubscriptionQueues { get; } = [];

    /// <summary>True once <see cref="CompleteAsync"/> has been called (or was restored from state).</summary>
    private bool IsCompleted { get; set; }

    /// <summary>
    /// Registers a new subscriber and creates its bounded channel.
    /// </summary>
    /// <param name="options">Capacity and full-mode behaviour of the subscriber's channel.</param>
    /// <returns>The handle identifying the new subscription.</returns>
    /// <exception cref="InvalidOperationException">The grain has already been completed.</exception>
    public ValueTask<SubscriptionHandle> SubscribeAsync(SubscriptionOptions options)
    {
        if (IsCompleted)
        {
            throw new InvalidOperationException("Grain is completed");
        }

        var channelOptions = CreateChannelOptions(options.MaxChannelSize, options.ChannelFullMode);
        var handle = new SubscriptionHandle(this.GetGrainId(), Guid.NewGuid());

        SubscriptionQueues.Add(handle, new SubscriptionQueue
        {
            Handle = handle,
            Channel = Channel.CreateBounded<T>(channelOptions),
            Options = channelOptions
        });

        return ValueTask.FromResult(handle);
    }

    /// <summary>
    /// Rebuilds the subscriber channels, their buffered messages and the completion flag from
    /// persisted state.
    /// </summary>
    public override Task OnActivateAsync(CancellationToken cancellationToken)
    {
        var state = _persistentState.State;
        if (state is null)
        {
            return Task.CompletedTask;
        }

        IsCompleted = state.IsCompleted;

        // A state object that was never written may not have its channel list populated.
        if (state.Channels is not null)
        {
            foreach (var savedChannel in state.Channels)
            {
                RestoreSubscription(savedChannel, state.IsCompleted);
            }
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Drains every subscriber channel and persists the buffered messages together with the
    /// completion flag.
    /// </summary>
    public override Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken)
    {
        var serializedChannels = new List<SerializedChannel>(SubscriptionQueues.Count);

        foreach (var (handle, subscription) in SubscriptionQueues)
        {
            // Move whatever is still buffered into the persisted queue, preserving order.
            var pending = new Queue<T>();
            while (subscription.Channel.Reader.TryRead(out var message))
            {
                pending.Enqueue(message);
            }

            serializedChannels.Add(new SerializedChannel
            {
                MaxChannelSize = subscription.Options.Capacity,
                ChannelFullMode = subscription.Options.FullMode,
                Handle = handle,
                Values = pending
            });
        }

        _persistentState.State = new SerializableState
        {
            IsCompleted = IsCompleted,
            Channels = serializedChannels
        };

        return _persistentState.WriteStateAsync();
    }

    /// <summary>Returns the number of currently registered subscriptions.</summary>
    public ValueTask<int> SubscriberCount() => ValueTask.FromResult(SubscriptionQueues.Count);

    /// <summary>
    /// Removes a subscription and completes its channel so that a reader or a pending broadcast
    /// write on that channel is released instead of waiting forever.
    /// </summary>
    /// <param name="subscriptionHandle">Handle of the subscription to remove.</param>
    /// <exception cref="ArgumentException">No subscription with that handle exists.</exception>
    public ValueTask UnsubscribeAsync(SubscriptionHandle subscriptionHandle)
    {
        Guard.IsTrue(SubscriptionQueues.Remove(subscriptionHandle, out var subscription));
        subscription.Channel.Writer.TryComplete();
        return ValueTask.CompletedTask;
    }

    /// <summary>
    /// Waits for and returns the next message of a subscription.
    /// </summary>
    /// <param name="handle">Handle of the subscription to read from.</param>
    /// <param name="cancellationToken">Cancels the wait.</param>
    /// <exception cref="InvalidOperationException">The subscription does not exist.</exception>
    /// <exception cref="ArgumentException">Another read is already active on this subscription.</exception>
    public async ValueTask<T> ReadNextEventAsync(SubscriptionHandle handle, GrainCancellationToken cancellationToken)
    {
        var subscription = GetSubscription(handle);
        Guard.IsFalse(subscription.HasActiveListener);

        try
        {
            subscription.HasActiveListener = true;
            return await subscription.Channel.Reader.ReadAsync(cancellationToken.CancellationToken);
        }
        finally
        {
            subscription.HasActiveListener = false;
        }
    }

    /// <summary>
    /// Streams the messages of a subscription until its channel completes or the token is cancelled.
    /// </summary>
    /// <param name="handle">Handle of the subscription to read from.</param>
    /// <param name="cancellationToken">Cancels the enumeration.</param>
    /// <exception cref="InvalidOperationException">The subscription does not exist.</exception>
    /// <exception cref="ArgumentException">Another read is already active on this subscription.</exception>
    public async IAsyncEnumerable<T> ReadManyEventsAsync(SubscriptionHandle handle, GrainCancellationToken cancellationToken)
    {
        var subscription = GetSubscription(handle);
        Guard.IsFalse(subscription.HasActiveListener);

        try
        {
            subscription.HasActiveListener = true;
            await foreach (var message in subscription.Channel.Reader.ReadAllAsync(cancellationToken.CancellationToken))
            {
                yield return message;
            }
        }
        finally
        {
            subscription.HasActiveListener = false;
        }
    }

    /// <summary>
    /// Writes <paramref name="message"/> to the channel of every current subscriber.
    /// </summary>
    /// <param name="message">The message to deliver.</param>
    public async ValueTask BroadcastToSubscribers(T message)
    {
        // Snapshot first: WriteAsync can suspend on a full channel, and interleaved
        // subscribe/unsubscribe calls would otherwise invalidate the dictionary enumerator.
        SubscriptionQueue[] snapshot = [.. SubscriptionQueues.Values];

        foreach (var subscriber in snapshot)
        {
            try
            {
                await subscriber.Channel.Writer.WriteAsync(message);
            }
            catch (ChannelClosedException) when (!SubscriptionQueues.ContainsKey(subscriber.Handle))
            {
                // The subscriber unsubscribed while this broadcast was in flight; skip it and
                // keep delivering to the rest. A closed channel that is still registered
                // (grain completed) continues to surface the exception as before.
            }
        }
    }

    /// <summary>
    /// Marks the grain as completed and completes every subscriber channel. Safe to call more than once.
    /// </summary>
    public ValueTask CompleteAsync()
    {
        IsCompleted = true;

        foreach (var subscriber in SubscriptionQueues.Values)
        {
            // TryComplete rather than Complete: the latter throws if the channel is already closed.
            subscriber.Channel.Writer.TryComplete();
        }

        return ValueTask.CompletedTask;
    }

    /// <summary>Builds the channel options shared by new and restored subscriptions.</summary>
    private static BoundedChannelOptions CreateChannelOptions(int capacity, BoundedChannelFullMode fullMode) =>
        new(capacity)
        {
            SingleReader = false,
            SingleWriter = true,
            AllowSynchronousContinuations = false,
            FullMode = fullMode
        };

    /// <summary>Looks up a subscription or throws if the handle is unknown.</summary>
    private SubscriptionQueue GetSubscription(SubscriptionHandle handle)
    {
        if (!SubscriptionQueues.TryGetValue(handle, out var subscription))
        {
            throw new InvalidOperationException("Subscription not found");
        }

        return subscription;
    }

    /// <summary>Recreates one subscription, with its buffered messages, from persisted state.</summary>
    private void RestoreSubscription(SerializedChannel savedChannel, bool completed)
    {
        var options = CreateChannelOptions(savedChannel.MaxChannelSize, savedChannel.ChannelFullMode);
        var channel = Channel.CreateBounded<T>(options);

        if (savedChannel.Values is not null)
        {
            foreach (var message in savedChannel.Values)
            {
                _ = channel.Writer.TryWrite(message);
            }
        }

        if (completed)
        {
            // Readers may still drain the restored messages, then observe completion.
            channel.Writer.TryComplete();
        }

        SubscriptionQueues.Add(savedChannel.Handle, new SubscriptionQueue
        {
            Channel = channel,
            Handle = savedChannel.Handle,
            Options = options
        });
    }

    /// <summary>Persisted snapshot of the grain: all subscriber channels plus the completion flag.</summary>
    [GenerateSerializer]
    public sealed class SerializableState
    {
        [Id(0)]
        public required List<SerializedChannel> Channels { get; init; }

        [Id(1)]
        public required bool IsCompleted { get; set; }
    }

    /// <summary>Persisted form of one subscriber channel: its configuration and buffered messages.</summary>
    [GenerateSerializer]
    public sealed class SerializedChannel
    {
        [Id(0)]
        public required int MaxChannelSize { get; init; }

        [Id(1)]
        public required BoundedChannelFullMode ChannelFullMode { get; init; }

        [Id(2)]
        public required SubscriptionHandle Handle { get; init; }

        [Id(3)]
        public required Queue<T> Values { get; init; }
    }

    /// <summary>In-memory state of one subscription.</summary>
    private sealed class SubscriptionQueue
    {
        public required SubscriptionHandle Handle { get; init; }

        public required Channel<T> Channel { get; init; }

        public required BoundedChannelOptions Options { get; init; }

        /// <summary>Set while a read call is running, to reject a second concurrent reader.</summary>
        public bool HasActiveListener { get; set; }
    }
}
/// <summary>
/// Identifies a single subscription on a message channel grain.
/// </summary>
/// <param name="Grain">Id of the grain that issued the subscription.</param>
/// <param name="Handle">Unique id of the subscription.</param>
[GenerateSerializer]
public readonly record struct SubscriptionHandle(
    GrainId Grain,
    Guid Handle);
/// <summary>
/// Options supplied by a subscriber to configure its bounded channel.
/// </summary>
[GenerateSerializer]
public sealed record SubscriptionOptions
{
    // Body-declared members need an explicit [Id] to be included by the Orleans serializer;
    // without it the values would not survive the call to the grain.

    /// <summary>Capacity passed to the subscriber's bounded channel.</summary>
    [Id(0)]
    public required int MaxChannelSize { get; init; }

    /// <summary>Behaviour of the subscriber's channel when it is full.</summary>
    [Id(1)]
    public required BoundedChannelFullMode ChannelFullMode { get; init; }
}
/// <summary>
/// Grain contract for a pub/sub message channel. Every method is marked
/// <c>[AlwaysInterleave]</c>; the two read methods also carry a one-hour response timeout.
/// </summary>
/// <typeparam name="T">The message type carried by the channel.</typeparam>
public interface IMessageChannelGrain<T> : IGrainWithStringKey
{
    /// <summary>Creates a subscription and returns the handle that identifies it.</summary>
    [AlwaysInterleave]
    ValueTask<SubscriptionHandle> SubscribeAsync(SubscriptionOptions subscriptionOptions);

    /// <summary>Removes the subscription identified by <paramref name="subscriptionHandle"/>.</summary>
    [AlwaysInterleave]
    ValueTask UnsubscribeAsync(SubscriptionHandle subscriptionHandle);

    /// <summary>Delivers <paramref name="message"/> to every current subscriber.</summary>
    [AlwaysInterleave]
    ValueTask BroadcastToSubscribers(T message);

    /// <summary>Marks the channel as completed.</summary>
    [AlwaysInterleave]
    ValueTask CompleteAsync();

    /// <summary>Returns the number of current subscriptions.</summary>
    [AlwaysInterleave]
    ValueTask<int> SubscriberCount();

    /// <summary>Streams the messages of the subscription identified by <paramref name="handle"/>.</summary>
    [AlwaysInterleave]
    [ResponseTimeout("01:00:00")]
    IAsyncEnumerable<T> ReadManyEventsAsync(SubscriptionHandle handle, GrainCancellationToken cancellationToken);

    /// <summary>Returns the next message of the subscription identified by <paramref name="handle"/>.</summary>
    [AlwaysInterleave]
    [ResponseTimeout("01:00:00")]
    ValueTask<T> ReadNextEventAsync(SubscriptionHandle handle, GrainCancellationToken cancellationToken);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment