Last active
February 14, 2017 13:12
-
-
Save micdenny/100f2945785e879919ff951baf1cf40d to your computer and use it in GitHub Desktop.
ChannelPoolingClientCommandDispatcher
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using EasyNetQ; | |
using EasyNetQ.Producer; | |
using RabbitMQ.Client; | |
namespace EasyNetQSlowerThanRabbitMQ | |
{ | |
/// <summary>
/// Factory that builds a <see cref="ChannelPoolingClientCommandDispatcher"/> for a given
/// persistent connection.
/// </summary>
public class ChannelPoolingClientCommandDispatcherFactory : IClientCommandDispatcherFactory
{
    private readonly ConnectionConfiguration _configuration;
    private readonly IPersistentChannelFactory _channelFactory;

    /// <summary>
    /// Stores the collaborators that are handed to every dispatcher this factory creates.
    /// </summary>
    /// <param name="configuration">Connection configuration forwarded to each dispatcher.</param>
    /// <param name="persistentChannelFactory">Channel factory forwarded to each dispatcher.</param>
    public ChannelPoolingClientCommandDispatcherFactory(
        ConnectionConfiguration configuration,
        IPersistentChannelFactory persistentChannelFactory)
    {
        _configuration = configuration;
        _channelFactory = persistentChannelFactory;
    }

    /// <summary>
    /// Creates a new dispatcher bound to <paramref name="connection"/>.
    /// </summary>
    /// <param name="connection">The persistent connection the dispatcher will create channels on.</param>
    /// <returns>A new <see cref="ChannelPoolingClientCommandDispatcher"/>.</returns>
    public IClientCommandDispatcher GetClientCommandDispatcher(IPersistentConnection connection) =>
        new ChannelPoolingClientCommandDispatcher(_configuration, connection, _channelFactory);
}
/// <summary>
/// Dispatches channel actions onto a fixed set of dedicated threads. Each action borrows an
/// <see cref="IPersistentChannel"/> from a pool for the duration of the call, so several
/// actions can run at the same time on different channels.
/// </summary>
public class ChannelPoolingClientCommandDispatcher : IClientCommandDispatcher
{
    // Bounded capacity of the hand-off queue between callers and dispatcher threads.
    private const int queueSize = 1;

    // Number of dispatcher threads started by the constructor.
    private readonly int dispatcherCount = Environment.ProcessorCount * 2;

    private readonly IPersistentConnection connection;
    private readonly CancellationTokenSource cancellation = new CancellationTokenSource();
    private readonly BlockingCollection<Action> queue = new BlockingCollection<Action>(queueSize);
    private readonly ObjectPool<IPersistentChannel> _persistentChannels;

    /// <summary>
    /// Creates the channel pool and starts the dispatcher threads.
    /// </summary>
    /// <param name="configuration">Read for <c>UseBackgroundThreads</c> when starting threads.</param>
    /// <param name="connection">Connection on which pooled channels are created.</param>
    /// <param name="persistentChannelFactory">Creates a new channel whenever the pool is empty.</param>
    public ChannelPoolingClientCommandDispatcher(
        ConnectionConfiguration configuration,
        IPersistentConnection connection,
        IPersistentChannelFactory persistentChannelFactory)
    {
        this.connection = connection;
        _persistentChannels = new ObjectPool<IPersistentChannel>(
            () => persistentChannelFactory.CreatePersistentChannel(connection));

        for (int i = 0; i < dispatcherCount; i++)
        {
            StartDispatcherThread(configuration);
        }
    }

    /// <summary>
    /// Runs <paramref name="channelAction"/> on a dispatcher thread and blocks until it completes.
    /// </summary>
    /// <param name="channelAction">Function to run against a channel.</param>
    /// <returns>The value produced by <paramref name="channelAction"/>.</returns>
    /// <remarks>
    /// A failure surfaces as the inner exception of the task's <see cref="AggregateException"/>,
    /// rethrown with its original stack trace intact.
    /// </remarks>
    public T Invoke<T>(Func<IModel, T> channelAction)
    {
        try
        {
            return InvokeAsync(channelAction).Result;
        }
        catch (AggregateException e)
        {
            // "throw e.InnerException" would reset the stack trace; ExceptionDispatchInfo keeps it.
            System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(e.InnerException).Throw();
            throw; // Not reached: Throw() never returns. Present only to satisfy the compiler.
        }
    }

    /// <summary>
    /// Runs <paramref name="channelAction"/> on a dispatcher thread and blocks until it completes.
    /// </summary>
    /// <param name="channelAction">Action to run against a channel.</param>
    /// <remarks>
    /// A failure surfaces as the inner exception of the task's <see cref="AggregateException"/>,
    /// rethrown with its original stack trace intact.
    /// </remarks>
    public void Invoke(Action<IModel> channelAction)
    {
        try
        {
            InvokeAsync(channelAction).Wait();
        }
        catch (AggregateException e)
        {
            // "throw e.InnerException" would reset the stack trace; ExceptionDispatchInfo keeps it.
            System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(e.InnerException).Throw();
        }
    }

    /// <summary>
    /// Queues <paramref name="channelAction"/> for execution on a dispatcher thread.
    /// </summary>
    /// <param name="channelAction">Function to run against a pooled channel.</param>
    /// <returns>
    /// A task that completes with the function's result, faults with the exception raised while
    /// obtaining or using the channel, or is cancelled when the dispatcher has been disposed.
    /// </returns>
    public Task<T> InvokeAsync<T>(Func<IModel, T> channelAction)
    {
        var tcs = new TaskCompletionSource<T>();

        try
        {
            queue.Add(() =>
            {
                if (cancellation.IsCancellationRequested)
                {
                    tcs.TrySetCanceled();
                    return;
                }

                IPersistentChannel persistentChannel = null;
                try
                {
                    // Borrow inside the try block: if the pool has to create a channel and the
                    // factory throws, the task faults instead of the exception escaping on the
                    // dispatcher thread and leaving the caller waiting forever.
                    persistentChannel = _persistentChannels.GetObject();
                    persistentChannel.InvokeChannelAction(channel => tcs.TrySetResult(channelAction(channel)));
                }
                catch (Exception e)
                {
                    tcs.TrySetException(e);
                }
                finally
                {
                    if (persistentChannel != null)
                    {
                        _persistentChannels.PutObject(persistentChannel);
                    }
                }
            }, cancellation.Token);
        }
        catch (OperationCanceledException)
        {
            tcs.TrySetCanceled();
        }

        return tcs.Task;
    }

    /// <summary>
    /// Queues <paramref name="channelAction"/> for execution on a dispatcher thread.
    /// </summary>
    /// <param name="channelAction">Action to run against a pooled channel.</param>
    /// <returns>A task that completes when the action has run.</returns>
    public Task InvokeAsync(Action<IModel> channelAction)
    {
        return InvokeAsync(x =>
        {
            channelAction(x);
            return new NoContentStruct();
        });
    }

    /// <summary>
    /// Signals the dispatcher threads to stop, cancels work that is still queued and disposes
    /// the channels currently held by the pool.
    /// </summary>
    public void Dispose()
    {
        cancellation.Cancel();

        // Dispatcher threads stop taking items once the token is cancelled, so anything still
        // queued would never run. Run it here: with cancellation requested each queued action
        // only marks its task as cancelled, which releases callers blocked in Invoke.
        Action pending;
        while (queue.TryTake(out pending))
        {
            pending();
        }

        // GetObject() creates a new channel when the pool is empty and therefore never returns
        // null; looping on "!= null" would never end. Drain by Count instead.
        // NOTE: channels still borrowed by in-flight actions are put back into the pool when
        // those actions finish and are not disposed by this call.
        while (_persistentChannels.Count > 0)
        {
            _persistentChannels.GetObject().Dispose();
        }
    }

    /// <summary>
    /// Starts one thread that takes queued actions and runs them until cancellation is requested.
    /// </summary>
    /// <param name="configuration">Supplies <c>UseBackgroundThreads</c> for the new thread.</param>
    private void StartDispatcherThread(ConnectionConfiguration configuration)
    {
        var thread = new Thread(() =>
        {
            while (!cancellation.IsCancellationRequested)
            {
                try
                {
                    var channelAction = queue.Take(cancellation.Token);
                    channelAction();
                }
                catch (OperationCanceledException)
                {
                    break;
                }
            }
        })
        { Name = "Client Command Dispatcher Thread", IsBackground = configuration.UseBackgroundThreads };

        thread.Start();
    }

    // Placeholder result type that lets the non-generic InvokeAsync reuse the generic overload.
    private struct NoContentStruct
    {
    }
}
// TODO: we can replace this with a more robust implementation, such as:
// https://www.nuget.org/packages/CodeProject.ObjectPool/ (https://github.com/pomma89/ObjectPool)
// https://www.nuget.org/packages/Microsoft.Extensions.ObjectPool/ (https://github.com/aspnet/Common/tree/master/src/Microsoft.Extensions.ObjectPool)
/// <summary>
/// Contract for a pool that lends out items of type <typeparamref name="T"/> and takes them back.
/// </summary>
/// <typeparam name="T">Type of the pooled items.</typeparam>
public interface IObjectPool<T>
{
    /// <summary>Obtains an item from the pool.</summary>
    /// <returns>The item handed out by the pool.</returns>
    T GetObject();

    /// <summary>Hands an item to the pool.</summary>
    /// <param name="item">The item to place in the pool.</param>
    void PutObject(T item);

    /// <summary>Gets the item count reported by the pool.</summary>
    int Count { get; }
}
/// <summary>
/// Minimal pool backed by a <see cref="ConcurrentBag{T}"/>: hands out a stored item when one is
/// available and otherwise asks the supplied generator for a new one.
/// </summary>
/// <typeparam name="T">Type of the pooled items.</typeparam>
public class ObjectPool<T> : IObjectPool<T>
{
    private readonly ConcurrentBag<T> _items = new ConcurrentBag<T>();
    private readonly Func<T> _factory;

    /// <summary>
    /// Creates an empty pool.
    /// </summary>
    /// <param name="objectGenerator">Invoked by <see cref="GetObject"/> when no item is stored.</param>
    /// <exception cref="ArgumentNullException"><paramref name="objectGenerator"/> is null.</exception>
    public ObjectPool(Func<T> objectGenerator)
    {
        if (objectGenerator == null)
        {
            throw new ArgumentNullException("objectGenerator");
        }

        _factory = objectGenerator;
    }

    /// <summary>Gets the number of items currently stored in the pool.</summary>
    public int Count
    {
        get { return _items.Count; }
    }

    /// <summary>
    /// Takes a stored item if there is one; otherwise returns whatever the generator produces.
    /// </summary>
    /// <returns>A stored item, or a newly generated one when the pool is empty.</returns>
    public T GetObject()
    {
        T pooled;
        return _items.TryTake(out pooled) ? pooled : _factory();
    }

    /// <summary>Stores <paramref name="item"/> in the pool.</summary>
    /// <param name="item">The item to store.</param>
    public void PutObject(T item)
    {
        _items.Add(item);
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment