Skip to content

Instantly share code, notes, and snippets.

@dealproc
Last active August 29, 2015 14:04
Show Gist options
  • Save dealproc/1283c130f1923795c7c1 to your computer and use it in GitHub Desktop.
Save dealproc/1283c130f1923795c7c1 to your computer and use it in GitHub Desktop.
Healing SignalR Connection
namespace {YourNamespace}.ServiceBus.Tasks {
using System;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;
/// <summary>
/// Common base for synchronization tasks: supplies a logger and a factory for an
/// <see cref="HttpClient"/> that targets the configured Connect server.
/// </summary>
public abstract class BaseSyncTask : ISyncTask {
    protected NLog.Logger Log = NLog.LogManager.GetCurrentClassLogger();

    /// <summary>Ordering key for this task; defined by each concrete task.</summary>
    public abstract int Priority { get; }

    /// <summary>Performs the synchronization for the given configuration and access token.</summary>
    public abstract Task Synchronize(StoreConfiguration config, string accessToken);

    /// <summary>
    /// Creates a new <see cref="HttpClient"/> whose base address is <c>config.ConnectUrl</c> and
    /// whose default Authorization header is <c>Bearer accessToken</c>.
    /// </summary>
    /// <param name="config">Configuration providing the Connect URL.</param>
    /// <param name="accessToken">Token placed in the Bearer Authorization header.</param>
    /// <returns>A freshly constructed client; nothing in this class disposes it.</returns>
    protected HttpClient GenerateClient(StoreConfiguration config, string accessToken) {
        Log.Debug("Generating client...");

        var httpClient = new HttpClient();
        httpClient.BaseAddress = new Uri(config.ConnectUrl);

        var bearer = new AuthenticationHeaderValue("Bearer", accessToken);
        httpClient.DefaultRequestHeaders.Authorization = bearer;

        return httpClient;
    }
}
}
namespace {YourNamespace}.Host.Proxies {
using Autofac;
using Microsoft.AspNet.SignalR;
using Microsoft.AspNet.SignalR.Client;
using Microsoft.AspNet.SignalR.Hubs;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Thinktecture.IdentityModel.Client;
public class ConnectProxy : INotifyPropertyChanged
, IHandle<ResetHQConnectionEvent>
, IDisposable {
// Lazily built, process-wide proxy; construction is deferred until Instance is first read.
private static readonly Lazy<ConnectProxy> _Instance = new Lazy<ConnectProxy>(CreateInstance);

/// <summary>
/// Builds the singleton by resolving its dependencies from the Autofac container and the
/// SignalR global host, in the same order as the constructor's parameter list.
/// </summary>
private static ConnectProxy CreateInstance() {
    var lifetimeScope = Infrastructure.IoC.AutofacConfig.ContainerInstance.Resolve<ILifetimeScope>();
    var eventAggregator = Infrastructure.IoC.AutofacConfig.ContainerInstance.Resolve<IEventAggregator>();
    var syncTasks = Infrastructure.IoC.AutofacConfig.ContainerInstance.Resolve<IEnumerable<ISyncTask>>();
    var dashboardClients = GlobalHost.ConnectionManager.GetHubContext<Hubs.DashboardHub>().Clients;
    var workstationClients = GlobalHost.ConnectionManager.GetHubContext<Hubs.WorkstationHub>().Clients;
    var configurations = Infrastructure.IoC.AutofacConfig.ContainerInstance.Resolve<IStoreConfigurationRepository>();
    var proxyHandlers = Infrastructure.IoC.AutofacConfig.ContainerInstance.Resolve<IEnumerable<IHQProxyHandler>>();

    return new ConnectProxy(
        lifetimeScope,
        eventAggregator,
        syncTasks,
        dashboardClients,
        workstationClients,
        configurations,
        proxyHandlers);
}

/// <summary>The shared <see cref="ConnectProxy"/> instance.</summary>
public static ConnectProxy Instance {
    get {
        return _Instance.Value;
    }
}
// Dependencies handed in through the constructor.
private ILifetimeScope _LifetimeScope;
private IEventAggregator _EventAggregator;
private IStoreConfigurationRepository _StoreConfigurationRepository;
private IHubConnectionContext _DashboardClients;
private IHubConnectionContext _WorkstationClients;
private IEnumerable<ISyncTask> _SynchronizationTasks;
private IEnumerable<IHQProxyHandler> _HQProxyHandlers;

// State owned by this proxy.
private bool _IsDisposed;                         // set once Dispose has run
private Timer _HealingTimer;                      // one-shot timer that calls HealConnection
private HubConnection _CloudConnection;           // current connection; null while disconnected
private IHubProxy _{Service}HubProxy;             // hub proxy created from _CloudConnection
private string _AccessToken;                      // token obtained from the refresh-token exchange
private StoreConfiguration _ActiveConfiguration;  // configuration loaded by the last Connect()
private NLog.Logger Log = NLog.LogManager.GetCurrentClassLogger();

// Backing fields for the public properties.
private bool _ConnectionEstablished;
private bool _SystemIsAvailable;
/// <summary>
/// Whether the connection is considered established. Every assignment raises
/// <see cref="PropertyChanged"/> for "ConnectionEstablished", even if the value is unchanged.
/// </summary>
public bool ConnectionEstablished {
    get {
        return _ConnectionEstablished;
    }
    set {
        _ConnectionEstablished = value;
        FirePropertyChanged("ConnectionEstablished");
    }
}
/// <summary>
/// Whether the remote system is flagged as available. Every assignment raises
/// <see cref="PropertyChanged"/> for "SystemIsAvailable", even if the value is unchanged.
/// </summary>
public bool SystemIsAvailable {
    get {
        return _SystemIsAvailable;
    }
    set {
        _SystemIsAvailable = value;
        FirePropertyChanged("SystemIsAvailable");
    }
}
/// <summary>
/// Stores the supplied collaborators and subscribes this instance to the event aggregator.
/// </summary>
/// <param name="lifetimeScope">Autofac scope kept for later use.</param>
/// <param name="eventAggregator">Aggregator this proxy subscribes to.</param>
/// <param name="synchronizationTasks">Tasks run, ordered by priority, once a connection is up.</param>
/// <param name="dashboardClients">Hub clients that receive connection-status updates.</param>
/// <param name="workstationClients">Hub clients passed to the proxy handlers.</param>
/// <param name="storeConfigurationRepository">Source of the active store configuration.</param>
/// <param name="hqProxyHandlers">Handlers that register their events on each new hub proxy.</param>
public ConnectProxy(
    ILifetimeScope lifetimeScope,
    IEventAggregator eventAggregator,
    IEnumerable<ISyncTask> synchronizationTasks,
    IHubConnectionContext dashboardClients,
    IHubConnectionContext workstationClients,
    IStoreConfigurationRepository storeConfigurationRepository,
    IEnumerable<IHQProxyHandler> hqProxyHandlers) {

    _LifetimeScope = lifetimeScope;
    _EventAggregator = eventAggregator;
    _SynchronizationTasks = synchronizationTasks;
    _DashboardClients = dashboardClients;
    _WorkstationClients = workstationClients;
    _StoreConfigurationRepository = storeConfigurationRepository;
    _HQProxyHandlers = hqProxyHandlers;

    // Subscribe last, once every field is populated.
    _EventAggregator.Subscribe(this);
}
/// <summary>
/// Establishes the connection to Connect, so the store can receive near realtime updates.
/// </summary>
/// <remarks>
/// Any existing connection is torn down first. The method returns without connecting when no
/// active configuration exists or the active configuration has no refresh token. When token
/// acquisition or connection setup throws, or the connection fails to start, a one-shot healing
/// timer is armed so <see cref="HealConnection"/> retries five seconds later.
/// </remarks>
public void Connect() {
    Disconnect();
    _AccessToken = string.Empty;
    _ActiveConfiguration = _StoreConfigurationRepository.GetFirst(x => x.IsActive);
    if (_ActiveConfiguration == null) {
        ConnectionEstablished = false;
        return;
    }

    try {
        // test to see if we have a refresh token. if we don't, then exit routine.
        if (string.IsNullOrWhiteSpace(_ActiveConfiguration.RefreshToken)) {
            SetConnectionState("notconfigured");
            return;
        }

        // Get an Access Token.
        // NOTE(review): .Result blocks the calling thread until the token endpoint answers;
        // kept because Connect() is a synchronous public method.
        var client = new OAuth2Client(BuildUri(_ActiveConfiguration.AuthorizationUrl), "xxx", "xxx");
        Log.Debug("Client built.");
        var tokenResponse = client.RequestRefreshTokenAsync(_ActiveConfiguration.RefreshToken).Result;
        _AccessToken = tokenResponse.AccessToken;
        // The token value is deliberately not logged: it is a bearer credential.
        Log.Debug("Token received.");

        // build a dictionary to supply the access token to SignalR
        var tokenDictionary = new Dictionary<string, string> {
            { "access_token", _AccessToken }
        };

        // Connect to the configured "connect" server. A local reference is kept because
        // Disconnect() can null the field while the callbacks below are still pending.
        var connection = new HubConnection(_ActiveConfiguration.ConnectUrl, tokenDictionary);
        _CloudConnection = connection;

        connection.StateChanged += OnCloudConnectionStateChanged;
        connection.ConnectionSlow += () => {
            //TODO: Update the web ui with a flag stating the connection is slow.
            Log.Debug("SignalR is reporting a slow connection.");
            SetConnectionSlow();
        };

        var hubProxy = connection.CreateHubProxy("{YourHubName}");
        _{Service}HubProxy = hubProxy;

        // build up actions from Connect commands.
        foreach (var handler in _HQProxyHandlers) {
            handler.RegisterEvents(_WorkstationClients, _DashboardClients, hubProxy);
        }

        connection.Start().ContinueWith(connectTask => {
            Log.Debug("HubConnection has started... current state is: " + connection.State.ToString());
            switch (connectTask.Status) {
                case TaskStatus.Canceled:
                    ConnectionEstablished = false;
                    Log.Info("User cancelled connection... this should not happen, should it?");
                    break;
                case TaskStatus.Faulted:
                    ConnectionEstablished = false;
                    //TODO: Implement a method to get the Web UI to display this failed.
                    ScheduleHealing();
                    Log.Debug("Could not connect... Unauthorized maybe?");
                    // Reading Exception marks the fault as observed and records the cause.
                    Log.Debug(connectTask.Exception);
                    break;
            }
        });
    } catch (Exception ex) {
        Log.Debug(ex);
        Log.Debug(ex.Message);
        ConnectionEstablished = false;
        // Without this, a failure while fetching the token or building the connection
        // would leave the proxy disconnected until an external reset event arrived.
        ScheduleHealing();
    }
}

// Reacts to SignalR state transitions: pushes the state to the dashboards and, on the first
// Connected transition, runs the synchronization tasks in priority order.
void OnCloudConnectionStateChanged(StateChange chg) {
    //TODO: Update the web ui with the connection status.
    Log.Debug("State Changed to: " + chg.NewState.ToString());
    SetConnectionState();
    switch (chg.NewState) {
        case ConnectionState.Connected:
            if (!ConnectionEstablished) {
                try {
                    foreach (var task in _SynchronizationTasks.OrderBy(x => x.Priority)) {
                        task.Synchronize(_ActiveConfiguration, _AccessToken).Wait();
                    }
                    ConnectionEstablished = true;
                } catch (HQSyncUnavailableException ex) {
                    Log.Info(ex);
                } catch (Exception ex) {
                    Log.Fatal(ex);
                }
                SystemIsAvailable = true;
            }
            break;
        case ConnectionState.Connecting:
        case ConnectionState.Disconnected:
        case ConnectionState.Reconnecting:
            ConnectionEstablished = false;
            break;
    }
}

// Arms a one-shot timer that calls HealConnection in five seconds, disposing any timer it replaces.
void ScheduleHealing() {
    var replaced = Interlocked.Exchange(
        ref _HealingTimer,
        new Timer(HealConnection, null, TimeSpan.FromSeconds(5), TimeSpan.FromMilliseconds(-1)));
    if (replaced != null) {
        replaced.Dispose();
    }
}
/// <summary>
/// Tears down the current connection to Connect and clears the cached token and hub proxy,
/// leaving the proxy ready for a fresh <see cref="Connect"/>.
/// </summary>
public void Disconnect() {
    Log.Debug("Disconnecting from 'CloudService'");

    SystemIsAvailable = false;
    _AccessToken = string.Empty;
    _{Service}HubProxy = null;

    if (_CloudConnection == null) {
        return;
    }

    try {
        _CloudConnection.Dispose();
    } catch (Exception disposeError) {
        // Best effort: a failing dispose is logged and otherwise ignored.
        Log.Debug(disposeError);
    }

    _CloudConnection = null;
}
/// <summary>
/// Reconnects when the connection is neither flagged as established nor reported as
/// connected by SignalR; otherwise does nothing.
/// </summary>
/// <param name="state">Unused; the healing timer supplies it as the callback state.</param>
public void HealConnection(object state) {
    if (ConnectionEstablished) {
        return;
    }

    if (_CloudConnection != null && _CloudConnection.State == ConnectionState.Connected) {
        return;
    }

    Log.Debug("Healing Connection...");
    Connect();
}
/// <summary>
/// Releases the cloud connection and the pending healing timer. Calling it more than once
/// is harmless: only the first call performs any work.
/// </summary>
public void Dispose() {
    Dispose(_IsDisposed);
}

// isDisposed carries the flag's value from before this call: true means cleanup already ran.
void Dispose(bool isDisposed) {
    if (!isDisposed) {
        // Stop the healing timer first so a pending heal cannot reconnect after disposal.
        var pendingHeal = _HealingTimer;
        _HealingTimer = null;
        if (pendingHeal != null) {
            pendingHeal.Dispose();
        }

        Disconnect();
    }
    _IsDisposed = true;
}
/// <summary>Invokes <c>SetConnectionSlow</c> on every dashboard client.</summary>
public void SetConnectionSlow() {
    var everyDashboard = _DashboardClients.All;
    everyDashboard.SetConnectionSlow();
}
/// <summary>Invokes <c>ClearConnectionSlow</c> on every dashboard client.</summary>
public void ClearConnectionSlow() {
    var everyDashboard = _DashboardClients.All;
    everyDashboard.ClearConnectionSlow();
}
/// <summary>
/// Sends the given state name, lower-cased, to every dashboard client.
/// </summary>
/// <param name="state">State name to broadcast, e.g. "notconfigured".</param>
/// <exception cref="ArgumentNullException"><paramref name="state"/> is null.</exception>
public void SetConnectionState(string state) {
    if (state == null) {
        throw new ArgumentNullException("state");
    }

    // Invariant casing: the value is a protocol token for the clients, not display text,
    // so it must not vary with the server's current culture.
    _DashboardClients.All.SetConnectionState(state.ToLowerInvariant());
}
/// <summary>
/// Broadcasts the current connection state to every dashboard client. With a live
/// connection object the SignalR state name is sent; otherwise "notconfigured" is sent when
/// there is no active configuration or it has no refresh token, and "connecting" if it has one.
/// </summary>
public void SetConnectionState() {
    // Read the field once: Disconnect() can null it between a null check and the use.
    var connection = _CloudConnection;
    if (connection != null) {
        // Invariant casing keeps the token identical regardless of the server culture.
        _DashboardClients.All.SetConnectionState(connection.State.ToString().ToLowerInvariant());
        return;
    }

    var config = _StoreConfigurationRepository.GetFirst(x => x.IsActive);
    if (config == null || string.IsNullOrWhiteSpace(config.RefreshToken)) {
        _DashboardClients.All.SetConnectionState("notconfigured");
    } else {
        _DashboardClients.All.SetConnectionState("connecting");
    }
}
/// <summary>
/// Overload that ignores its argument and broadcasts the current connection state.
/// </summary>
/// <param name="state">Unused. NOTE(review): the signature matches a timer-style callback; confirm the intended caller.</param>
public void SetConnectionState(object state) {
    this.SetConnectionState();
}
/// <summary>Raised whenever one of the exposed properties is assigned.</summary>
public event PropertyChangedEventHandler PropertyChanged;

// Raises PropertyChanged for the named property. The delegate is copied to a local first so
// the null check and the invocation act on the same subscriber list.
void FirePropertyChanged(string p) {
    PropertyChangedEventHandler subscribers = PropertyChanged;
    if (subscribers == null) {
        return;
    }

    subscribers.Invoke(this, new PropertyChangedEventArgs(p));
}
// Builds the OAuth token endpoint URI from the configured base URL: strips one trailing
// slash, if present, then appends the token path.
Uri BuildUri(string baseUrl) {
    var url = baseUrl;
    Log.Debug(string.Format("Base URL: {0}", url));

    // Ordinal comparison: a culture-sensitive EndsWith can report a match when the final
    // character is not actually '/', and Substring would then strip the wrong character.
    if (url.EndsWith("/", StringComparison.Ordinal)) {
        url = url.Substring(0, url.Length - 1);
    }

    url += "/{YourProductHeaderHere}/oauth/token";
    Log.Debug("Full Url: " + url);
    return new Uri(url);
}
/// <summary>
/// Handles a connection reset request: drops the current connection and arms a one-shot
/// timer that calls <see cref="HealConnection"/> five seconds later.
/// </summary>
/// <param name="message">The reset event; its contents are not used.</param>
public void Handle(ResetHQConnectionEvent message) {
    Disconnect();

    // Swap in the new timer and dispose the one it replaces; otherwise every reset would
    // leave an undisposed timer behind, and an older pending timer could still fire.
    var replaced = Interlocked.Exchange(
        ref _HealingTimer,
        new Timer(HealConnection, null, TimeSpan.FromSeconds(5), TimeSpan.FromMilliseconds(-1)));
    if (replaced != null) {
        replaced.Dispose();
    }
}
}
}
namespace {YourNamespace}.ServiceBus.ProxyHandlers {
using Autofac;
using Microsoft.AspNet.SignalR.Client;
using Microsoft.AspNet.SignalR.Hubs;
using System;
/// <summary>
/// Base handler for one hub event carrying a <typeparamref name="TModel"/> and an action name.
/// It applies the change to the local repository, relays the event to all workstation
/// clients and posts a summary message to all dashboard clients.
/// </summary>
/// <typeparam name="TEntity">Entity type persisted through <c>IRepository&lt;TEntity&gt;</c>.</typeparam>
/// <typeparam name="TModel">Model type received from the hub and relayed to workstations.</typeparam>
public abstract class DomainModelProxyHandler<TEntity, TModel>
    : IHQProxyHandler
    where TEntity : HQDataModelBase, new()
    where TModel : DomainModelBase {
    ILifetimeScope _LifetimeScope;
    string _EventName;
    IHubConnectionContext _WorkstationConnections;
    IHubConnectionContext _DashboardConnections;

    /// <summary>Creates a handler bound to the given hub event name.</summary>
    /// <param name="lifetimeScope">Parent scope; a child scope is opened per received event.</param>
    /// <param name="eventName">Hub event to subscribe to and to re-broadcast under.</param>
    public DomainModelProxyHandler(ILifetimeScope lifetimeScope, string eventName) {
        _LifetimeScope = lifetimeScope;
        _EventName = eventName;
    }

    /// <summary>
    /// Remembers the client contexts and subscribes to the event on <paramref name="proxy"/>.
    /// </summary>
    /// <param name="workstationConnections">Clients that receive the relayed event.</param>
    /// <param name="dashboardConnections">Clients that receive the summary message.</param>
    /// <param name="proxy">Hub proxy on which the event subscription is registered.</param>
    public void RegisterEvents(IHubConnectionContext workstationConnections, IHubConnectionContext dashboardConnections, IHubProxy proxy) {
        _WorkstationConnections = workstationConnections;
        _DashboardConnections = dashboardConnections;
        proxy.On<TModel, string>(_EventName, (model, action) => {
            var actionType = ParseAction(action);
            TModel wModel = default(TModel);
            TEntity entity = default(TEntity);
            using (var scope = _LifetimeScope.BeginLifetimeScope()) {
                var repo = scope.Resolve<IRepository<TEntity>>();
                var builder = scope.Resolve<IBuilder<TEntity, TModel>>();
                switch (actionType) {
                    case ModelAction.Added:
                    case ModelAction.Updated:
                        entity = builder.BuildEntity(model, true);
                        var savedEntity = repo.SaveOrUpdate(entity);
                        wModel = builder.BuildViewModel(savedEntity);
                        break;
                    case ModelAction.Removed:
                        entity = repo.GetFirst(x => x.GUID == model.GUID);
                        if (entity == null) {
                            // Nothing stored locally, so there is nothing to delete or announce.
                            return;
                        }
                        // Build the view model before deleting the entity it is built from.
                        wModel = builder.BuildViewModel(entity);
                        repo.Delete(entity);
                        break;
                }
                // inspired from: http://stackoverflow.com/questions/11042399/execute-method-on-dynamic
                IClientProxy meth = _WorkstationConnections.All;
                meth.Invoke(_EventName, wModel, action);
            }
            // Fall back to the declared entity type when no entity instance is available, so
            // the dashboard message is still sent instead of failing on a null reference.
            var entityName = entity != null ? entity.GetType().Name : typeof(TEntity).Name;
            _DashboardConnections.All.ObjectUpdated(string.Format("{0} has been {1} at {2}", entityName, action, DateTimeOffset.UtcNow));
        });
    }

    /// <summary>
    /// Maps an action name to a <see cref="ModelAction"/>, ignoring case. Null or
    /// unrecognised names map to <see cref="ModelAction.Updated"/>.
    /// </summary>
    /// <param name="actionName">Action name received with the event.</param>
    protected ModelAction ParseAction(string actionName) {
        // Invariant lower-casing: the action name is a protocol token, not display text.
        switch ((actionName ?? string.Empty).ToLowerInvariant()) {
            case "added":
                return ModelAction.Added;
            case "removed":
                return ModelAction.Removed;
            case "updated":
            default:
                return ModelAction.Updated;
        }
    }
}
}
namespace {YourNamespace}.ServiceBus.Interfaces {
using Microsoft.AspNet.SignalR.Client;
using Microsoft.AspNet.SignalR.Hubs;
/// <summary>
/// Contract for components that attach their event subscriptions to a hub proxy.
/// </summary>
public interface IHQProxyHandler {
    /// <summary>Registers this handler's events on <paramref name="proxy"/>.</summary>
    /// <param name="workstationConnections">Workstation hub clients made available to the handler.</param>
    /// <param name="dashboardConnections">Dashboard hub clients made available to the handler.</param>
    /// <param name="proxy">Hub proxy to subscribe on.</param>
    void RegisterEvents(
        IHubConnectionContext workstationConnections,
        IHubConnectionContext dashboardConnections,
        IHubProxy proxy);
}
}
@killnine
Copy link

killnine commented Aug 5, 2014

Looked at this again.

wut

Any chance you could distill this a bit further to the core principle of "healing signalr connection"? I am interested in re-creating event handlers when you have to dispose and re-create the whole HubConnection.

It'd be interesting to have some integration testing on this functionality:

  • Unable to connect to server initially, make sure it retries until it connects (or whatever your logic is...)
  • Able to make initial connection, but server connection is severed (doesn't SignalR handle this natively?)
  • Other tests...?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment