Skip to content

Instantly share code, notes, and snippets.

@usausa
Last active September 27, 2025 10:47
Show Gist options
  • Save usausa/bd6ecc719bbc2c4e3f09b7469d337294 to your computer and use it in GitHub Desktop.
Save usausa/bd6ecc719bbc2c4e3f09b7469d337294 to your computer and use it in GitHub Desktop.
SignalR reactive extension
public static class ReactiveSignalR
{
public static IObservable<T> CreateObservable<T>(string uri, string methodName, TimeSpan retryInterval)
{
return Observable.Create<T>(observer =>
{
var connection = new HubConnectionBuilder()
.WithUrl(uri)
.WithAutomaticReconnect(new FixedIntervalRetryPolicy(retryInterval))
.Build();
connection.On<T>(methodName, observer.OnNext);
var disposable = new CompositeDisposable();
var cts = new CancellationTokenSource();
disposable.Add(Disposable.Create(() => cts.Cancel()));
disposable.Add(Observable.FromAsync(async () => await TryConnectWithRetryAsync(connection, retryInterval, cts.Token)).Subscribe());
disposable.Add(Disposable.Create(async () =>
{
if (connection.State == HubConnectionState.Connected)
{
await connection.StopAsync();
}
await connection.DisposeAsync();
}));
return disposable;
});
}
private static async Task<bool> TryConnectWithRetryAsync(HubConnection connection, TimeSpan retryInterval, CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
await connection.StartAsync(cancellationToken);
return true;
}
catch (OperationCanceledException)
{
return false;
}
catch
{
if (cancellationToken.IsCancellationRequested)
{
return false;
}
try
{
await Task.Delay(retryInterval, cancellationToken);
}
catch (OperationCanceledException)
{
return false;
}
}
}
return false;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment