Created
February 13, 2017 22:27
-
-
Save danielmarbach/80b2c60cced407e1d3500db8c85de543 to your computer and use it in GitHub Desktop.
RabbitMQ tryouts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Console benchmark that publishes a batch of messages to the "hello" queue on a
/// RabbitMQ broker at localhost and measures how long a synchronous consumer and an
/// asynchronous consumer need to drain that batch under each <see cref="UsageMode"/>.
/// </summary>
class Program
{
    // Queue that is used for publishing as well as consuming.
    private const string QueueName = "hello";

    // Output files written by the FileStream* usage modes.
    private const string SyncFilePath = "./sync.txt";
    private const string AsyncFilePath = "./async.txt";

    // Number of messages published per run and awaited by every consumer run.
    private static int numberOfMessages;

    /// <summary>
    /// Runs every usage mode once with the synchronous consumer and once with the
    /// asynchronous consumer, publishing a fresh batch of messages before each run.
    /// </summary>
    /// <param name="args">Command line arguments (unused).</param>
    static void Main(string[] args)
    {
        // Start from empty output files; the streams are opened in append mode.
        File.Delete(SyncFilePath);
        File.Delete(AsyncFilePath);

        numberOfMessages = 1000;

        Console.WriteLine(" Sync Consume.");
        SendMessages();
        SyncConsumer(UsageMode.Sleep);
        SendMessages();
        SyncConsumer(UsageMode.FileStreamInside);
        SendMessages();
        SyncConsumer(UsageMode.FileStreamOutside);

        Console.WriteLine("Async consume.");
        SendMessages();
        AsyncConsumer(UsageMode.Sleep);
        SendMessages();
        AsyncConsumer(UsageMode.FileStreamInside);
        SendMessages();
        AsyncConsumer(UsageMode.FileStreamOutside);

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }

    /// <summary>
    /// Work performed by a consumer for every received message.
    /// </summary>
    enum UsageMode
    {
        /// <summary>Wait 15 ms per message.</summary>
        Sleep,

        /// <summary>Append the body to one file stream that is opened once, outside the handler.</summary>
        FileStreamOutside,

        /// <summary>Open a file stream inside the handler for each message, append the body, dispose it.</summary>
        FileStreamInside,
    }

    /// <summary>
    /// Opens a file stream that appends to <paramref name="path"/> with the settings
    /// shared by both consumers (write access, write sharing, 4096 byte buffer).
    /// </summary>
    /// <param name="path">File to append to.</param>
    /// <param name="useAsync">Whether the stream is opened for asynchronous I/O.</param>
    /// <returns>The opened stream; the caller owns it and must dispose it.</returns>
    private static FileStream OpenAppendStream(string path, bool useAsync)
    {
        return new FileStream(path, FileMode.Append, FileAccess.Write, FileShare.Write, bufferSize: 4096, useAsync: useAsync);
    }

    /// <summary>
    /// Consumes <c>numberOfMessages</c> messages with an <c>AsyncEventingBasicConsumer</c>,
    /// performing the work selected by <paramref name="mode"/> for each one, and prints
    /// the elapsed time.
    /// </summary>
    /// <param name="mode">Per-message work to perform.</param>
    private static void AsyncConsumer(UsageMode mode)
    {
        var factory = new ConnectionFactory() { HostName = "localhost", DispatchConsumersAsync = true };

        // The countdown is the outermost resource so that it outlives channel and connection.
        using (var countDown = new CountdownEvent(numberOfMessages))
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: QueueName,
                durable: false,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            // Only the FileStreamOutside mode shares one stream across all messages.
            FileStream file = null;
            if (mode == UsageMode.FileStreamOutside)
            {
                file = OpenAppendStream(AsyncFilePath, useAsync: true);
            }

            try
            {
                var consumer = new AsyncEventingBasicConsumer(channel);
                consumer.Received += async (model, ea) =>
                {
                    // Ignore deliveries once the expected number of messages was handled.
                    if (countDown.IsSet)
                    {
                        return;
                    }

                    switch (mode)
                    {
                        case UsageMode.Sleep:
                            await Task.Delay(15).ConfigureAwait(false);
                            break;

                        case UsageMode.FileStreamInside:
                            // 'using' releases the file handle even when the write fails.
                            using (var perMessageFile = OpenAppendStream(AsyncFilePath, useAsync: true))
                            {
                                await perMessageFile.WriteAsync(ea.Body, 0, ea.Body.Length).ConfigureAwait(false);
                            }
                            break;

                        case UsageMode.FileStreamOutside:
                            await file.WriteAsync(ea.Body, 0, ea.Body.Length).ConfigureAwait(false);
                            break;
                    }

                    // Signal() throws once the count is already zero, hence the guard.
                    if (!countDown.IsSet)
                    {
                        countDown.Signal();
                    }
                };

                var stopWatch = Stopwatch.StartNew();
                channel.BasicConsume(QueueName, true, consumer);
                countDown.Wait();

                // Closing flushes buffered writes and is deliberately part of the measurement.
                file?.Close();
                stopWatch.Stop();

                Console.WriteLine("Done {0}: {1}", mode, stopWatch.Elapsed);
            }
            finally
            {
                // Releases the shared stream on failure; no-op when it was already closed.
                file?.Dispose();
            }
        }
    }

    /// <summary>
    /// Consumes <c>numberOfMessages</c> messages with an <c>EventingBasicConsumer</c>,
    /// performing the work selected by <paramref name="mode"/> for each one, and prints
    /// the elapsed time.
    /// </summary>
    /// <param name="mode">Per-message work to perform.</param>
    private static void SyncConsumer(UsageMode mode)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };

        // The countdown is the outermost resource so that it outlives channel and connection.
        using (var countDown = new CountdownEvent(numberOfMessages))
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: QueueName,
                durable: false,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            // Only the FileStreamOutside mode shares one stream across all messages.
            FileStream file = null;
            if (mode == UsageMode.FileStreamOutside)
            {
                file = OpenAppendStream(SyncFilePath, useAsync: false);
            }

            try
            {
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    // Ignore deliveries once the expected number of messages was handled.
                    if (countDown.IsSet)
                    {
                        return;
                    }

                    switch (mode)
                    {
                        case UsageMode.Sleep:
                            Thread.Sleep(15);
                            break;

                        case UsageMode.FileStreamInside:
                            // 'using' releases the file handle even when the write fails.
                            using (var perMessageFile = OpenAppendStream(SyncFilePath, useAsync: false))
                            {
                                perMessageFile.Write(ea.Body, 0, ea.Body.Length);
                            }
                            break;

                        case UsageMode.FileStreamOutside:
                            file.Write(ea.Body, 0, ea.Body.Length);
                            break;
                    }

                    // Signal() throws once the count is already zero, hence the guard.
                    if (!countDown.IsSet)
                    {
                        countDown.Signal();
                    }
                };

                var stopWatch = Stopwatch.StartNew();
                channel.BasicConsume(QueueName, true, consumer);
                countDown.Wait();

                // Closing flushes buffered writes and is deliberately part of the measurement.
                file?.Close();
                stopWatch.Stop();

                Console.WriteLine("Done {0}: {1}", mode, stopWatch.Elapsed);
            }
            finally
            {
                // Releases the shared stream on failure; no-op when it was already closed.
                file?.Dispose();
            }
        }
    }

    /// <summary>
    /// Publishes <c>numberOfMessages</c> messages to the queue through the default
    /// exchange. Each body is the UTF-8 encoded message index.
    /// </summary>
    private static void SendMessages()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: QueueName,
                durable: false,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            for (int i = 0; i < numberOfMessages; i++)
            {
                var body = Encoding.UTF8.GetBytes(i.ToString());
                channel.BasicPublish(exchange: "",
                    routingKey: QueueName,
                    basicProperties: null,
                    body: body);
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment