Skip to content

Instantly share code, notes, and snippets.

@skoon
Created November 12, 2010 05:44
Show Gist options
  • Save skoon/673773 to your computer and use it in GitHub Desktop.
Save skoon/673773 to your computer and use it in GitHub Desktop.
// I'm implementing a reusable library for the Scheduler-Agent-Supervisor pattern blogged about by Clemens Vaster's here:
// http://vasters.com/clemensv/2010/09/28/Cloud+Architecture+The+SchedulerAgentSupervisor+Pattern.aspx
// I'm starting my work on the communication between the Scheduler and the Agent's. This will be (for now) done via
// WCF and leverage MSMQ for elasticity. The Scheduler has a request queue and a response queue. This is the first
// draft of the Scheduler class which implements both queue WCF endpoints as well as some helper functions for client creation.
// Because of Task<T> this helps facilitate the DataContractResolver away from the user of the library.
using System;
using System.Messaging;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.ServiceModel.Description;
using System.Runtime.Serialization;
namespace Elasticity
{
#region Junk declared so I can compile and refactor
public interface ISchedulerResponseQueue
{
}
public interface ISchedulerRequestQueue
{
}
public class Job
{
public Job()
{
}
}
public interface ITask
{
}
public class TaskDataContractResolver : DataContractResolver
{
public TaskDataContractResolver()
{
}
public override Type ResolveName(string typeName, string typeNamespace, Type declaredType, DataContractResolver knownTypeResolver)
{
throw new NotImplementedException();
}
public override bool TryResolveType(Type type, Type declaredType, DataContractResolver knownTypeResolver, out System.Xml.XmlDictionaryString typeName, out System.Xml.XmlDictionaryString typeNamespace)
{
throw new NotImplementedException();
}
}
#endregion
#region New interfaces to be injected
public interface IServiceHostWrapper
{
ServiceDescription Description { get; set; }
}
public interface ISchedulerConfig
{
string RequestName { get; set; }
string ResponseName { get; set; }
}
#endregion
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
public abstract class Scheduler : ISchedulerRequestQueue, ISchedulerResponseQueue, Scheduler_refactor.IScheduler
{
private ISchedulerConfig Config;
private ServiceHost serviceHost = null;
public Scheduler(IServiceHostWrapper serviceHostWrapper, ISchedulerConfig config)
{
Config = config;
// Attach the DataContractResolver which helps serialize/deserialize Task<T> to the endpoints.
AttachDataContractResolver(serviceHostWrapper.Description.Endpoints[0]);
AttachDataContractResolver(serviceHostWrapper.Description.Endpoints[1]);
}
public void Start()
{
CreateQueues(Config.RequestName);
CreateQueues(Config.ResponseName);
serviceHost.Open();
}
public void Stop()
{
serviceHost.Close();
}
private void CreateQueues(string queueName)
{
if (string.IsNullOrEmpty(queueName) == false && MessageQueue.Exists(queueName) == false)
MessageQueue.Create(queueName, true);
}
public virtual void SubmitRequest(Job job)
{
}
public virtual void SubmitResponse(ITask task)
{
}
public void AttachDataContractResolver(ServiceEndpoint endpoint)
{
ContractDescription cd = endpoint.Contract;
foreach (OperationDescription operationDescription in cd.Operations)
{
DataContractSerializerOperationBehavior serializerBehavior =
operationDescription.Behaviors.Find<DataContractSerializerOperationBehavior>();
if (serializerBehavior == null)
{
serializerBehavior = new DataContractSerializerOperationBehavior(operationDescription);
operationDescription.Behaviors.Add(serializerBehavior);
}
serializerBehavior.DataContractResolver = new TaskDataContractResolver();
}
}
public EndpointAddress GetDefaultEndpointAddress(string name)
{
EndpointAddress address = null;
if (string.IsNullOrEmpty(name))
{
string endpointAddress = name;
endpointAddress = endpointAddress.Replace(".", "").Replace("$", "").Replace('\\', '/');
endpointAddress = string.Format("net.msmq://localhost{0}", endpointAddress);
address = new EndpointAddress(endpointAddress);
}
else
{
address = new EndpointAddress(name);
}
return address;
}
#region Request Client Creation
public ISchedulerRequestQueue CreateSchedulerRequestQueueClient()
{
return CreateSchedulerRequestQueueClient(null);
}
public ISchedulerRequestQueue CreateSchedulerRequestQueueClient(Binding binding)
{
EndpointAddress address = GetDefaultEndpointAddress(Config.RequestName);
if (binding == null)
binding = new NetMsmqBinding(NetMsmqSecurityMode.None);
ISchedulerRequestQueue service = null;
using (ChannelFactory<ISchedulerRequestQueue> factory = new ChannelFactory<ISchedulerRequestQueue>(binding, address))
{
service = factory.CreateChannel();
ContractDescription cd = factory.Endpoint.Contract;
// TODO: We should only attach a TaskDataContractResolver for operations that actually require it.
foreach (OperationDescription operationDescription in cd.Operations)
{
DataContractSerializerOperationBehavior serializerBehavior =
operationDescription.Behaviors.Find<DataContractSerializerOperationBehavior>();
if (serializerBehavior == null)
{
serializerBehavior = new DataContractSerializerOperationBehavior(operationDescription);
operationDescription.Behaviors.Add(serializerBehavior);
}
serializerBehavior.DataContractResolver = new TaskDataContractResolver();
}
}
return service;
}
#endregion
#region Response Client Creation
// ISchedulerRequestQueue Implementation
public ISchedulerResponseQueue CreateSchedulerResponseQueueClient()
{
return CreateSchedulerResponseQueueClient(null);
}
public ISchedulerResponseQueue CreateSchedulerResponseQueueClient(Binding binding)
{
EndpointAddress address = GetDefaultEndpointAddress(Config.ResponseName);
if (binding == null)
binding = new NetMsmqBinding(NetMsmqSecurityMode.None);
ISchedulerResponseQueue service = null;
using (ChannelFactory<ISchedulerResponseQueue> factory = new ChannelFactory<ISchedulerResponseQueue>(binding, address))
{
service = factory.CreateChannel();
ContractDescription cd = factory.Endpoint.Contract;
// TODO: We should only attach a TaskDataContractResolver for operations that actually require it.
foreach (OperationDescription operationDescription in cd.Operations)
{
DataContractSerializerOperationBehavior serializerBehavior =
operationDescription.Behaviors.Find<DataContractSerializerOperationBehavior>();
if (serializerBehavior == null)
{
serializerBehavior = new DataContractSerializerOperationBehavior(operationDescription);
operationDescription.Behaviors.Add(serializerBehavior);
}
serializerBehavior.DataContractResolver = new TaskDataContractResolver();
}
}
return service;
}
#endregion
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment