Skip to content

Instantly share code, notes, and snippets.

@kenanhancer
Last active July 19, 2018 12:23
Show Gist options
  • Save kenanhancer/78e2d084cd9dffe4f06bb3d7331bc86b to your computer and use it in GitHub Desktop.
Save kenanhancer/78e2d084cd9dffe4f06bb3d7331bc86b to your computer and use it in GitHub Desktop.
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Concurrent;
using System.Text;
using System.Threading.Tasks;
namespace PubSubTest1
{
/// <summary>
/// Console demo: publishes ten messages to a RabbitMQ queue, then starts ten consumers
/// that print every message they receive until the user presses Enter.
/// </summary>
class Program
{
    // Number of publishers and number of consumers started by Main.
    private const int WorkerCount = 10;

    // One event per running consumer; Main sets each of them to tell the consumers to stop.
    static readonly ConcurrentBag<AsyncAutoResetEvent> cb = new ConcurrentBag<AsyncAutoResetEvent>();
    static readonly string queueName = "Test_Queue";

    /// <summary>
    /// Runs the publishers, starts the consumers, waits for Enter, then signals every
    /// consumer and blocks until all of them have released their channel and connection.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        Parallel.For(0, WorkerCount, i => Publisher());

        // Keep the Task returned by each consumer. Passing an async lambda to Parallel.For
        // would turn it into an async void delegate, whose failures cannot be observed and
        // whose completion cannot be awaited.
        var consumers = new Task[WorkerCount];
        Parallel.For(0, WorkerCount, i =>
        {
            // Each iteration writes a distinct slot, so no extra synchronization is needed.
            consumers[i] = Consumer();
        });

        Console.ReadLine();

        foreach (var item in cb)
        {
            item.Set();
        }

        // Do not let the process exit before every consumer has finished disposing its
        // resources; a consumer failure surfaces here as an AggregateException.
        Task.WaitAll(consumers);
    }

    /// <summary>Builds the connection settings shared by publishers and consumers.</summary>
    /// <returns>A factory pointing at the broker on localhost with the guest account.</returns>
    static ConnectionFactory CreateConnectionFactory()
    {
        return new ConnectionFactory
        {
            HostName = "localhost",
            UserName = "guest",
            Password = "guest"
        };
    }

    /// <summary>
    /// Opens a connection, declares the queue and publishes a single "Hello World" message
    /// to it through the default exchange. All work is done synchronously.
    /// </summary>
    /// <returns>An already completed task.</returns>
    static Task Publisher()
    {
        ConnectionFactory connectionFactory = CreateConnectionFactory();
        string message = "Hello World";

        using (var connection = connectionFactory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queueName, false, false, false, null);
            channel.BasicPublish("", queueName, null, Encoding.UTF8.GetBytes(message));
            Console.WriteLine($"Sended message \"{message}\" in {queueName} Queue");
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Opens a connection, subscribes to the queue and prints every received message until
    /// the stop event registered in the shared bag is set.
    /// </summary>
    /// <returns>A task that completes once the channel and connection have been disposed.</returns>
    static async Task Consumer()
    {
        ConnectionFactory connectionFactory = CreateConnectionFactory();

        // Register the stop event before any await so Main can always find and set it.
        AsyncAutoResetEvent are = new AsyncAutoResetEvent();
        cb.Add(are);

        using (var connection = connectionFactory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Same declaration as in Publisher, so the consumer does not depend on a
            // publisher having created the queue first.
            channel.QueueDeclare(queueName, false, false, false, null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine($"Comming message \"{message}\" from {queueName}");
            };

            channel.BasicConsume(queueName, true, consumer);

            // Keep the channel and connection open until Main signals shutdown.
            await are.WaitAsync().ConfigureAwait(false);
        }
    }
}
}
using System.Collections.Generic;
using System.Threading.Tasks;
namespace PubSubTest1
{
/// <summary>
/// Task-based auto-reset event. Each call to <c>Set</c> lets exactly one <c>WaitAsync</c>
/// call through: the oldest pending waiter if there is one, otherwise the next caller.
/// </summary>
public class AsyncAutoResetEvent
{
    // Shared, already finished task handed out when the event is signaled at wait time.
    private static readonly Task s_alreadyCompleted = Task.FromResult(true);

    // Pending waiters in arrival order; the queue instance also serves as the lock object.
    private readonly Queue<TaskCompletionSource<bool>> _pendingWaiters = new Queue<TaskCompletionSource<bool>>();

    // True when Set was called while nobody was waiting and the signal is still unconsumed.
    private bool _isSignaled;

    /// <summary>Waits for the event to be signaled.</summary>
    /// <returns>
    /// A completed task (consuming the signal) when the event is already signaled;
    /// otherwise a task that completes when a later <c>Set</c> call releases this waiter.
    /// </returns>
    public Task WaitAsync()
    {
        lock (_pendingWaiters)
        {
            if (!_isSignaled)
            {
                var waiter = new TaskCompletionSource<bool>();
                _pendingWaiters.Enqueue(waiter);
                return waiter.Task;
            }

            // Auto-reset: the stored signal is consumed by this caller.
            _isSignaled = false;
            return s_alreadyCompleted;
        }
    }

    /// <summary>
    /// Releases the oldest pending waiter, or, when there is none, leaves the event
    /// signaled. Repeated calls without waiters keep a single stored signal.
    /// </summary>
    public void Set()
    {
        TaskCompletionSource<bool> released;

        lock (_pendingWaiters)
        {
            if (_pendingWaiters.Count == 0)
            {
                _isSignaled = true;
                return;
            }

            released = _pendingWaiters.Dequeue();
        }

        // The waiter is completed only after the lock has been released.
        released.SetResult(true);
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment