Last active
August 29, 2015 13:59
-
-
Save tupunco/10440684 to your computer and use it in GitHub Desktop.
ConcurrentConsumerQueue 并行消费者队列
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace Tup.Utilities | |
{ | |
/// <summary> | |
/// 并行消费者队列 | |
/// </summary> | |
/// <remarks> | |
/// FROM: https://gist.github.com/tupunco/10440684 | |
/// </remarks> | |
public abstract class ConcurrentConsumerQueue<TData> : IDisposable | |
where TData : class | |
{ | |
private BlockingCollection<TData> _itemQueue = new BlockingCollection<TData>(); | |
/// <summary> | |
/// | |
/// </summary> | |
public ConcurrentConsumerQueue() | |
{ | |
Task.Factory.StartNew(ProcessQueue, TaskCreationOptions.LongRunning); | |
} | |
#region Dispose | |
/// <summary> | |
/// Flag: Has Dispose already been called? | |
/// </summary> | |
private bool disposed = false; | |
/// <summary> | |
/// | |
/// </summary> | |
public void Dispose() | |
{ | |
try | |
{ | |
_itemQueue.CompleteAdding(); | |
Dispose(true); | |
} | |
catch (Exception ex) | |
{ | |
LogHelper.LogError("{0}-Dispose(CompleteAdding)-ex:{1}", this.GetType(), ex); | |
} | |
GC.SuppressFinalize(this); | |
} | |
/// <summary> | |
/// Protected implementation of Dispose pattern. | |
/// </summary> | |
/// <param name="disposing"></param> | |
protected virtual void Dispose(bool disposing) | |
{ | |
if (disposed) | |
return; | |
if (disposing) | |
{ | |
// Free any other managed objects here. | |
// | |
} | |
// Free any unmanaged objects here. | |
// | |
disposed = true; | |
} | |
/// <summary> | |
/// | |
/// </summary> | |
~ConcurrentConsumerQueue() | |
{ | |
Dispose(false); | |
} | |
#endregion | |
/// <summary> | |
/// Gets the count. | |
/// </summary> | |
/// <value>The count.</value> | |
protected int Count | |
{ | |
get | |
{ | |
return _itemQueue.Count; | |
} | |
} | |
/// <summary> | |
/// | |
/// </summary> | |
/// <param name="dataItem"></param> | |
/// <returns></returns> | |
public void Enqueue(TData dataItem) | |
{ | |
try | |
{ | |
if (_itemQueue.IsAddingCompleted) | |
return; | |
_itemQueue.Add(dataItem); | |
} | |
catch (Exception ex) | |
{ | |
LogHelper.LogError("{0}-Enqueue(itemQueue.Add)-ex:{1}", this.GetType(), ex); | |
} | |
} | |
/// <summary> | |
/// | |
/// </summary> | |
private void ProcessQueue() | |
{ | |
foreach (var dataItem in _itemQueue.GetConsumingEnumerable()) | |
{ | |
try | |
{ | |
Process(dataItem); | |
Thread.Sleep(0); | |
} | |
catch (Exception ex) | |
{ | |
LogHelper.LogError("{0}-ConcurrentConsumerQueue-DataItem:{1}-ex:{2}", | |
this.GetType(), dataItem, ex); | |
ex = null; | |
Thread.Sleep(5); | |
} | |
} | |
} | |
/// <summary> | |
/// Processes the queue. | |
/// </summary> | |
/// <param name="dataItem"></param> | |
protected abstract void Process(TData dataItem); | |
} | |
/// <summary> | |
/// 并行缓冲消费者队列 | |
/// </summary> | |
public abstract class ConcurrentConsumerBufferQueue<TData> | |
: ConcurrentConsumerQueue<TData> where TData : class | |
{ | |
/// <summary> | |
/// BufferQueue 长度 | |
/// </summary> | |
private readonly static int s_MaxBufferQueueLen = 1000; | |
/// <summary> | |
/// BufferQueue 刷新时间间隔(单位:毫秒)-30 秒 | |
/// </summary> | |
private readonly static int s_FlushTimerPeriod = 30 * 1000; | |
/// <summary> | |
/// 切换 CacheQueue LockObj | |
/// </summary> | |
private object m_LockObj = new object(); | |
/// <summary> | |
/// 定时 FlushCache Timer | |
/// </summary> | |
private Timer m_FlushTimer = null; | |
/// <summary> | |
/// | |
/// </summary> | |
private Queue<TData> m_CacheQueue = null; | |
/// <summary> | |
/// | |
/// </summary> | |
private int m_bufferQueueLen = s_MaxBufferQueueLen; | |
/// <summary> | |
/// | |
/// </summary> | |
public ConcurrentConsumerBufferQueue() : this(s_MaxBufferQueueLen, s_FlushTimerPeriod) { } | |
/// <summary> | |
/// | |
/// </summary> | |
/// <param name="bufferQueueLen">BufferQueue 长度</param> | |
/// <remarks></remarks> | |
public ConcurrentConsumerBufferQueue(int bufferQueueLen) : this(bufferQueueLen, s_FlushTimerPeriod) { } | |
/// <summary> | |
/// | |
/// </summary> | |
/// <param name="bufferQueueLen">BufferQueue 长度</param> | |
/// <param name="flushTimerPeriod">BufferQueue 刷新时间间隔(单位:毫秒)</param> | |
/// <remarks></remarks> | |
public ConcurrentConsumerBufferQueue(int bufferQueueLen, int flushTimerPeriod) | |
{ | |
if (bufferQueueLen <= 0) | |
bufferQueueLen = s_MaxBufferQueueLen; | |
if (flushTimerPeriod <= 0) | |
flushTimerPeriod = s_FlushTimerPeriod; | |
m_bufferQueueLen = bufferQueueLen; | |
m_CacheQueue = new Queue<TData>(m_bufferQueueLen); | |
m_FlushTimer = new Timer(_ => FlushCache(), null, flushTimerPeriod, flushTimerPeriod); | |
} | |
#region Dispose | |
/// <summary> | |
/// Flag: Has Dispose already been called? | |
/// </summary> | |
private bool disposed = false; | |
/// <summary> | |
/// Protected implementation of Dispose pattern. | |
/// </summary> | |
/// <param name="disposing"></param> | |
protected override void Dispose(bool disposing) | |
{ | |
if (disposed) | |
return; | |
if (disposing) | |
{ | |
// Free any other managed objects here. | |
m_FlushTimer.Dispose(); | |
m_FlushTimer = null; | |
} | |
// Free any unmanaged objects here. | |
// | |
disposed = true; | |
} | |
#endregion | |
/// <summary> | |
/// | |
/// </summary> | |
/// <param name="dataItem"></param> | |
protected override void Process(TData dataItem) | |
{ | |
if (dataItem == null) | |
return; | |
m_CacheQueue.Enqueue(dataItem); | |
if (m_CacheQueue.Count >= m_bufferQueueLen) | |
FlushCache(); | |
} | |
/// <summary> | |
/// Flush Cache | |
/// </summary> | |
public void FlushCache() | |
{ | |
if (m_CacheQueue == null || m_CacheQueue.Count <= 0) | |
return; | |
Queue<TData> oldCacheQueue = null; | |
lock (m_LockObj) | |
{ | |
if (m_CacheQueue == null || m_CacheQueue.Count <= 0) | |
return; | |
oldCacheQueue = this.m_CacheQueue; | |
this.m_CacheQueue = new Queue<TData>(m_bufferQueueLen); | |
} | |
FlushProcessItem(oldCacheQueue); | |
oldCacheQueue = null; | |
} | |
/// <summary> | |
/// Processes the buffer queue item. | |
/// </summary> | |
/// <param name="dataBufferQueue"></param> | |
protected abstract void FlushProcessItem(IEnumerable<TData> dataBufferQueue); | |
} | |
} | |
namespace Tup.Utilities | |
{ | |
/// <summary> | |
/// LogHelper | |
/// </summary> | |
public static class LogHelper | |
{ | |
/// <summary> | |
/// | |
/// </summary> | |
/// <param name="format"></param> | |
/// <param name="args"></param> | |
public static void LogError(string format, params object[] args) | |
{ | |
Console.WriteLine(format, args); | |
} | |
} | |
/// <summary> | |
/// | |
/// </summary> | |
/// <typeparam name="TKey"></typeparam> | |
/// <typeparam name="TValue"></typeparam> | |
public class Pair<TKey, TValue> | |
{ | |
public Pair(TKey key, TValue value) | |
{ | |
this.Key = key; | |
this.Value = value; | |
} | |
public TKey Key { get; set; } | |
public TValue Value { get; set; } | |
public override string ToString() | |
{ | |
return string.Format("[Pair Key:{0}, Value:{1}]", Key, Value); | |
} | |
} | |
/// <summary> | |
/// | |
/// </summary> | |
public static class StringExtensions | |
{ | |
/// <summary> | |
/// 指示指定的 System.String 对象是 null 还是 System.String.Empty 字符串。 | |
/// </summary> | |
/// <param name="input">String to check</param> | |
/// <returns>bool</returns> | |
public static bool IsEmpty(this string input) | |
{ | |
return string.IsNullOrEmpty(input); | |
} | |
} | |
} | |
namespace Tup.Utilities.Test | |
{ | |
public class TestClass | |
{ | |
/// <summary> | |
/// 发送 MSMQ | |
/// </summary> | |
/// <param name="tag"></param> | |
/// <param name="msgMQ"></param> | |
public static void SendMQ(string tag, string msgMQ) | |
{ | |
//Single Queue | |
s_MQSendQueue.Enqueue(new Pair<string, string>(tag, msgMQ)); | |
//Buffer Queue | |
s_MQBufferSendQueue.Enqueue(new Pair<string, string>(tag, msgMQ)); | |
} | |
/// <summary> | |
/// MQ 发送队列 | |
/// </summary> | |
private static MQConsumerQueue s_MQSendQueue = new MQConsumerQueue(); | |
/// <summary> | |
/// MQ 发送队列 | |
/// </summary> | |
public class MQConsumerQueue | |
: ConcurrentConsumerQueue<Pair<string, string>> | |
{ | |
protected override void Process(Pair<string, string> dataItem) | |
{ | |
if (dataItem == null || dataItem.Key.IsEmpty() || dataItem.Value.IsEmpty()) | |
return; | |
//TODO SendMQ | |
System.Console.WriteLine("MQConsumerQueue-Process:{0}", dataItem); | |
} | |
} | |
/// <summary> | |
/// MQ Buffer 发送队列 | |
/// </summary> | |
private static MQBufferConsumerQueue s_MQBufferSendQueue = new MQBufferConsumerQueue(); | |
/// <summary> | |
/// MQ Buffer 发送队列 | |
/// </summary> | |
public class MQBufferConsumerQueue | |
: ConcurrentConsumerBufferQueue<Pair<string, string>> | |
{ | |
public MQBufferConsumerQueue() : base(10, 5000) { } | |
protected override void FlushProcessItem(IEnumerable<Pair<string, string>> dataBufferQueue) | |
{ | |
if (dataBufferQueue == null) | |
return; | |
//TODO Buffer SendMQ | |
System.Console.WriteLine("MQBufferConsumerQueue-FlushProcessItem:\r\n{0}", | |
string.Join(",\r\nE", dataBufferQueue)); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment