Skip to content

Instantly share code, notes, and snippets.

@anaisbetts
Created December 14, 2011 02:53
Show Gist options
  • Select an option

  • Save anaisbetts/1475041 to your computer and use it in GitHub Desktop.

Select an option

Save anaisbetts/1475041 to your computer and use it in GitHub Desktop.
/// <summary>
/// Server side of the service-announce protocol. Keeps a map of service names
/// to addresses, applies control messages received on the incoming control
/// socket, echoes each handled message on the outgoing control socket, and
/// broadcasts the resulting changes on the publish socket.
/// </summary>
public class ServiceAnnounceServerPolicy : IServiceAnnounce, IEnableLogger
{
    /// <summary>
    /// Services registered locally through <see cref="RegisterService"/>,
    /// keyed by service name. Access is guarded by locking on the dictionary itself.
    /// </summary>
    public Dictionary<string, string> RegisteredServiceList { get; protected set; }

    /// <summary>
    /// Every known service name mapped to the list of addresses announced for it.
    /// </summary>
    public Dictionary<string, List<string>> ServiceList { get; protected set; }

    readonly IObserver<ServiceSocketMessage> publishSocket;
    readonly Subject<Tuple<string, string>> serviceAdded = new Subject<Tuple<string, string>>();
    readonly Subject<Tuple<string, string>> serviceRemoved = new Subject<Tuple<string, string>>();

    // Subscription to the incoming control socket; swapped to null by Dispose.
    IDisposable _inner;

    /// <summary>
    /// Creates the policy and immediately subscribes to the incoming control socket.
    /// </summary>
    /// <param name="controlSocketIncoming">Stream of control messages sent by clients.</param>
    /// <param name="controlSocketOutgoing">Sink for replies to control messages.</param>
    /// <param name="publishSocket">Sink for broadcasts describing service changes.</param>
    /// <exception cref="ArgumentNullException">Any of the arguments is null.</exception>
    public ServiceAnnounceServerPolicy(
        IObservable<ServiceSocketMessage> controlSocketIncoming,
        IObserver<ServiceSocketMessage> controlSocketOutgoing,
        IObserver<ServiceSocketMessage> publishSocket)
    {
        // Fail fast with a descriptive exception instead of a later NullReferenceException.
        if (controlSocketIncoming == null)
        {
            throw new ArgumentNullException("controlSocketIncoming");
        }
        if (controlSocketOutgoing == null)
        {
            throw new ArgumentNullException("controlSocketOutgoing");
        }
        if (publishSocket == null)
        {
            throw new ArgumentNullException("publishSocket");
        }

        RegisteredServiceList = new Dictionary<string, string>();
        ServiceList = new Dictionary<string, List<string>>();
        this.publishSocket = publishSocket;

        _inner = controlSocketIncoming.Subscribe(x =>
        {
            try
            {
                HandleIncomingMessage(x, controlSocketOutgoing, publishSocket);
            }
            catch (Exception ex)
            {
                this.Log().Error("Bad Service Announce client message", ex);

                // Report the failure to the client, echoing the offending payload.
                // The message itself may be null, so do not dereference it blindly:
                // a second exception thrown from here would escape the OnNext handler.
                controlSocketOutgoing.OnNext(new ServiceSocketMessage()
                {
                    Method = "Exception",
                    Data = x != null ? x.Data : null
                });
            }
        });
    }

    /// <summary>
    /// Registers a service, records it in both service tables and broadcasts the
    /// registration. The work happens eagerly when this method is called, not
    /// when the returned observable is subscribed to.
    /// </summary>
    /// <param name="serviceName">Name of the service being registered.</param>
    /// <param name="zeroMqAddress">Address at which the service can be reached.</param>
    /// <returns>
    /// An observable yielding a single disposable; disposing it removes the
    /// service again and broadcasts the removal.
    /// </returns>
    public IObservable<IDisposable> RegisterService(string serviceName, string zeroMqAddress)
    {
        lock (RegisteredServiceList)
        {
            RegisteredServiceList[serviceName] = zeroMqAddress;
        }

        // NOTE(review): AddToServiceList / RemoveFromServiceList are extension methods
        // defined elsewhere; whether they synchronize on ServiceList is not visible here.
        ServiceList.AddToServiceList(serviceName, zeroMqAddress);
        BroadcastService(serviceName, zeroMqAddress, publishSocket, true, true);

        return Observable.Return(Disposable.Create(() =>
        {
            lock (RegisteredServiceList)
            {
                RegisteredServiceList.Remove(serviceName);
            }

            ServiceList.RemoveFromServiceList(serviceName, zeroMqAddress);
            BroadcastService(serviceName, zeroMqAddress, publishSocket, false, true);
        }));
    }

    /// <summary>
    /// Returns the stream of (service name, address) pairs that get added.
    /// </summary>
    /// <param name="forceRefresh">Currently ignored by this implementation.</param>
    public IObservable<Tuple<string, string>> WatchServicesAdded(bool forceRefresh)
    {
        return serviceAdded;
    }

    /// <summary>
    /// Returns the stream of (service name, address) pairs that get removed.
    /// </summary>
    public IObservable<Tuple<string, string>> WatchServicesRemoved()
    {
        return serviceRemoved;
    }

    /// <summary>
    /// Returns a snapshot of all known services as (service name, address) pairs,
    /// one pair per address.
    /// </summary>
    public Tuple<string, string>[] GetAvailableServices()
    {
        return SnapshotServices();
    }

    /// <summary>
    /// Stops listening to the incoming control socket. Safe to call more than
    /// once: the subscription is exchanged for null atomically, so only the
    /// first call disposes it.
    /// </summary>
    public void Dispose()
    {
        var inner = Interlocked.Exchange(ref _inner, null);
        if (inner != null)
        {
            inner.Dispose();
        }
    }

    /// <summary>
    /// Applies one control message. Every handled message is echoed back on
    /// <paramref name="controlSocketOutgoing"/>. Exceptions propagate to the
    /// caller; the subscription set up in the constructor turns them into an
    /// "Exception" reply.
    /// </summary>
    /// <exception cref="NotSupportedException">The message method is not recognized.</exception>
    void HandleIncomingMessage(ServiceSocketMessage message, IObserver<ServiceSocketMessage> controlSocketOutgoing, IObserver<ServiceSocketMessage> publishSocket)
    {
        switch (message.Method)
        {
            case "RegisterService":
                HandleRegistrationChange(message, controlSocketOutgoing, publishSocket, true);
                break;

            case "UnregisterService":
                HandleRegistrationChange(message, controlSocketOutgoing, publishSocket, false);
                break;

            case "UnregisterAll":
                List<string> removedAddresses;
                lock (ServiceList)
                {
                    // The indexer throws for an unknown service name; the exception
                    // propagates to the caller like any other handling failure.
                    removedAddresses = ServiceList[message.Data];
                    ServiceList.Remove(message.Data);
                }

                controlSocketOutgoing.OnNext(message);
                foreach (var address in removedAddresses)
                {
                    BroadcastService(message.Data, address, publishSocket, false, true);
                }
                break;

            case "RequestRefresh":
                controlSocketOutgoing.OnNext(message);
                BroadcastClearCache(publishSocket);

                // Re-announce everything on the publish socket only; local watchers
                // are not notified (shouldNotify = false).
                foreach (var service in SnapshotServices())
                {
                    BroadcastService(service.Item1, service.Item2, publishSocket, true, false);
                }
                break;

            default:
                throw new NotSupportedException("Invalid method");
        }
    }

    /// <summary>
    /// Handles "RegisterService" / "UnregisterService": the payload is a JSON
    /// key/value pair of service name and address. Updates the service table,
    /// echoes the message, then broadcasts the change.
    /// </summary>
    void HandleRegistrationChange(ServiceSocketMessage message, IObserver<ServiceSocketMessage> controlSocketOutgoing, IObserver<ServiceSocketMessage> publishSocket, bool isRegistering)
    {
        var kvp = JsonConvert.DeserializeObject<KeyValuePair<string, string>>(message.Data);

        if (isRegistering)
        {
            ServiceList.AddToServiceList(kvp.Key, kvp.Value);
        }
        else
        {
            ServiceList.RemoveFromServiceList(kvp.Key, kvp.Value);
        }

        controlSocketOutgoing.OnNext(message);
        BroadcastService(kvp.Key, kvp.Value, publishSocket, isRegistering, true);
    }

    /// <summary>
    /// Flattens <see cref="ServiceList"/> into (service name, address) pairs while
    /// holding the lock on it, so the result is a consistent copy.
    /// </summary>
    Tuple<string, string>[] SnapshotServices()
    {
        lock (ServiceList)
        {
            return ServiceList
                .SelectMany(entry => entry.Value.Select(address => Tuple.Create(entry.Key, address)))
                .ToArray();
        }
    }

    /// <summary>
    /// Publishes a "ClearCache" message with no payload.
    /// </summary>
    void BroadcastClearCache(IObserver<ServiceSocketMessage> publishSocket)
    {
        publishSocket.OnNext(new ServiceSocketMessage() { Method = "ClearCache", Data = null });
    }

    /// <summary>
    /// Publishes a "RegisterService" or "UnregisterService" message whose payload
    /// is the JSON-serialized (service name, address) pair.
    /// </summary>
    /// <param name="serviceName">Name of the service that changed.</param>
    /// <param name="address">Address of the service that changed.</param>
    /// <param name="publishSocket">Sink that receives the broadcast.</param>
    /// <param name="isRegistering">True for a registration, false for a removal.</param>
    /// <param name="shouldNotify">
    /// When true, the pair is also pushed to the matching local watcher stream
    /// (added or removed).
    /// </param>
    void BroadcastService(string serviceName, string address, IObserver<ServiceSocketMessage> publishSocket, bool isRegistering, bool shouldNotify)
    {
        var method = isRegistering ? "RegisterService" : "UnregisterService";
        var data = JsonConvert.SerializeObject(new KeyValuePair<string, string>(serviceName, address));
        publishSocket.OnNext(new ServiceSocketMessage() { Method = method, Data = data });

        if (shouldNotify)
        {
            (isRegistering ? serviceAdded : serviceRemoved).OnNext(new Tuple<string, string>(serviceName, address));
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment