Last active
August 29, 2015 14:02
-
-
Save meisinger/7d6f594faf01f7bfc28a to your computer and use it in GitHub Desktop.
consumer for cooperative queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System;
using System.Collections;
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;
/// <summary>
/// RabbitMQ consumer that wraps every delivery in a <see cref="QueueMessage"/>
/// and enqueues it on a <see cref="CooperativeQueue{T}"/>.
/// </summary>
public class CooperativeConsumer : IBasicConsumer
{
    // Placeholder stored on the message when the delivery carries no value for a field.
    private const string NotSet = "not-set";

    private readonly IModel channel;
    private readonly CooperativeQueue<IQueueMessage> queue;

    /// <summary>
    /// Creates a consumer for <paramref name="channel"/> backed by a new, empty queue.
    /// </summary>
    public CooperativeConsumer(IModel channel)
        : this(channel, new CooperativeQueue<IQueueMessage>())
    {
    }

    /// <summary>
    /// Creates a consumer for <paramref name="channel"/> that enqueues deliveries
    /// on the supplied <paramref name="queue"/>.
    /// </summary>
    public CooperativeConsumer(IModel channel, CooperativeQueue<IQueueMessage> queue)
    {
        this.channel = channel;
        this.queue = queue;

        // Nothing is known about the subscription until HandleBasicConsumeOk is called.
        IsRunning = false;
        ConsumerTag = null;
        ShutdownReason = null;
    }

    /// <summary>The channel this consumer was constructed with.</summary>
    public IModel Model
    {
        get { return channel; }
    }

    /// <summary>The queue that receives every delivered message.</summary>
    public CooperativeQueue<IQueueMessage> Queue
    {
        get { return queue; }
    }

    /// <summary>True between <see cref="HandleBasicConsumeOk"/> and <see cref="OnCancel"/>.</summary>
    public bool IsRunning { get; private set; }

    /// <summary>The tag passed to <see cref="HandleBasicConsumeOk"/>; null before that call.</summary>
    public string ConsumerTag { get; private set; }

    /// <summary>The reason passed to <see cref="HandleModelShutdown"/>; null if it was never called.</summary>
    public ShutdownEventArgs ShutdownReason { get; private set; }

    /// <summary>Treats a cancel notification as a cancellation of this consumer.</summary>
    public void HandleBasicCancel(string cosnumerTag)
    {
        OnCancel();
    }

    /// <summary>Treats a cancel-ok notification as a cancellation of this consumer.</summary>
    public void HandleBasicCancelOk(string consumerTag)
    {
        OnCancel();
    }

    /// <summary>Records the consumer tag and marks the consumer as running.</summary>
    public void HandleBasicConsumeOk(string consumerTag)
    {
        ConsumerTag = consumerTag;
        IsRunning = true;
    }

    /// <summary>
    /// Converts a delivery into a <see cref="QueueMessage"/> and enqueues it.
    /// Correlation id, action and type fall back to "not-set" when the converted
    /// properties do not report them as set.
    /// </summary>
    public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body)
    {
        var converted = QueueMessage.ConvertProperties(properties);

        var delivery = new QueueMessage();
        delivery.MessageId = deliveryTag;
        delivery.ConsumerId = consumerTag;
        delivery.Exchange = exchange;
        delivery.ExchangeRoute = routingKey;
        delivery.Redelivered = redelivered;
        delivery.Body = body;
        delivery.CorrelationId = converted.IsCorrelationIdSet ? converted.CorrelationId : NotSet;
        delivery.Action = converted.IsMessageActionSet ? converted.MessageAction : NotSet;
        delivery.Type = converted.IsMessageTypeSet ? converted.MessageType : NotSet;
        delivery.Properties = converted;

        queue.Enqueue(delivery);
    }

    /// <summary>Stores the shutdown reason, then cancels the consumer.</summary>
    public void HandleModelShutdown(IModel model, ShutdownEventArgs reason)
    {
        ShutdownReason = reason;
        OnCancel();
    }

    /// <summary>Closes the queue and marks the consumer as no longer running.</summary>
    public void OnCancel()
    {
        queue.Close();
        IsRunning = false;
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Read-only view of a message taken from a queue, including where it came
/// from and the properties that accompanied it.
/// </summary>
public interface IQueueMessage
{
    // --- identity -------------------------------------------------------

    /// <summary>Numeric identifier of the message.</summary>
    ulong MessageId { get; }

    /// <summary>Correlation identifier associated with the message.</summary>
    string CorrelationId { get; }

    /// <summary>Identifier of the consumer that received the message.</summary>
    string ConsumerId { get; }

    // --- classification -------------------------------------------------

    /// <summary>Action name associated with the message.</summary>
    string Action { get; }

    /// <summary>Type name associated with the message.</summary>
    string Type { get; }

    // --- routing --------------------------------------------------------

    /// <summary>Name of the exchange associated with the message.</summary>
    string Exchange { get; }

    /// <summary>Route on the exchange associated with the message.</summary>
    string ExchangeRoute { get; }

    /// <summary>Whether the message is flagged as a redelivery.</summary>
    bool Redelivered { get; }

    // --- payload --------------------------------------------------------

    /// <summary>Raw message payload.</summary>
    byte[] Body { get; }

    /// <summary>Properties that accompany the message.</summary>
    IQueueMessageProperties Properties { get; }
}
/// <summary>
/// Properties that accompany a queue message. Each optional value is paired
/// with a flag that reports whether the value has been set.
/// </summary>
public interface IQueueMessageProperties
{
    /// <summary>Whether <see cref="AppId"/> has been set.</summary>
    bool IsAppIdSet { get; }
    /// <summary>Application identifier.</summary>
    string AppId { get; }

    /// <summary>Whether <see cref="ContentEncoding"/> has been set.</summary>
    bool IsContentEncodingSet { get; }
    /// <summary>Content encoding of the body.</summary>
    string ContentEncoding { get; }

    /// <summary>Whether <see cref="ContentType"/> has been set.</summary>
    bool IsContentTypeSet { get; }
    /// <summary>Content type of the body.</summary>
    string ContentType { get; }

    /// <summary>Whether <see cref="CorrelationId"/> has been set.</summary>
    bool IsCorrelationIdSet { get; }
    /// <summary>Correlation identifier.</summary>
    string CorrelationId { get; }

    /// <summary>Whether <see cref="Expiration"/> has been set.</summary>
    bool IsExpirationSet { get; }
    /// <summary>Expiration value, as a string.</summary>
    string Expiration { get; }

    /// <summary>Whether <see cref="ExternalMessageId"/> has been set.</summary>
    bool IsExternalMessageSet { get; }
    /// <summary>External message identifier.</summary>
    string ExternalMessageId { get; }

    /// <summary>Whether <see cref="MessageAction"/> has been set.</summary>
    bool IsMessageActionSet { get; }
    /// <summary>Action name associated with the message.</summary>
    string MessageAction { get; }

    /// <summary>Whether <see cref="MessageType"/> has been set.</summary>
    bool IsMessageTypeSet { get; }
    /// <summary>Type name associated with the message.</summary>
    string MessageType { get; }

    /// <summary>Whether <see cref="ReplyTo"/> has been set.</summary>
    bool IsReplyToSet { get; }
    /// <summary>Reply-to address.</summary>
    string ReplyTo { get; }

    /// <summary>Whether <see cref="Type"/> has been set.</summary>
    bool IsTypeSet { get; }
    /// <summary>Type property; distinct from <see cref="MessageType"/>.</summary>
    string Type { get; }

    /// <summary>Whether <see cref="UserId"/> has been set.</summary>
    bool IsUserIdSet { get; }
    /// <summary>User identifier.</summary>
    string UserId { get; }

    /// <summary>Whether <see cref="Priority"/> has been set.</summary>
    bool IsPrioritySet { get; }
    /// <summary>Message priority.</summary>
    int Priority { get; }

    /// <summary>Whether <see cref="Timestamp"/> has been set.</summary>
    bool IsTimestampSet { get; }
    /// <summary>Message timestamp, as a number.</summary>
    long Timestamp { get; }

    /// <summary>Whether the message is marked persistent.</summary>
    bool Persistent { get; }

    /// <summary>Whether <see cref="Headers"/> contains any entries.</summary>
    bool HasHeaders { get; }
    /// <summary>Custom headers as key/value pairs.</summary>
    IEnumerable<KeyValuePair<string, object>> Headers { get; }

    /// <summary>Adds a custom header.</summary>
    /// <param name="key">Header name.</param>
    /// <param name="value">Header value.</param>
    void AddHeader(string key, object value);
}
/// <summary>
/// Mutable <see cref="IQueueMessage"/> implementation, plus the helpers that
/// translate between <see cref="IQueueMessageProperties"/> and RabbitMQ
/// <c>IBasicProperties</c>.
/// </summary>
public class QueueMessage : IQueueMessage
{
    // Header names used to carry MessageAction / MessageType on the wire.
    private const string ActionHeader = "x-action";
    private const string TypeHeader = "x-type";

    // Delivery-mode value that is interpreted as "persistent".
    private const byte PersistentDeliveryMode = 2;

    public ulong MessageId { get; set; }
    public string Action { get; set; }
    public string Type { get; set; }
    public string CorrelationId { get; set; }
    public string ConsumerId { get; set; }
    public string Exchange { get; set; }
    public string ExchangeRoute { get; set; }
    public byte[] Body { get; set; }
    public bool Redelivered { get; set; }
    public IQueueMessageProperties Properties { get; set; }

    /// <summary>
    /// Builds channel-specific basic properties from <paramref name="properties"/>.
    /// </summary>
    /// <param name="channel">Channel used to create the properties instance.</param>
    /// <param name="properties">Source values; when null, freshly created properties are returned untouched.</param>
    /// <returns>
    /// Basic properties with every value that is flagged as set copied over.
    /// The message action and type travel as the "x-action" and "x-type" headers.
    /// </returns>
    public static IBasicProperties ConvertProperties(IModel channel, IQueueMessageProperties properties)
    {
        var item = channel.CreateBasicProperties();
        if (properties == null)
            return item;

        if (properties.IsAppIdSet)
            item.AppId = properties.AppId;
        if (properties.IsContentEncodingSet)
            item.ContentEncoding = properties.ContentEncoding;
        if (properties.IsContentTypeSet)
            item.ContentType = properties.ContentType;
        if (properties.IsCorrelationIdSet)
            item.CorrelationId = properties.CorrelationId;
        if (properties.IsExpirationSet)
            item.Expiration = properties.Expiration;
        if (properties.IsExternalMessageSet)
            item.MessageId = properties.ExternalMessageId;
        if (properties.IsReplyToSet)
            item.ReplyTo = properties.ReplyTo;
        if (properties.IsTypeSet)
            item.Type = properties.Type;
        if (properties.IsUserIdSet)
            item.UserId = properties.UserId;
        if (properties.IsPrioritySet)
            item.Priority = ToPriorityByte(properties.Priority);
        if (properties.IsTimestampSet)
            item.Timestamp = new AmqpTimestamp(properties.Timestamp);

        item.Headers = new Dictionary<string, object>();

        // Guarded like every other value above, so unset action/type do not
        // end up as null-valued headers.
        if (properties.IsMessageActionSet)
            item.Headers[ActionHeader] = properties.MessageAction;
        if (properties.IsMessageTypeSet)
            item.Headers[TypeHeader] = properties.MessageType;

        if (properties.HasHeaders)
            foreach (var header in properties.Headers)
                item.Headers[header.Key] = header.Value;

        item.SetPersistent(properties.Persistent);
        return item;
    }

    /// <summary>
    /// Builds queue message properties from received basic properties.
    /// </summary>
    /// <param name="properties">Received properties; when null, an empty instance is returned.</param>
    /// <returns>
    /// Properties populated from <paramref name="properties"/>. The "x-action" and
    /// "x-type" headers become <c>MessageAction</c> and <c>MessageType</c>; all
    /// other string-keyed headers are added as custom headers.
    /// </returns>
    public static IQueueMessageProperties ConvertProperties(IBasicProperties properties)
    {
        // Mirrors the null tolerance of the overload above.
        if (properties == null)
            return new QueueMessageProperties();

        var item = new QueueMessageProperties
        {
            AppId = properties.AppId,
            ContentEncoding = properties.ContentEncoding,
            ContentType = properties.ContentType,
            CorrelationId = properties.CorrelationId,
            Expiration = properties.Expiration,
            ExternalMessageId = properties.MessageId,
            ReplyTo = properties.ReplyTo,
            Type = properties.Type,
            UserId = properties.UserId
        };

        if (properties.IsDeliveryModePresent())
            item.Persistent = (properties.DeliveryMode == PersistentDeliveryMode);
        if (properties.IsPriorityPresent())
            item.Priority = properties.Priority;
        if (properties.IsTimestampPresent())
            item.Timestamp = properties.Timestamp.UnixTime;
        if (!properties.IsHeadersPresent())
            return item;

        // Non-generic IDictionary is implemented by both Hashtable and
        // Dictionary<string, object>, so headers are read whichever one the
        // client library hands back. Casting to Hashtable alone drops every
        // header when the table is a generic dictionary.
        var collection = properties.Headers as IDictionary;
        if (collection == null)
            return item;

        var action = ReadHeaderText(collection, ActionHeader);
        if (!string.IsNullOrEmpty(action))
            item.MessageAction = action;

        var type = ReadHeaderText(collection, TypeHeader);
        if (!string.IsNullOrEmpty(type))
            item.MessageType = type;

        foreach (object rawKey in collection.Keys)
        {
            var key = rawKey as string;
            if (key == null)
                continue;
            if (key.Equals(ActionHeader, StringComparison.Ordinal))
                continue;
            if (key.Equals(TypeHeader, StringComparison.Ordinal))
                continue;
            item.AddHeader(key, collection[key]);
        }
        return item;
    }

    // Returns the header as text, accepting either UTF-8 bytes or a string; null when absent or of another type.
    private static string ReadHeaderText(IDictionary headers, string key)
    {
        if (!headers.Contains(key))
            return null;

        var value = headers[key];

        var text = value as string;
        if (text != null)
            return text;

        var bytes = value as byte[];
        if (bytes != null && bytes.Length != 0)
            return Encoding.UTF8.GetString(bytes);

        return null;
    }

    // Clamps an int priority into the byte range instead of letting the cast wrap around.
    private static byte ToPriorityByte(int priority)
    {
        if (priority < byte.MinValue)
            return byte.MinValue;
        if (priority > byte.MaxValue)
            return byte.MaxValue;
        return (byte)priority;
    }
}
/// <summary>
/// Default <see cref="IQueueMessageProperties"/> implementation. Each string
/// setter also updates the matching <c>Is…Set</c> flag, which is true only
/// for non-null, non-empty values.
/// </summary>
public class QueueMessageProperties : IQueueMessageProperties
{
    private readonly Dictionary<string, object> headers;
    private string appId;
    private string contentEncoding;
    private string contentType;
    private string correlationId;
    private string expiration;
    private string externalMessageId;
    private string messageAction;
    private string messageType;
    private string replyTo;
    private string type;
    private string userId;
    // Nullable so "never assigned" can be told apart from an explicit 0.
    private long? timestamp;
    private int? priority;

    /// <summary>Creates an instance with no values set and no headers.</summary>
    public QueueMessageProperties()
    {
        headers = new Dictionary<string, object>();
    }

    /// <summary>True when at least one custom header has been added.</summary>
    public bool HasHeaders
    {
        get { return (headers.Count != 0); }
    }

    /// <summary>Custom headers added through <see cref="AddHeader"/>.</summary>
    public IEnumerable<KeyValuePair<string, object>> Headers
    {
        get { return headers; }
    }

    public bool IsAppIdSet { get; private set; }
    public bool IsContentEncodingSet { get; private set; }
    public bool IsContentTypeSet { get; private set; }
    public bool IsCorrelationIdSet { get; private set; }
    public bool IsExpirationSet { get; private set; }
    public bool IsExternalMessageSet { get; private set; }
    public bool IsMessageActionSet { get; private set; }
    public bool IsMessageTypeSet { get; private set; }
    public bool IsReplyToSet { get; private set; }
    public bool IsTypeSet { get; private set; }
    public bool IsUserIdSet { get; private set; }

    /// <summary>Whether the message is marked persistent.</summary>
    public bool Persistent { get; set; }

    /// <summary>True once <see cref="Priority"/> has been assigned.</summary>
    public bool IsPrioritySet
    {
        get { return (priority.HasValue); }
    }

    /// <summary>True once <see cref="Timestamp"/> has been assigned.</summary>
    public bool IsTimestampSet
    {
        get { return (timestamp.HasValue); }
    }

    public string AppId
    {
        get { return appId; }
        set
        {
            appId = value;
            IsAppIdSet = !(string.IsNullOrEmpty(value));
        }
    }

    public string ContentEncoding
    {
        get { return contentEncoding; }
        set
        {
            contentEncoding = value;
            IsContentEncodingSet = !(string.IsNullOrEmpty(value));
        }
    }

    public string ContentType
    {
        get { return contentType; }
        set
        {
            contentType = value;
            IsContentTypeSet = !(string.IsNullOrEmpty(value));
        }
    }

    public string CorrelationId
    {
        get { return correlationId; }
        set
        {
            correlationId = value;
            IsCorrelationIdSet = !(string.IsNullOrEmpty(value));
        }
    }

    public string Expiration
    {
        get { return expiration; }
        set
        {
            expiration = value;
            IsExpirationSet = !(string.IsNullOrEmpty(value));
        }
    }

    public string ExternalMessageId
    {
        get { return externalMessageId; }
        set
        {
            externalMessageId = value;
            IsExternalMessageSet = !(string.IsNullOrEmpty(value));
        }
    }

    public string MessageAction
    {
        get { return messageAction; }
        set
        {
            messageAction = value;
            IsMessageActionSet = !(string.IsNullOrEmpty(value));
        }
    }

    public string MessageType
    {
        get { return messageType; }
        set
        {
            messageType = value;
            IsMessageTypeSet = !(string.IsNullOrEmpty(value));
        }
    }

    public string ReplyTo
    {
        get { return replyTo; }
        set
        {
            replyTo = value;
            IsReplyToSet = !(string.IsNullOrEmpty(value));
        }
    }

    public string Type
    {
        get { return type; }
        set
        {
            type = value;
            IsTypeSet = !(string.IsNullOrEmpty(value));
        }
    }

    public string UserId
    {
        get { return userId; }
        set
        {
            userId = value;
            IsUserIdSet = !(string.IsNullOrEmpty(value));
        }
    }

    /// <summary>Message priority; reads as 0 while unset.</summary>
    public int Priority
    {
        get { return priority ?? 0; }
        set { priority = value; }
    }

    /// <summary>Message timestamp; reads as 0 while unset.</summary>
    public long Timestamp
    {
        get { return timestamp ?? 0; }
        set { timestamp = value; }
    }

    /// <summary>
    /// Adds or replaces a custom header. Calls with a null or empty
    /// <paramref name="key"/>, or a null <paramref name="value"/>, are ignored.
    /// </summary>
    public void AddHeader(string key, object value)
    {
        if (string.IsNullOrEmpty(key))
            return;
        if (value == null)
            return;
        headers[key] = value;
    }

    /// <summary>
    /// Creates persistent properties with a newly generated correlation id
    /// (a GUID in "N" format).
    /// </summary>
    public static QueueMessageProperties CreatePersistent(string type, string action)
    {
        var correlationId = Guid.NewGuid().ToString("N");
        return CreatePersistent(correlationId, type, action);
    }

    /// <summary>Creates persistent properties with the given correlation id.</summary>
    public static QueueMessageProperties CreatePersistent(string correlationId, string type, string action)
    {
        return Create(correlationId, type, action, true);
    }

    /// <summary>
    /// Creates transient (non-persistent) properties with a newly generated
    /// correlation id (a GUID in "N" format).
    /// </summary>
    public static QueueMessageProperties CreateTransient(string type, string action)
    {
        var correlationId = Guid.NewGuid().ToString("N");
        return CreateTransient(correlationId, type, action);
    }

    /// <summary>Creates transient (non-persistent) properties with the given correlation id.</summary>
    public static QueueMessageProperties CreateTransient(string correlationId, string type, string action)
    {
        // Transient means Persistent = false; this used to be true, copied from CreatePersistent.
        return Create(correlationId, type, action, false);
    }

    // Shared factory body for the CreatePersistent / CreateTransient overloads.
    private static QueueMessageProperties Create(string correlationId, string type, string action, bool persistent)
    {
        return new QueueMessageProperties
        {
            CorrelationId = correlationId,
            MessageType = type,
            MessageAction = action,
            Persistent = persistent,
        };
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment