Last active
August 29, 2015 14:06
-
-
Save dfch/6da3d17414c913595302 to your computer and use it in GitHub Desktop.
Simple RabbitMQ Wrapper for PowerShell
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Threading.Tasks; | |
using System.Diagnostics; | |
using RabbitMQ.Client; | |
using RabbitMQ.Client.Events; | |
using RabbitMQ.Util; | |
// Add reference to Rabbit.MQ.Client.dll | |
namespace RabbitMQHelper | |
{ | |
public class Client : IDisposable | |
{ | |
#region ========== Constants ========== | |
private const string _VirtualHostDefault = "/"; | |
private const string _ServerDefault = "localhost"; | |
private const int _PortDefault = AmqpTcpEndpoint.UseDefaultPort; //5672; | |
private const string _UsernameDefault = "guest"; | |
private const string _PasswordDefault = "guest"; | |
private const string _ExchangeNameDefault = ""; | |
private const string _QueueNameDefault = "default"; | |
private const bool _AutoDeleteDefault = false; | |
private const bool _DurableDefault = true; | |
private const bool _ConnectionAutoCloseDefault = false; | |
private const int _WaitMilliSecondsDefault = -1; | |
#endregion | |
#region ========== Properties ========== | |
private bool disposed = false; | |
string _Server = _ServerDefault; | |
public string Server | |
{ | |
get { return _Server; } | |
set { _Server = value; } | |
} | |
int _Port = _PortDefault; | |
public int Port | |
{ | |
get { return _Port; } | |
set { _Port = value; } | |
} | |
string _QueueName = _QueueNameDefault; | |
public string QueueName | |
{ | |
get { return _QueueName; } | |
set { _QueueName = value; } | |
} | |
private bool _Durable = _DurableDefault; | |
public bool Durable | |
{ | |
get { return _Durable; } | |
set { _Durable = value; } | |
} | |
private bool _Exclusive; | |
public bool Exclusive | |
{ | |
get { return _Exclusive; } | |
set { _Exclusive = value; } | |
} | |
private bool _AutoDelete = _AutoDeleteDefault; | |
public bool AutoDelete | |
{ | |
get { return _AutoDelete; } | |
set { _AutoDelete = value; } | |
} | |
private bool _ConnectionAutoClose = _ConnectionAutoCloseDefault; | |
public bool ConnectionAutoClose | |
{ | |
get { return _ConnectionAutoClose; } | |
set { _ConnectionAutoClose = value; } | |
} | |
private string _ExchangeName = _ExchangeNameDefault; | |
public string ExchangeName | |
{ | |
get { return _ExchangeName; } | |
set { _ExchangeName = value; } | |
} | |
private QueueingBasicConsumer _Consumer; | |
public QueueingBasicConsumer Consumer | |
{ | |
get { return _Consumer; } | |
set { _Consumer = value; } | |
} | |
private IDictionary<string, object> _QueueArguments = new Dictionary<string, object>(); | |
public IDictionary<string, object> QueueArguments | |
{ | |
get { return _QueueArguments; } | |
set { _QueueArguments = value; } | |
} | |
private QueueDeclareOk _QueueDeclareReturn; | |
public QueueDeclareOk QueueDeclareReturn | |
{ | |
get { return _QueueDeclareReturn; } | |
set { _QueueDeclareReturn = value; } | |
} | |
private string _Username = _UsernameDefault; | |
public string Username | |
{ | |
get { return _Username; } | |
set { _Username = value; } | |
} | |
private string _Password = _PasswordDefault; | |
public string Password | |
{ | |
//get { return _Password; } | |
set { _Password = value; } | |
} | |
private int _WaitMilliSeconds = _WaitMilliSecondsDefault; | |
public int WaitMilliSeconds | |
{ | |
get { return _WaitMilliSeconds; } | |
set { _WaitMilliSeconds = value; } | |
} | |
private string _VirtualHost = _VirtualHostDefault; | |
public string VirtualHost | |
{ | |
get { return _VirtualHost; } | |
set { _VirtualHost = value; } | |
} | |
private IProtocol _Protocol = Protocols.FromEnvironment(); | |
public IProtocol Protocol | |
{ | |
get { return _Protocol; } | |
set { _Protocol = value; } | |
} | |
private ConnectionFactory _ConnectionFactory; | |
public ConnectionFactory ConnectionFactory | |
{ | |
get { return _ConnectionFactory; } | |
set { _ConnectionFactory = value; } | |
} | |
private IConnection _Connection; | |
public IConnection Connection | |
{ | |
get { return _Connection; } | |
set { _Connection = value; } | |
} | |
private IModel _Channel; | |
public IModel Channel | |
{ | |
get { return _Channel; } | |
set { _Channel = value; } | |
} | |
#endregion | |
#region ========== Methods ========== | |
public IConnection Connect() | |
{ | |
return Connect(_VirtualHost, _Server, _Port, _Username, _Password, null); | |
} | |
public IConnection Connect(string Server, string Username, string Password) | |
{ | |
return Connect(_VirtualHost, _Server, _Port, Username, Password, null); | |
} | |
public IConnection Connect(string VirtualHost, string Server, string Username, string Password) | |
{ | |
return Connect(VirtualHost, Server, _Port, Username, Password, null); | |
} | |
public IConnection Connect(string VirtualHost, string Server, int? Port, string Username, string Password, int? MaxRedirects) | |
{ | |
if (null == _Connection || !_Connection.IsOpen) | |
{ | |
if (null == _ConnectionFactory) | |
{ | |
var __VirtualHost = VirtualHost ?? _VirtualHost; | |
var __Server = Server ?? _Server; | |
var __Port = Port ?? _Port; | |
var __Username = Username ?? _Username; | |
var __Password = Password ?? _Password; | |
_ConnectionFactory = new ConnectionFactory() | |
{ | |
Protocol = _Protocol | |
, | |
VirtualHost = __VirtualHost | |
, | |
HostName = __Server | |
, | |
Port = __Port | |
, | |
UserName = __Username | |
, | |
Password = __Password | |
}; | |
} | |
if (null == MaxRedirects) | |
{ | |
_Connection = _ConnectionFactory.CreateConnection(); | |
} | |
else | |
{ | |
_Connection = _ConnectionFactory.CreateConnection((int) MaxRedirects); | |
} | |
} | |
return _Connection; | |
} | |
public void Disconnect() | |
{ | |
Disconnect(true, true, true, false); | |
return; | |
} | |
public void Disconnect(bool CloseConnection, bool CloseChannel, bool CloseConsumer, bool CloseQueue) | |
{ | |
if (null != _Consumer) | |
{ | |
if (CloseQueue) | |
{ | |
_Consumer.Queue.Close(); | |
} | |
if (CloseConsumer) | |
{ | |
_Consumer = null; | |
} | |
} | |
if (CloseChannel) | |
{ | |
if ((null != _Channel) && _Channel.IsOpen) | |
{ | |
_Channel.Close(); | |
} | |
_QueueDeclareReturn = null; | |
_Channel = null; | |
} | |
if (CloseConnection) | |
{ | |
if (null != _Connection) | |
{ | |
if (_Connection.IsOpen) | |
{ | |
_Connection.Close(); | |
} | |
_Connection = null; | |
if (null != _ConnectionFactory) | |
{ | |
_ConnectionFactory = null; | |
} | |
} | |
} | |
return; | |
} | |
public IModel CreateChannel() | |
{ | |
if (null == _Channel) | |
{ | |
Connect(); | |
_Channel = _Connection.CreateModel(); | |
_Connection.AutoClose = _ConnectionAutoClose; | |
} | |
return _Channel; | |
} | |
public QueueDeclareOk CreateQueue(string QueueName) | |
{ | |
if (null == _QueueDeclareReturn || !_QueueDeclareReturn.QueueName.Equals(QueueName, StringComparison.CurrentCultureIgnoreCase)) | |
{ | |
_QueueName = QueueName; | |
CreateChannel(); | |
_QueueDeclareReturn = _Channel.QueueDeclare(_QueueName, _Durable, _Exclusive, _AutoDelete, _QueueArguments); | |
_Channel.BasicQos(0, 1, false); | |
} | |
return _QueueDeclareReturn; | |
} | |
public string Receive() | |
{ | |
return Receive(_QueueName, _WaitMilliSeconds); | |
} | |
public string Receive(string QueueName) | |
{ | |
return Receive(QueueName, _WaitMilliSeconds); | |
} | |
public string Receive(int? WaitMilliSeconds) | |
{ | |
return Receive(_QueueName, WaitMilliSeconds); | |
} | |
public string Receive(string QueueName, int? WaitMilliSeconds) | |
{ | |
//var fReturn = false; | |
var __QueueName = QueueName ?? _QueueName; | |
var __WaitMilliSeconds = WaitMilliSeconds ?? _WaitMilliSeconds; | |
string message = null; | |
try { | |
CreateQueue(_QueueName); | |
if (null == _Consumer) | |
{ | |
_Consumer = new QueueingBasicConsumer(_Channel); | |
_Channel.BasicConsume(__QueueName, false, _Consumer); | |
} | |
BasicDeliverEventArgs ea; | |
bool fReturn = _Consumer.Queue.Dequeue((int)__WaitMilliSeconds, out ea); | |
if (fReturn) | |
{ | |
var body = ea.Body; | |
message = Encoding.UTF8.GetString(body); | |
_Channel.BasicAck(ea.DeliveryTag, false); | |
} | |
} | |
catch (Exception ex) | |
{ | |
Debug.WriteLine(string.Format("{0}: {1}\n{2}", ex.HResult, ex.Message, ex.StackTrace)); | |
if (null != ex.InnerException) | |
{ | |
Debug.WriteLine(string.Format("{0}: {1}\n{2}", ex.InnerException.HResult, ex.InnerException.Message, ex.InnerException.StackTrace)); | |
} | |
throw; | |
} | |
return message; | |
} | |
public bool Send(string message) | |
{ | |
return Send(_QueueName, message); | |
} | |
public bool Send(string QueueName, string message) | |
{ | |
var fReturn = false; | |
try | |
{ | |
CreateQueue(_QueueName); | |
var body = Encoding.UTF8.GetBytes(message); | |
var properties = _Channel.CreateBasicProperties(); | |
properties.SetPersistent(true); | |
_Channel.BasicPublish(_ExchangeName, _QueueName, properties, body); | |
fReturn = true; | |
} | |
catch (Exception ex) | |
{ | |
Debug.WriteLine(string.Format("{0}: {1}\n{2}", ex.HResult, ex.Message, ex.StackTrace)); | |
if (null != ex.InnerException) | |
{ | |
Debug.WriteLine(string.Format("{0}: {1}\n{2}", ex.InnerException.HResult, ex.InnerException.Message, ex.InnerException.StackTrace)); | |
} | |
throw; | |
} | |
return fReturn; | |
} | |
public Client() | |
{ | |
} | |
public Client(string Server, string Username, string Password) | |
{ | |
_Server = Server; | |
_Username = Username; | |
_Password = Password; | |
} | |
public Client(string VirtualHost, string Server, string Username, string Password) | |
{ | |
_VirtualHost = VirtualHost; | |
_Server = Server; | |
_Username = Username; | |
_Password = Password; | |
} | |
public Client(string VirtualHost, string Server, int Port, string Username, string Password) | |
{ | |
_VirtualHost = VirtualHost; | |
_Server = Server; | |
_Port = Port; | |
_Username = Username; | |
_Password = Password; | |
} | |
~Client() | |
{ | |
Dispose(false); | |
} | |
public void Dispose() | |
{ | |
Dispose(true); | |
GC.SuppressFinalize(this); | |
} | |
protected void Dispose(bool disposing) | |
{ | |
try | |
{ | |
if (disposed) | |
return; | |
if (disposing) | |
{ | |
// Free any other managed objects here. | |
// | |
} | |
// Free any unmanaged objects here. | |
// | |
if (null != _Channel && _Channel.IsOpen) | |
{ | |
} | |
_Channel = null; | |
if (null != _Connection && _Connection.IsOpen) | |
{ | |
} | |
_Connection = null; | |
if (null != _Consumer) | |
{ | |
} | |
_Consumer = null; | |
Disconnect(); | |
disposed = true; | |
} | |
catch (Exception ex) | |
{ | |
Debug.WriteLine(string.Format("{0}: {1}\n{2}", ex.HResult, ex.Message, ex.StackTrace)); | |
if (null != ex.InnerException) | |
{ | |
Debug.WriteLine(string.Format("{0}: {1}\n{2}", ex.InnerException.HResult, ex.InnerException.Message, ex.InnerException.StackTrace)); | |
} | |
throw; | |
} | |
} | |
#endregion | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment