Created
January 11, 2012 10:49
-
-
Save javicrespo/1594137 to your computer and use it in GitHub Desktop.
Producer/Consumer implementation with .NET 4 BlockingCollection
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System; | |
| using System.Collections.Concurrent; | |
| using System.Collections.Generic; | |
| using System.Threading; | |
| namespace ProducerConsumer | |
| { | |
/// <summary>
/// Producer/consumer queue service: callers enqueue messages with <see cref="Queue"/> and a
/// configurable number of background workers take them off a shared
/// <see cref="BlockingCollection{T}"/> and pass each one to the handler supplied at construction.
/// </summary>
/// <typeparam name="TMessage">Type of the queued messages (reference types only).</typeparam>
public class QueueService<TMessage> : IDisposable where TMessage : class
{
    private readonly BlockingCollection<TMessage> _messages = new BlockingCollection<TMessage>();
    private readonly List<QueueWorker> _workers = new List<QueueWorker>();
    private readonly Action<TMessage> _messageHandler;
    private bool _disposed;

    /// <summary>
    /// Initializes a new instance of the <see cref="QueueService{TMessage}"/> class.
    /// </summary>
    /// <param name="messageHandler">Delegate invoked by a worker for every non-null message taken from the queue.</param>
    /// <exception cref="ArgumentNullException"><paramref name="messageHandler"/> is <c>null</c>.</exception>
    public QueueService(Action<TMessage> messageHandler)
    {
        // Fail at construction instead of with a NullReferenceException on a worker thread.
        if (messageHandler == null)
        {
            throw new ArgumentNullException("messageHandler");
        }

        _messageHandler = messageHandler;
    }

    /// <summary>
    /// Queues the specified message.
    /// </summary>
    /// <param name="message">The message. A <c>null</c> message is accepted but skipped by the workers.</param>
    public void Queue(TMessage message)
    {
        _messages.Add(message);
    }

    /// <summary>
    /// How many messages are left in the queue waiting to be handled.
    /// </summary>
    public int QueueLength
    {
        get { return _messages.Count; }
    }

    /// <summary>
    /// Gets a value indicating whether this instance is running.
    /// </summary>
    /// <value>
    /// <c>true</c> if this instance is running; otherwise, <c>false</c>.
    /// </value>
    public bool IsRunning { get; private set; }

    /// <summary>
    /// How many workers are currently registered with the service.
    /// </summary>
    public int NumberOfWorkers
    {
        // Deliberately read without taking the _workers lock: SetNumberOfWorkers holds that lock
        // while it waits for a worker to finish, and a handler reading this property would deadlock.
        get { return _workers.Count; }
    }

    /// <summary>
    /// Marks the service as running and sets the number of workers that process queued messages.
    /// </summary>
    /// <param name="numberOfWorkers">Number of workers.</param>
    public void Start(int numberOfWorkers = 1)
    {
        IsRunning = true;
        SetNumberOfWorkers(numberOfWorkers);
    }

    /// <summary>
    /// Stops all workers and the service, without waiting for queued messages to be handled.
    /// A message whose handler is already executing is allowed to finish.
    /// </summary>
    public void Stop()
    {
        IsRunning = false;
        SetNumberOfWorkers(0);
    }

    /// <summary>
    /// Increases or decreases the number of workers that process queued messages.
    /// Surplus workers are stopped (this call blocks until each has finished) and disposed.
    /// </summary>
    /// <param name="value">New number of workers; must not be negative.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="value"/> is negative.</exception>
    /// <exception cref="AggregateException">
    /// A worker being stopped had faulted because the message handler threw. The worker has already
    /// been removed and disposed when the exception propagates.
    /// </exception>
    public void SetNumberOfWorkers(int value)
    {
        // Reject a negative count up front rather than after every worker has been stopped.
        if (value < 0)
        {
            throw new ArgumentOutOfRangeException("value");
        }

        lock (_workers)
        {
            while (_workers.Count > value)
            {
                var worker = _workers[0];

                // Remove before stopping so a faulted worker cannot stay stuck in the list.
                _workers.RemoveAt(0);
                try
                {
                    worker.Stop();
                }
                finally
                {
                    // Release the worker's cancellation source and task.
                    worker.Dispose();
                }
            }

            while (_workers.Count < value)
            {
                _workers.Add(new QueueWorker(MessageHander));
            }
        }
    }

    /// <summary>
    /// Worker loop: takes messages until cancellation is requested and hands every non-null
    /// message to the handler.
    /// </summary>
    /// <param name="state">The worker's <see cref="CancellationToken"/>, boxed.</param>
    private void MessageHander(object state)
    {
        var cancelToken = (CancellationToken)state;
        while (!cancelToken.IsCancellationRequested)
        {
            TMessage message;
            try
            {
                // Pass the token so a worker idle on an empty queue wakes up when it is stopped.
                message = _messages.Take(cancelToken);
            }
            catch (OperationCanceledException)
            {
                // Cancellation arrived while waiting for a message: normal shutdown.
                break;
            }

            if (message != null)
            {
                _messageHandler(message);
            }
        }
    }

    /// <summary>
    /// Finalizer; forwards to <see cref="Dispose(bool)"/> with <c>false</c>.
    /// </summary>
    ~QueueService()
    {
        Dispose(false);
    }

    /// <summary>
    /// Stops and disposes all workers, then disposes the message queue.
    /// </summary>
    /// <param name="disposing">
    /// <c>true</c> when called from <see cref="Dispose()"/>; <c>false</c> when called from the
    /// finalizer, in which case nothing is touched because this class owns only managed objects.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (!disposing || _disposed)
        {
            return;
        }

        _disposed = true;
        IsRunning = false;

        lock (_workers)
        {
            foreach (var worker in _workers)
            {
                try
                {
                    // A task must have finished before it can be disposed.
                    worker.Stop();
                }
                catch (AggregateException)
                {
                    // The handler faulted and the worker has already ended; Dispose must not throw.
                }

                worker.Dispose();
            }

            _workers.Clear();
        }

        // Dispose the queue last, once no worker can still be blocked on it.
        _messages.Dispose();
    }

    /// <summary>
    /// Stops the workers and releases the resources held by the service. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }
}
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System; | |
| using System.Threading; | |
| using System.Threading.Tasks; | |
| namespace ProducerConsumer | |
| { | |
/// <summary>
/// Runs a message-handling delegate on its own long-running task and owns the
/// <see cref="CancellationTokenSource"/> used to ask that delegate to finish.
/// </summary>
class QueueWorker : IDisposable
{
    private readonly CancellationTokenSource _cancelTokenSource = new CancellationTokenSource();
    private readonly Task _messageHandlerTask;
    private int _disposed; // 0 = live, 1 = Dispose has already run

    /// <summary>
    /// Creates the worker and immediately starts <paramref name="messageHandler"/> on a new task.
    /// </summary>
    /// <param name="messageHandler">
    /// Delegate to run. It receives the worker's <see cref="CancellationToken"/> boxed as its
    /// state argument and is expected to return once cancellation is requested.
    /// </param>
    public QueueWorker(Action<object> messageHandler)
    {
        Id = Guid.NewGuid().ToString();

        // The token is passed as the task's state object, not as the task's own cancellation token.
        _messageHandlerTask = Task.Factory.StartNew(messageHandler, _cancelTokenSource.Token, TaskCreationOptions.LongRunning);
    }

    /// <summary>
    /// Identifier of this worker: a GUID string generated at construction.
    /// </summary>
    public string Id
    {
        get;
        private set;
    }

    /// <summary>
    /// Requests cancellation and blocks until the handler task has finished.
    /// </summary>
    /// <exception cref="AggregateException">The handler threw; the exception is surfaced by the wait.</exception>
    public void Stop()
    {
        // Skip Cancel() when cancellation was already requested (for example by Dispose), so the
        // token source is not cancelled again after it may have been disposed.
        if (!_cancelTokenSource.IsCancellationRequested)
        {
            _cancelTokenSource.Cancel();
        }

        _messageHandlerTask.Wait();
    }

    /// <summary>
    /// Requests cancellation and releases the task and the token source once the task has
    /// finished. Does not block and may be called more than once.
    /// </summary>
    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        if (!_cancelTokenSource.IsCancellationRequested)
        {
            _cancelTokenSource.Cancel();
        }

        // Task.Dispose throws InvalidOperationException on a task that has not completed, and the
        // handler may still be using the token, so both are released from a continuation that
        // runs only after the task has ended (immediately if it already has).
        _messageHandlerTask.ContinueWith(
            finished =>
            {
                finished.Dispose();
                _cancelTokenSource.Dispose();
            },
            TaskContinuationOptions.ExecuteSynchronously);
    }
}
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This code should be rewritten using GetConsumingEnumerable