Created
December 14, 2011 02:53
-
-
Save anaisbetts/1475041 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Server side of the service-announce protocol. Keeps the table of announced
/// services, answers requests arriving on the control socket, and republishes
/// every registration change on the publish socket.
/// </summary>
public class ServiceAnnounceServerPolicy : IServiceAnnounce, IEnableLogger
{
    // Method names used on the wire; the values must not change.
    const string RegisterServiceMethod = "RegisterService";
    const string UnregisterServiceMethod = "UnregisterService";
    const string UnregisterAllMethod = "UnregisterAll";
    const string RequestRefreshMethod = "RequestRefresh";
    const string ClearCacheMethod = "ClearCache";
    const string ExceptionMethod = "Exception";

    /// <summary>
    /// Services registered through <see cref="RegisterService"/> on this instance,
    /// keyed by service name (one address per name). Guarded by locking on the
    /// dictionary itself.
    /// </summary>
    public Dictionary<string, string> RegisteredServiceList { get; protected set; }

    /// <summary>
    /// Every known service, local or announced over the control socket:
    /// service name to the list of addresses announced under that name.
    /// </summary>
    public Dictionary<string, List<string>> ServiceList { get; protected set; }

    readonly IObserver<ServiceSocketMessage> publishSocket;
    readonly Subject<Tuple<string, string>> serviceAdded = new Subject<Tuple<string, string>>();
    readonly Subject<Tuple<string, string>> serviceRemoved = new Subject<Tuple<string, string>>();

    // Subscription to the incoming control socket; null once disposed.
    IDisposable _inner;

    /// <summary>
    /// Creates the policy and starts processing control messages immediately.
    /// </summary>
    /// <param name="controlSocketIncoming">Requests received from clients.</param>
    /// <param name="controlSocketOutgoing">Replies sent back to the requesting client.</param>
    /// <param name="publishSocket">Channel on which service changes are broadcast.</param>
    public ServiceAnnounceServerPolicy(
        IObservable<ServiceSocketMessage> controlSocketIncoming,
        IObserver<ServiceSocketMessage> controlSocketOutgoing,
        IObserver<ServiceSocketMessage> publishSocket)
    {
        RegisteredServiceList = new Dictionary<string, string>();
        ServiceList = new Dictionary<string, List<string>>();
        this.publishSocket = publishSocket;

        _inner = controlSocketIncoming.Subscribe(x =>
        {
            try
            {
                HandleIncomingMessage(x, controlSocketOutgoing, publishSocket);
            }
            catch (Exception ex)
            {
                this.Log().Error("Bad Service Announce client message", ex);

                // The message itself may be what failed (e.g. it is null), so the
                // error reply must not dereference it blindly; otherwise the handler
                // would throw a second time and the exception would escape OnNext.
                var failedData = ReferenceEquals(x, null) ? null : x.Data;
                controlSocketOutgoing.OnNext(new ServiceSocketMessage() { Method = ExceptionMethod, Data = failedData });
            }
        });
    }

    /// <summary>
    /// Registers a service hosted by this process and broadcasts the registration.
    /// </summary>
    /// <param name="serviceName">Name the service is announced under.</param>
    /// <param name="zeroMqAddress">Address at which the service can be reached.</param>
    /// <returns>
    /// An observable yielding a single disposable; disposing it unregisters the
    /// service and broadcasts the removal.
    /// </returns>
    public IObservable<IDisposable> RegisterService(string serviceName, string zeroMqAddress)
    {
        // NOTE(review): AddToServiceList / RemoveFromServiceList are defined elsewhere;
        // confirm that they synchronize on ServiceList, since the other accesses in
        // this class take lock (ServiceList).
        lock (RegisteredServiceList) { RegisteredServiceList[serviceName] = zeroMqAddress; }
        ServiceList.AddToServiceList(serviceName, zeroMqAddress);
        BroadcastService(serviceName, zeroMqAddress, publishSocket, true, true);

        return Observable.Return(Disposable.Create(() =>
        {
            lock (RegisteredServiceList)
            {
                // Only drop the entry if it still belongs to this registration. The
                // name may have been re-registered with another address since, and
                // that newer registration must survive disposal of the older handle.
                string currentAddress;
                if (RegisteredServiceList.TryGetValue(serviceName, out currentAddress) && currentAddress == zeroMqAddress)
                {
                    RegisteredServiceList.Remove(serviceName);
                }
            }

            ServiceList.RemoveFromServiceList(serviceName, zeroMqAddress);
            BroadcastService(serviceName, zeroMqAddress, publishSocket, false, true);
        }));
    }

    /// <summary>
    /// Stream of (service name, address) pairs, one per newly registered service.
    /// </summary>
    /// <param name="forceRefresh">Currently ignored by this implementation.</param>
    public IObservable<Tuple<string, string>> WatchServicesAdded(bool forceRefresh)
    {
        return serviceAdded;
    }

    /// <summary>
    /// Stream of (service name, address) pairs, one per unregistered service.
    /// </summary>
    public IObservable<Tuple<string, string>> WatchServicesRemoved()
    {
        return serviceRemoved;
    }

    /// <summary>
    /// Returns a snapshot of every known (service name, address) pair.
    /// </summary>
    public Tuple<string, string>[] GetAvailableServices()
    {
        lock (ServiceList)
        {
            // ToArray materializes the query while the lock is still held.
            return ServiceList
                .SelectMany(entry => entry.Value.Select(address => Tuple.Create(entry.Key, address)))
                .ToArray();
        }
    }

    /// <summary>
    /// Stops processing control messages. Calling it more than once is harmless.
    /// </summary>
    public void Dispose()
    {
        // The exchange guarantees the subscription is disposed at most once.
        var inner = Interlocked.Exchange(ref _inner, null);
        if (inner != null)
        {
            inner.Dispose();
        }
    }

    /// <summary>
    /// Applies one control message: updates the service table, echoes the message
    /// back to the sender as acknowledgement, and broadcasts the resulting changes.
    /// Any exception thrown here is turned into an "Exception" reply by the
    /// subscription set up in the constructor.
    /// </summary>
    void HandleIncomingMessage(ServiceSocketMessage message, IObserver<ServiceSocketMessage> controlSocketOutgoing, IObserver<ServiceSocketMessage> publishSocket)
    {
        KeyValuePair<string, string> kvp;

        switch (message.Method)
        {
            case RegisterServiceMethod:
                kvp = JsonConvert.DeserializeObject<KeyValuePair<string, string>>(message.Data);
                ServiceList.AddToServiceList(kvp.Key, kvp.Value);
                controlSocketOutgoing.OnNext(message);
                BroadcastService(kvp.Key, kvp.Value, publishSocket, true, true);
                break;

            case UnregisterServiceMethod:
                kvp = JsonConvert.DeserializeObject<KeyValuePair<string, string>>(message.Data);
                ServiceList.RemoveFromServiceList(kvp.Key, kvp.Value);
                controlSocketOutgoing.OnNext(message);
                BroadcastService(kvp.Key, kvp.Value, publishSocket, false, true);
                break;

            case UnregisterAllMethod:
                // Data carries the bare service name here, not a JSON pair.
                List<string> toRemove;
                lock (ServiceList)
                {
                    if (!ServiceList.TryGetValue(message.Data, out toRemove))
                    {
                        throw new KeyNotFoundException("No services are registered under '" + message.Data + "'");
                    }

                    ServiceList.Remove(message.Data);
                }

                controlSocketOutgoing.OnNext(message);
                toRemove.Each(y => BroadcastService(message.Data, y, publishSocket, false, true));
                break;

            case RequestRefreshMethod:
                controlSocketOutgoing.OnNext(message);
                BroadcastClearCache(publishSocket);

                // Re-announce everything without raising serviceAdded: nothing new
                // was registered, subscribers are only rebuilding their caches.
                GetAvailableServices().Each(y => BroadcastService(y.Item1, y.Item2, publishSocket, true, false));
                break;

            default:
                throw new NotSupportedException("Invalid method: " + message.Method);
        }
    }

    /// <summary>
    /// Publishes a "ClearCache" message with no payload.
    /// </summary>
    void BroadcastClearCache(IObserver<ServiceSocketMessage> publishSocket)
    {
        publishSocket.OnNext(new ServiceSocketMessage() { Method = ClearCacheMethod, Data = null });
    }

    /// <summary>
    /// Publishes a register/unregister message for one (name, address) pair and,
    /// when <paramref name="shouldNotify"/> is set, also raises the matching
    /// in-process stream (serviceAdded or serviceRemoved).
    /// </summary>
    void BroadcastService(string serviceName, string address, IObserver<ServiceSocketMessage> publishSocket, bool isRegistering, bool shouldNotify)
    {
        var method = isRegistering ? RegisterServiceMethod : UnregisterServiceMethod;
        var data = JsonConvert.SerializeObject(new KeyValuePair<string, string>(serviceName, address));
        publishSocket.OnNext(new ServiceSocketMessage() { Method = method, Data = data });

        if (shouldNotify)
        {
            (isRegistering ? serviceAdded : serviceRemoved).OnNext(Tuple.Create(serviceName, address));
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment