Created
March 28, 2025 02:00
-
-
Save AlgorithmsAreCool/21eb859ef4a5c24f2ab3044d96a95010 to your computer and use it in GitHub Desktop.
This is a sketch of a pub/sub message broker that coordinates communication between a producer/publisher and zero or more consumers/subscribers.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Threading.Channels; | |
using CommunityToolkit.Diagnostics; | |
using Orleans.Concurrency; | |
namespace Orleans.Grains.Utility; | |
/// <summary>
/// Grain that fans every broadcast message out to one bounded channel per subscriber.
/// Subscribers pull messages from their own channel via <see cref="ReadNextEventAsync"/>
/// or <see cref="ReadManyEventsAsync"/>. Buffered messages are written to persistent state
/// on deactivation and restored on activation.
/// </summary>
/// <typeparam name="T">The message type carried by the channels.</typeparam>
public sealed class MessageChannelGrain<T> : Grain, IMessageChannelGrain<T>
{
    private readonly IPersistentState<SerializableState> _persistentState;

    /// <summary>
    /// Creates the grain around the supplied persistent state facet.
    /// </summary>
    /// <param name="persistentState">Storage used to save and restore subscriptions and their buffered messages.</param>
    // NOTE(review): Orleans usually needs a [PersistentState("name", "store")] attribute on this
    // parameter to resolve the facet; none is visible here — confirm how it is registered.
    public MessageChannelGrain(IPersistentState<SerializableState> persistentState)
    {
        _persistentState = persistentState;
    }

    /// <summary>Live subscriptions keyed by their handle.</summary>
    private Dictionary<SubscriptionHandle, SubscriptionQueue> SubscriptionQueues { get; } = [];

    /// <summary>True once <see cref="CompleteAsync"/> has run (or a completed state was restored).</summary>
    private bool IsCompleted { get; set; }

    /// <summary>
    /// Registers a new subscriber and creates its bounded channel.
    /// </summary>
    /// <param name="options">Capacity and full-mode for the subscriber's channel.</param>
    /// <returns>The handle identifying the new subscription.</returns>
    /// <exception cref="InvalidOperationException">The grain has already been completed.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
    public ValueTask<SubscriptionHandle> SubscribeAsync(SubscriptionOptions options)
    {
        if (IsCompleted)
        {
            throw new InvalidOperationException("Grain is completed");
        }

        ArgumentNullException.ThrowIfNull(options);

        var handle = new SubscriptionHandle(this.GetGrainId(), Guid.NewGuid());
        var subscription = CreateSubscription(handle, options.MaxChannelSize, options.ChannelFullMode);

        SubscriptionQueues.Add(handle, subscription);
        return ValueTask.FromResult(handle);
    }

    /// <summary>
    /// Rebuilds subscriptions and their buffered messages from persistent state.
    /// </summary>
    /// <param name="cancellationToken">Not used by this implementation.</param>
    public override Task OnActivateAsync(CancellationToken cancellationToken)
    {
        var state = _persistentState.State;
        if (state is null)
        {
            return Task.CompletedTask;
        }

        // NOTE(review): the storage provider may hand back a default-constructed state in which
        // `required` members were never set, so Channels/Values are null-checked defensively.
        if (state.Channels is not null)
        {
            foreach (var saved in state.Channels)
            {
                var subscription = CreateSubscription(saved.Handle, saved.MaxChannelSize, saved.ChannelFullMode);
                SubscriptionQueues.Add(saved.Handle, subscription);

                if (saved.Values is not null)
                {
                    foreach (var message in saved.Values)
                    {
                        _ = subscription.Channel.Writer.TryWrite(message);
                    }
                }

                // A grain that was completed before deactivation must come back with completed
                // channels; otherwise readers would wait forever once the backlog is drained.
                if (state.IsCompleted)
                {
                    subscription.Channel.Writer.TryComplete();
                }
            }
        }

        IsCompleted = state.IsCompleted;
        return Task.CompletedTask;
    }

    /// <summary>
    /// Drains every subscriber channel into persistent state and writes it to storage.
    /// </summary>
    /// <param name="reason">Not used by this implementation.</param>
    /// <param name="cancellationToken">Not used by this implementation.</param>
    /// <returns>The task returned by the state write.</returns>
    public override Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken)
    {
        var serializedChannels = new List<SerializedChannel>(SubscriptionQueues.Count);

        foreach (var (handle, subscription) in SubscriptionQueues)
        {
            // Move the unread backlog into the queue that is actually persisted.
            var pending = new Queue<T>();
            while (subscription.Channel.Reader.TryRead(out var message))
            {
                pending.Enqueue(message);
            }

            serializedChannels.Add(new SerializedChannel
            {
                MaxChannelSize = subscription.Options.Capacity,
                ChannelFullMode = subscription.Options.FullMode,
                Handle = handle,
                Values = pending
            });
        }

        _persistentState.State = new SerializableState
        {
            IsCompleted = IsCompleted,
            Channels = serializedChannels
        };

        return _persistentState.WriteStateAsync();
    }

    /// <summary>Returns the number of currently registered subscriptions.</summary>
    public ValueTask<int> SubscriberCount() => ValueTask.FromResult(SubscriptionQueues.Count);

    /// <summary>
    /// Removes a subscription and completes its channel.
    /// </summary>
    /// <param name="subscriptionHandle">Handle returned by <see cref="SubscribeAsync"/>.</param>
    /// <exception cref="ArgumentException">Thrown by the guard when the handle is not registered.</exception>
    public ValueTask UnsubscribeAsync(SubscriptionHandle subscriptionHandle)
    {
        SubscriptionQueues.TryGetValue(subscriptionHandle, out var subscription);
        Guard.IsTrue(SubscriptionQueues.Remove(subscriptionHandle));

        // Completing the writer releases a reader that is still awaiting this channel and any
        // broadcast blocked on it; without this they would wait on a channel nobody can reach.
        subscription?.Channel.Writer.TryComplete();
        return ValueTask.CompletedTask;
    }

    /// <summary>
    /// Waits for and returns the next message of a subscription.
    /// </summary>
    /// <param name="handle">Handle of the subscription to read from.</param>
    /// <param name="cancellationToken">Token whose inner token cancels the pending read.</param>
    /// <returns>The next message read from the subscription's channel.</returns>
    /// <exception cref="InvalidOperationException">The subscription does not exist.</exception>
    /// <exception cref="ArgumentException">Thrown by the guard when the subscription already has an active listener.</exception>
    public async ValueTask<T> ReadNextEventAsync(SubscriptionHandle handle, GrainCancellationToken cancellationToken)
    {
        var subscription = GetSubscription(handle);
        Guard.IsFalse(subscription.HasActiveListener);

        try
        {
            subscription.HasActiveListener = true;
            return await subscription.Channel.Reader.ReadAsync(cancellationToken.CancellationToken);
        }
        finally
        {
            subscription.HasActiveListener = false;
        }
    }

    /// <summary>
    /// Streams messages of a subscription until its channel completes or the token is cancelled.
    /// </summary>
    /// <param name="handle">Handle of the subscription to read from.</param>
    /// <param name="cancellationToken">Token whose inner token cancels the enumeration.</param>
    /// <returns>An asynchronous sequence of the subscription's messages.</returns>
    /// <exception cref="InvalidOperationException">The subscription does not exist.</exception>
    /// <exception cref="ArgumentException">Thrown by the guard when the subscription already has an active listener.</exception>
    public async IAsyncEnumerable<T> ReadManyEventsAsync(SubscriptionHandle handle, GrainCancellationToken cancellationToken)
    {
        var subscription = GetSubscription(handle);
        Guard.IsFalse(subscription.HasActiveListener);

        try
        {
            subscription.HasActiveListener = true;
            await foreach (var message in subscription.Channel.Reader.ReadAllAsync(cancellationToken.CancellationToken))
            {
                yield return message;
            }
        }
        finally
        {
            subscription.HasActiveListener = false;
        }
    }

    /// <summary>
    /// Writes a message to the channel of every current subscriber, one after another.
    /// </summary>
    /// <param name="message">The message to deliver.</param>
    public async ValueTask BroadcastToSubscribers(T message)
    {
        // Iterate over a snapshot: this method awaits inside the loop and the grain interface marks
        // its methods [AlwaysInterleave], so Subscribe/Unsubscribe may mutate the dictionary meanwhile.
        SubscriptionQueue[] snapshot = [.. SubscriptionQueues.Values];

        foreach (var subscriber in snapshot)
        {
            try
            {
                await subscriber.Channel.Writer.WriteAsync(message);
            }
            catch (ChannelClosedException) when (!SubscriptionQueues.ContainsKey(subscriber.Handle))
            {
                // The subscriber unsubscribed while this broadcast was in flight; nothing to deliver.
            }
        }
    }

    /// <summary>
    /// Marks the grain as completed and completes every subscriber channel.
    /// Calling it more than once is harmless.
    /// </summary>
    public ValueTask CompleteAsync()
    {
        IsCompleted = true;

        foreach (var subscriber in SubscriptionQueues.Values)
        {
            // TryComplete instead of Complete: Complete throws when the channel is already completed.
            subscriber.Channel.Writer.TryComplete();
        }

        return ValueTask.CompletedTask;
    }

    /// <summary>Looks up a subscription or throws when the handle is unknown.</summary>
    private SubscriptionQueue GetSubscription(SubscriptionHandle handle)
    {
        if (!SubscriptionQueues.TryGetValue(handle, out var subscription))
        {
            throw new InvalidOperationException("Subscription not found");
        }

        return subscription;
    }

    /// <summary>Builds a subscription with a fresh bounded channel of the given capacity and full-mode.</summary>
    private static SubscriptionQueue CreateSubscription(SubscriptionHandle handle, int capacity, BoundedChannelFullMode fullMode)
    {
        var channelOptions = new BoundedChannelOptions(capacity)
        {
            SingleReader = false,
            // Interleaved broadcasts may have more than one write pending on the same channel,
            // so the single-writer guarantee cannot be promised.
            SingleWriter = false,
            AllowSynchronousContinuations = false,
            FullMode = fullMode
        };
        var channel = System.Threading.Channels.Channel.CreateBounded<T>(channelOptions);

        return new SubscriptionQueue
        {
            Handle = handle,
            Channel = channel,
            Options = channelOptions
        };
    }

    /// <summary>Persisted snapshot of the grain: its subscriptions and completion flag.</summary>
    [GenerateSerializer]
    public sealed class SerializableState
    {
        /// <summary>One entry per subscription that existed at deactivation.</summary>
        [Id(0)]
        public required List<SerializedChannel> Channels { get; init; }

        /// <summary>Whether the grain had been completed at deactivation.</summary>
        [Id(1)]
        public required bool IsCompleted { get; set; }
    }

    /// <summary>Persisted form of a single subscription and its unread messages.</summary>
    [GenerateSerializer]
    public sealed class SerializedChannel
    {
        /// <summary>Capacity used to recreate the bounded channel.</summary>
        [Id(0)]
        public required int MaxChannelSize { get; init; }

        /// <summary>Full-mode used to recreate the bounded channel.</summary>
        [Id(1)]
        public required BoundedChannelFullMode ChannelFullMode { get; init; }

        /// <summary>Handle identifying the subscription.</summary>
        [Id(2)]
        public required SubscriptionHandle Handle { get; init; }

        /// <summary>Messages that were buffered but not yet read, in read order.</summary>
        [Id(3)]
        public required Queue<T> Values { get; init; }
    }

    /// <summary>In-memory state of one subscription.</summary>
    private sealed class SubscriptionQueue
    {
        public required SubscriptionHandle Handle { get; init; }

        public required Channel<T> Channel { get; init; }

        public required BoundedChannelOptions Options { get; init; }

        /// <summary>Set while a read call is in progress for this subscription.</summary>
        public bool HasActiveListener { get; set; }
    }
}
/// <summary>
/// Identifies one subscription: the id of the grain that issued it plus a per-subscription GUID.
/// </summary>
/// <param name="Grain">Id of the grain that created the subscription.</param>
/// <param name="Handle">Unique value distinguishing this subscription from others on the same grain.</param>
[GenerateSerializer]
public readonly record struct SubscriptionHandle(
    GrainId Grain,
    Guid Handle);
/// <summary>
/// Options a caller supplies when subscribing; they configure the subscriber's bounded channel.
/// </summary>
[GenerateSerializer]
public sealed record SubscriptionOptions
{
    /// <summary>Capacity of the subscriber's bounded channel.</summary>
    // [Id] is required for the Orleans serializer to include this member when the options
    // cross the grain boundary; without it the receiver would see the default value.
    [Id(0)]
    public required int MaxChannelSize { get; init; }

    /// <summary>Behavior of the subscriber's channel when a write arrives while it is full.</summary>
    [Id(1)]
    public required BoundedChannelFullMode ChannelFullMode { get; init; }
}
/// <summary>
/// Grain contract for a message channel with one broadcaster and any number of subscribers.
/// Every method is marked <c>[AlwaysInterleave]</c>; the two read methods additionally carry a
/// one-hour response timeout.
/// </summary>
/// <typeparam name="T">The message type.</typeparam>
public interface IMessageChannelGrain<T> : IGrainWithStringKey
{
    /// <summary>Creates a subscription configured by <paramref name="subscriptionOptions"/> and returns its handle.</summary>
    [AlwaysInterleave]
    ValueTask<SubscriptionHandle> SubscribeAsync(SubscriptionOptions subscriptionOptions);

    /// <summary>Removes the subscription identified by <paramref name="subscriptionHandle"/>.</summary>
    [AlwaysInterleave]
    ValueTask UnsubscribeAsync(SubscriptionHandle subscriptionHandle);

    /// <summary>Delivers <paramref name="message"/> to every current subscriber.</summary>
    [AlwaysInterleave]
    ValueTask BroadcastToSubscribers(T message);

    /// <summary>Marks the channel as completed.</summary>
    [AlwaysInterleave]
    ValueTask CompleteAsync();

    /// <summary>Returns the number of current subscriptions.</summary>
    [AlwaysInterleave]
    ValueTask<int> SubscriberCount();

    /// <summary>Streams the messages of the subscription identified by <paramref name="handle"/>.</summary>
    [AlwaysInterleave, ResponseTimeout("01:00:00")]
    IAsyncEnumerable<T> ReadManyEventsAsync(SubscriptionHandle handle, GrainCancellationToken cancellationToken);

    /// <summary>Returns the next message of the subscription identified by <paramref name="handle"/>.</summary>
    [AlwaysInterleave, ResponseTimeout("01:00:00")]
    ValueTask<T> ReadNextEventAsync(SubscriptionHandle handle, GrainCancellationToken cancellationToken);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment