Last active
August 29, 2015 14:04
-
-
Save dealproc/1283c130f1923795c7c1 to your computer and use it in GitHub Desktop.
Healing SignalR Connection
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace {YourNamespace}.ServiceBus.Tasks { | |
using System; | |
using System.Net.Http; | |
using System.Net.Http.Headers; | |
using System.Threading.Tasks; | |
/// <summary>
/// Common base for synchronization tasks: provides a logger and a helper that
/// builds an <see cref="HttpClient"/> carrying a bearer token.
/// </summary>
public abstract class BaseSyncTask : ISyncTask {
    /// <summary>Logger available to derived tasks.</summary>
    protected NLog.Logger Log = NLog.LogManager.GetCurrentClassLogger();

    /// <summary>Sort key for this task relative to other synchronization tasks.</summary>
    public abstract int Priority { get; }

    /// <summary>Performs the synchronization for the given store configuration.</summary>
    /// <param name="config">Store configuration to synchronize against.</param>
    /// <param name="accessToken">Access token to authenticate with.</param>
    /// <returns>A task representing the synchronization work.</returns>
    public abstract Task Synchronize(StoreConfiguration config, string accessToken);

    /// <summary>
    /// Creates a new <see cref="HttpClient"/> whose base address is
    /// <c>config.ConnectUrl</c> and whose Authorization header is
    /// <c>Bearer {accessToken}</c>.
    /// </summary>
    /// <param name="config">Configuration supplying the base URL.</param>
    /// <param name="accessToken">Bearer token placed in the default request headers.</param>
    /// <returns>The configured client; a new instance is created on every call.</returns>
    protected HttpClient GenerateClient(StoreConfiguration config, string accessToken) {
        Log.Debug("Generating client...");

        var httpClient = new HttpClient();
        httpClient.BaseAddress = new Uri(config.ConnectUrl);

        var bearer = new AuthenticationHeaderValue("Bearer", accessToken);
        httpClient.DefaultRequestHeaders.Authorization = bearer;

        return httpClient;
    }
}
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace {YourNamespace}.Host.Proxies { | |
using Autofac; | |
using Microsoft.AspNet.SignalR; | |
using Microsoft.AspNet.SignalR.Client; | |
using Microsoft.AspNet.SignalR.Hubs; | |
using System; | |
using System.Collections.Generic; | |
using System.ComponentModel; | |
using System.Linq; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Thinktecture.IdentityModel.Client; | |
public class ConnectProxy : INotifyPropertyChanged | |
, IHandle<ResetHQConnectionEvent> | |
, IDisposable { | |
// Lazily-built process-wide instance; the factory runs on first access to Instance.
private static readonly Lazy<ConnectProxy> _Instance = new Lazy<ConnectProxy>(CreateInstance);

/// <summary>Gets the shared <see cref="ConnectProxy"/>, creating it on first use.</summary>
public static ConnectProxy Instance {
    get { return _Instance.Value; }
}

/// <summary>
/// Resolves every constructor dependency from the Autofac container and the SignalR
/// <see cref="GlobalHost"/>, in constructor-argument order, and builds the proxy.
/// </summary>
private static ConnectProxy CreateInstance() {
    var lifetimeScope = Infrastructure.IoC.AutofacConfig.ContainerInstance.Resolve<ILifetimeScope>();
    var eventAggregator = Infrastructure.IoC.AutofacConfig.ContainerInstance.Resolve<IEventAggregator>();
    var syncTasks = Infrastructure.IoC.AutofacConfig.ContainerInstance.Resolve<IEnumerable<ISyncTask>>();
    var dashboardClients = GlobalHost.ConnectionManager.GetHubContext<Hubs.DashboardHub>().Clients;
    var workstationClients = GlobalHost.ConnectionManager.GetHubContext<Hubs.WorkstationHub>().Clients;
    var configurationRepository = Infrastructure.IoC.AutofacConfig.ContainerInstance.Resolve<IStoreConfigurationRepository>();
    var proxyHandlers = Infrastructure.IoC.AutofacConfig.ContainerInstance.Resolve<IEnumerable<IHQProxyHandler>>();

    return new ConnectProxy(
        lifetimeScope,
        eventAggregator,
        syncTasks,
        dashboardClients,
        workstationClients,
        configurationRepository,
        proxyHandlers);
}
// ---- Dependencies supplied through the constructor ----
private ILifetimeScope _LifetimeScope;
private IEventAggregator _EventAggregator;
private IStoreConfigurationRepository _StoreConfigurationRepository;
private IHubConnectionContext _DashboardClients;      // clients of the dashboard hub
private IHubConnectionContext _WorkstationClients;    // clients of the workstation hub
private IEnumerable<ISyncTask> _SynchronizationTasks;
private IEnumerable<IHQProxyHandler> _HQProxyHandlers;

// ---- Internal state ----
private bool _IsDisposed;                  // set once Dispose has run
private Timer _HealingTimer;               // one-shot timer that invokes HealConnection
private HubConnection _CloudConnection;    // current outbound connection, null while disconnected
private IHubProxy _{Service}HubProxy;      // proxy created from _CloudConnection
private string _AccessToken;               // token obtained during Connect
private StoreConfiguration _ActiveConfiguration;
private NLog.Logger Log = NLog.LogManager.GetCurrentClassLogger();
// Exposed members and their backing fields.
private bool _ConnectionEstablished;
private bool _SystemIsAvailable;

/// <summary>
/// Connection-established flag. Every assignment raises
/// <see cref="PropertyChanged"/>, even when the value is unchanged.
/// </summary>
public bool ConnectionEstablished {
    get {
        return _ConnectionEstablished;
    }
    set {
        _ConnectionEstablished = value;
        FirePropertyChanged("ConnectionEstablished");
    }
}

/// <summary>
/// System-available flag. Every assignment raises
/// <see cref="PropertyChanged"/>, even when the value is unchanged.
/// </summary>
public bool SystemIsAvailable {
    get {
        return _SystemIsAvailable;
    }
    set {
        _SystemIsAvailable = value;
        FirePropertyChanged("SystemIsAvailable");
    }
}
/// <summary>
/// Stores the injected collaborators and subscribes this instance to the event aggregator.
/// </summary>
/// <param name="lifetimeScope">Autofac lifetime scope.</param>
/// <param name="eventAggregator">Aggregator this proxy subscribes to.</param>
/// <param name="synchronizationTasks">Tasks run when the connection reaches the Connected state.</param>
/// <param name="dashboardClients">Client context of the dashboard hub.</param>
/// <param name="workstationClients">Client context of the workstation hub.</param>
/// <param name="storeConfigurationRepository">Repository used to look up the active store configuration.</param>
/// <param name="hqProxyHandlers">Handlers that register their events on the hub proxy.</param>
public ConnectProxy(
    ILifetimeScope lifetimeScope,
    IEventAggregator eventAggregator,
    IEnumerable<ISyncTask> synchronizationTasks,
    IHubConnectionContext dashboardClients,
    IHubConnectionContext workstationClients,
    IStoreConfigurationRepository storeConfigurationRepository,
    IEnumerable<IHQProxyHandler> hqProxyHandlers) {

    // Infrastructure.
    _LifetimeScope = lifetimeScope;
    _EventAggregator = eventAggregator;
    _StoreConfigurationRepository = storeConfigurationRepository;

    // Local hub client contexts.
    _DashboardClients = dashboardClients;
    _WorkstationClients = workstationClients;

    // Pluggable behaviour.
    _SynchronizationTasks = synchronizationTasks;
    _HQProxyHandlers = hqProxyHandlers;

    // Subscribe last, once all fields are populated.
    _EventAggregator.Subscribe(this);
}
/// <summary>
/// Establishes the connection to Connect, so the store can receive near realtime updates.
/// </summary>
/// <remarks>
/// Any existing connection is torn down first. When there is no active configuration the
/// method only clears <see cref="ConnectionEstablished"/>; when the active configuration has
/// no refresh token it reports "notconfigured" and returns. Otherwise it exchanges the
/// refresh token for an access token, creates the hub connection, lets every
/// <see cref="IHQProxyHandler"/> register its events and starts the connection.
/// A failed attempt (exception here, or a faulted start) schedules another attempt in
/// five seconds via the healing timer.
/// </remarks>
public void Connect() {
    Disconnect();
    _AccessToken = string.Empty;
    _ActiveConfiguration = _StoreConfigurationRepository.GetFirst(x => x.IsActive);

    if (_ActiveConfiguration == null) {
        ConnectionEstablished = false;
        return;
    }

    try {
        // Without a refresh token there is nothing to connect with.
        if (string.IsNullOrWhiteSpace(_ActiveConfiguration.RefreshToken)) {
            SetConnectionState("notconfigured");
            return;
        }

        // Get an Access Token.
        var client = new OAuth2Client(BuildUri(_ActiveConfiguration.AuthorizationUrl), "xxx", "xxx");
        Log.Debug("Client built.");
        // NOTE: blocks the calling thread; this method is synchronous by contract.
        var tokenResponse = client.RequestRefreshTokenAsync(_ActiveConfiguration.RefreshToken).Result;
        _AccessToken = tokenResponse.AccessToken;
        // The token is a credential: record that it arrived, never its value.
        Log.Debug("Token received.");

        // build a dictionary to supply the access token to SignalR
        var tokenDictionary = new Dictionary<string, string> {
            { "access_token", _AccessToken }
        };

        // Connect to the configured "connect" server. A local is kept so the callbacks
        // below keep working on this connection even if the field is cleared meanwhile.
        var connection = new HubConnection(_ActiveConfiguration.ConnectUrl, tokenDictionary);
        _CloudConnection = connection;

        connection.StateChanged += OnCloudConnectionStateChanged;
        connection.ConnectionSlow += () => {
            //TODO: Update the web ui with a flag stating the connection is slow.
            Log.Debug("SignalR is reporting a slow connection.");
            SetConnectionSlow();
        };

        var hubProxy = connection.CreateHubProxy("{YourHubName}");
        _{Service}HubProxy = hubProxy;

        // build up actions from Connect commands.
        foreach (var handler in _HQProxyHandlers) {
            handler.RegisterEvents(_WorkstationClients, _DashboardClients, hubProxy);
        }

        connection.Start().ContinueWith(connectTask => OnConnectionStarted(connection, connectTask));
    } catch (Exception ex) {
        Log.Debug(ex);
        Log.Debug(ex.Message);
        ConnectionEstablished = false;
        // Keep healing: without this a failed token request would end all retries.
        ScheduleHealing();
    }
}

// Reacts to state changes of the cloud connection: pushes the state to the dashboard and,
// on the first Connected state, runs the synchronization tasks in Priority order.
void OnCloudConnectionStateChanged(StateChange chg) {
    //TODO: Update the web ui with the connection status.
    Log.Debug("State Changed to: " + chg.NewState.ToString());
    SetConnectionState();

    switch (chg.NewState) {
        case ConnectionState.Connected:
            if (!ConnectionEstablished) {
                try {
                    foreach (var task in _SynchronizationTasks.OrderBy(x => x.Priority)) {
                        // GetResult rethrows the task's own exception; Wait() would wrap it in
                        // AggregateException and bypass the HQSyncUnavailableException handler.
                        task.Synchronize(_ActiveConfiguration, _AccessToken).GetAwaiter().GetResult();
                    }
                    ConnectionEstablished = true;
                } catch (HQSyncUnavailableException ex) {
                    Log.Info(ex);
                } catch (Exception ex) {
                    Log.Fatal(ex);
                }
                SystemIsAvailable = true;
            }
            break;
        case ConnectionState.Connecting:
        case ConnectionState.Disconnected:
        case ConnectionState.Reconnecting:
            ConnectionEstablished = false;
            break;
    }
}

// Continuation of HubConnection.Start(): logs the outcome and schedules a retry on failure.
void OnConnectionStarted(HubConnection connection, Task connectTask) {
    Log.Debug("HubConnection has started... current state is: " + connection.State.ToString());

    switch (connectTask.Status) {
        case TaskStatus.Canceled:
            ConnectionEstablished = false;
            Log.Info("User cancelled connection... this should not happen, should it?");
            break;
        case TaskStatus.Faulted:
            ConnectionEstablished = false;
            //TODO: Implement a method to get the Web UI to display this failed.
            ScheduleHealing();
            Log.Debug("Could not connect... Unauthorized maybe?");
            // Reading Exception also marks the task's fault as observed.
            Log.Debug(connectTask.Exception);
            break;
    }
}

// Arms a one-shot timer that calls HealConnection in five seconds, replacing and
// disposing any previously armed timer. Does nothing once this instance is disposed.
void ScheduleHealing() {
    if (_IsDisposed) {
        return;
    }

    var previousTimer = _HealingTimer;
    _HealingTimer = new Timer(HealConnection, null, TimeSpan.FromSeconds(5), TimeSpan.FromMilliseconds(-1));
    if (previousTimer != null) {
        previousTimer.Dispose();
    }
}
/// <summary>
/// Tears down the current connection to Connect so a fresh one can be created:
/// marks the system unavailable, clears the access token and hub proxy, and
/// disposes the hub connection if there is one.
/// </summary>
public void Disconnect() {
    Log.Debug("Disconnecting from 'CloudService'");

    SystemIsAvailable = false;
    _AccessToken = string.Empty;
    _{Service}HubProxy = null;

    if (_CloudConnection == null) {
        return;
    }

    try {
        _CloudConnection.Dispose();
    } catch (Exception disposeError) {
        // Best effort: a failing Dispose must not prevent the reset below.
        Log.Debug(disposeError);
    }

    // Cleared only after Dispose, so callbacks raised during disposal still see the connection.
    _CloudConnection = null;
}
/// <summary>
/// Timer callback that re-runs <see cref="Connect"/> when no connection is established.
/// </summary>
/// <param name="state">Unused; present to match the timer callback signature.</param>
/// <remarks>
/// Does nothing once the proxy has been disposed, so a timer that was already queued
/// cannot bring the connection back. Exceptions are logged rather than propagated,
/// because this runs as a timer callback with no caller to handle them.
/// </remarks>
public void HealConnection(object state) {
    if (_IsDisposed || ConnectionEstablished) {
        return;
    }

    // Read the field once; another thread may clear it while we inspect it.
    var connection = _CloudConnection;
    if (connection != null && connection.State == ConnectionState.Connected) {
        return;
    }

    Log.Debug("Healing Connection...");
    try {
        Connect();
    } catch (Exception ex) {
        Log.Error(ex);
    }
}
/// <summary>
/// Stops the healing timer and disconnects from Connect. Safe to call more than once.
/// </summary>
public void Dispose() {
    Dispose(_IsDisposed);
}

// Performs the actual cleanup unless it has already been done.
// isDisposed: the value of _IsDisposed at the time of the call.
void Dispose(bool isDisposed) {
    if (!isDisposed) {
        // Flag first so nothing started from here on treats the proxy as alive.
        _IsDisposed = true;

        // Release the timer; otherwise it stays alive and can still fire HealConnection.
        var timer = _HealingTimer;
        _HealingTimer = null;
        if (timer != null) {
            timer.Dispose();
        }

        Disconnect();
    }
    _IsDisposed = true;
}
/// <summary>Invokes <c>SetConnectionSlow</c> on all dashboard clients.</summary>
public void SetConnectionSlow() {
    dynamic everyDashboard = _DashboardClients.All;
    everyDashboard.SetConnectionSlow();
}

/// <summary>Invokes <c>ClearConnectionSlow</c> on all dashboard clients.</summary>
public void ClearConnectionSlow() {
    dynamic everyDashboard = _DashboardClients.All;
    everyDashboard.ClearConnectionSlow();
}
/// <summary>
/// Sends the given state, lower-cased, to all dashboard clients.
/// </summary>
/// <param name="state">
/// State name to broadcast. When null, the state is derived from the current
/// connection instead, exactly as the parameterless overload does.
/// </param>
public void SetConnectionState(string state) {
    if (state == null) {
        SetConnectionState();
        return;
    }

    // Invariant casing: the value is a protocol token, not user-facing text.
    _DashboardClients.All.SetConnectionState(state.ToLowerInvariant());
}

/// <summary>
/// Sends the current connection state to all dashboard clients. With no connection
/// object, reports "notconfigured" when there is no active configuration or it has no
/// refresh token, and "connecting" otherwise.
/// </summary>
public void SetConnectionState() {
    // Read the field once so a concurrent Disconnect cannot null it between check and use.
    var connection = _CloudConnection;
    if (connection != null) {
        _DashboardClients.All.SetConnectionState(connection.State.ToString().ToLowerInvariant());
        return;
    }

    var config = _StoreConfigurationRepository.GetFirst(x => x.IsActive);
    if (config == null || string.IsNullOrWhiteSpace(config.RefreshToken)) {
        _DashboardClients.All.SetConnectionState("notconfigured");
    } else {
        _DashboardClients.All.SetConnectionState("connecting");
    }
}

/// <summary>
/// Callback-shaped overload; ignores <paramref name="state"/> and broadcasts the
/// current connection state.
/// </summary>
/// <param name="state">Unused.</param>
public void SetConnectionState(object state) {
    SetConnectionState();
}
/// <summary>Raised whenever one of the exposed properties is assigned.</summary>
public event PropertyChangedEventHandler PropertyChanged;

// Raises PropertyChanged for the named property, if anyone is listening.
void FirePropertyChanged(string p) {
    // Copy the delegate first so a concurrent unsubscribe cannot null it mid-call.
    PropertyChangedEventHandler subscribers = PropertyChanged;
    if (subscribers == null) {
        return;
    }

    var args = new PropertyChangedEventArgs(p);
    subscribers(this, args);
}
// Builds the OAuth token endpoint URI from the configured base URL by stripping
// surrounding whitespace and trailing slashes and appending the token path.
// Throws ArgumentException when baseUrl is null, empty or whitespace, and
// UriFormatException when the resulting string is not a valid absolute URI.
Uri BuildUri(string baseUrl) {
    if (string.IsNullOrWhiteSpace(baseUrl)) {
        throw new ArgumentException("The authorization base URL is missing.", "baseUrl");
    }

    Log.Debug(string.Format("Base URL: {0}", baseUrl));

    // TrimEnd removes every trailing '/', so "host//" cannot yield a double slash.
    var url = baseUrl.Trim().TrimEnd('/') + "/{YourProductHeaderHere}/oauth/token";

    Log.Debug("Full Url: " + url);
    return new Uri(url);
}
/// <summary>
/// Handles a connection-reset request: disconnects immediately and schedules a
/// reconnect attempt five seconds later.
/// </summary>
/// <param name="message">The reset event; its content is not inspected.</param>
public void Handle(ResetHQConnectionEvent message) {
    Disconnect();

    // Replace the timer and dispose the old one; a still-pending old timer would
    // otherwise leak and could trigger a second, overlapping reconnect.
    var previousTimer = _HealingTimer;
    _HealingTimer = new Timer(HealConnection, null, TimeSpan.FromSeconds(5), TimeSpan.FromMilliseconds(-1));
    if (previousTimer != null) {
        previousTimer.Dispose();
    }
}
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace {YourNamespace}.ServiceBus.ProxyHandlers { | |
using Autofac; | |
using Microsoft.AspNet.SignalR.Client; | |
using Microsoft.AspNet.SignalR.Hubs; | |
using System; | |
/// <summary>
/// Base handler that listens for one named hub event carrying a <typeparamref name="TModel"/>
/// and an action name, applies the change to the local repository, and relays the result
/// to workstation and dashboard clients.
/// </summary>
/// <typeparam name="TEntity">Entity type persisted through <c>IRepository&lt;TEntity&gt;</c>.</typeparam>
/// <typeparam name="TModel">Model type received from the hub.</typeparam>
public abstract class DomainModelProxyHandler<TEntity, TModel>
    : IHQProxyHandler
    where TEntity : HQDataModelBase, new()
    where TModel : DomainModelBase {

    ILifetimeScope _LifetimeScope;
    string _EventName;
    IHubConnectionContext _WorkstationConnections;
    IHubConnectionContext _DashboardConnections;

    /// <summary>Creates a handler bound to a single hub event name.</summary>
    /// <param name="lifetimeScope">Parent scope; a child scope is opened per received event.</param>
    /// <param name="eventName">Hub event to subscribe to; also the method name relayed to workstations.</param>
    public DomainModelProxyHandler(ILifetimeScope lifetimeScope, string eventName) {
        _LifetimeScope = lifetimeScope;
        _EventName = eventName;
    }

    /// <summary>
    /// Remembers the client contexts and subscribes to the configured event on
    /// <paramref name="proxy"/>.
    /// </summary>
    /// <param name="workstationConnections">Workstation clients that receive the relayed event.</param>
    /// <param name="dashboardConnections">Dashboard clients that receive an <c>ObjectUpdated</c> message.</param>
    /// <param name="proxy">Hub proxy on which the event subscription is made.</param>
    public void RegisterEvents(IHubConnectionContext workstationConnections, IHubConnectionContext dashboardConnections, IHubProxy proxy) {
        _WorkstationConnections = workstationConnections;
        _DashboardConnections = dashboardConnections;

        proxy.On<TModel, string>(_EventName, (model, action) => {
            var actionType = ParseAction(action);
            TModel wModel = default(TModel);
            TEntity entity = default(TEntity);

            using (var scope = _LifetimeScope.BeginLifetimeScope()) {
                var repo = scope.Resolve<IRepository<TEntity>>();
                var builder = scope.Resolve<IBuilder<TEntity, TModel>>();

                switch (actionType) {
                    case ModelAction.Added:
                    case ModelAction.Updated:
                        entity = builder.BuildEntity(model, true);
                        var savedEntity = repo.SaveOrUpdate(entity);
                        wModel = builder.BuildViewModel(savedEntity);
                        break;
                    case ModelAction.Removed:
                        entity = repo.GetFirst(x => x.GUID == model.GUID);
                        if (entity == null) {
                            // Nothing stored locally, so nothing to delete or announce.
                            return;
                        }
                        // Build the view model before deleting the entity.
                        wModel = builder.BuildViewModel(entity);
                        repo.Delete(entity);
                        break;
                }

                // inspired from: http://stackoverflow.com/questions/11042399/execute-method-on-dynamic
                IClientProxy meth = _WorkstationConnections.All;
                meth.Invoke(_EventName, wModel, action);
            }

            // Fall back to the declared type if the builder produced no entity instance.
            var entityName = entity != null ? entity.GetType().Name : typeof(TEntity).Name;
            _DashboardConnections.All.ObjectUpdated(string.Format("{0} has been {1} at {2}", entityName, action, DateTimeOffset.UtcNow));
        });
    }

    /// <summary>
    /// Maps an action name ("added", "updated", "removed", any casing) to a
    /// <see cref="ModelAction"/>.
    /// </summary>
    /// <param name="actionName">Action name received with the event; may be null.</param>
    /// <returns>
    /// The matching action; <see cref="ModelAction.Updated"/> for null or unrecognised names.
    /// </returns>
    protected ModelAction ParseAction(string actionName) {
        if (actionName == null) {
            return ModelAction.Updated;
        }

        // Invariant casing: action names are protocol tokens, not localized text.
        switch (actionName.ToLowerInvariant()) {
            case "added":
                return ModelAction.Added;
            case "updated":
                return ModelAction.Updated;
            case "removed":
                return ModelAction.Removed;
            default:
                return ModelAction.Updated;
        }
    }
}
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace {YourNamespace}.ServiceBus.Interfaces { | |
using Microsoft.AspNet.SignalR.Client; | |
using Microsoft.AspNet.SignalR.Hubs; | |
/// <summary>
/// Contract for components that attach their event subscriptions to a hub proxy.
/// </summary>
public interface IHQProxyHandler {
    /// <summary>Registers this handler's events on <paramref name="proxy"/>.</summary>
    /// <param name="workstationConnections">Client context for workstation connections.</param>
    /// <param name="dashboardConnections">Client context for dashboard connections.</param>
    /// <param name="proxy">Hub proxy to subscribe on.</param>
    void RegisterEvents(
        IHubConnectionContext workstationConnections,
        IHubConnectionContext dashboardConnections,
        IHubProxy proxy);
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Looked at this again.
Any chance you could distill this a bit further to the core principle of "healing signalr connection"? I am interested in re-creating event handlers when you have to dispose and re-create the whole HubConnection.
It'd be interesting to have some integration testing on this functionality: