Last active
July 19, 2018 12:23
-
-
Save kenanhancer/78e2d084cd9dffe4f06bb3d7331bc86b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using RabbitMQ.Client; | |
using RabbitMQ.Client.Events; | |
using System; | |
using System.Collections.Concurrent; | |
using System.Text; | |
using System.Threading.Tasks; | |
namespace PubSubTest1 | |
{ | |
/// <summary>
/// Demo program: publishes ten messages to a RabbitMQ queue in parallel, then
/// starts ten consumers that print incoming messages until Enter is pressed.
/// </summary>
class Program
{
    // One release signal per started consumer; Main sets them all on shutdown.
    static ConcurrentBag<AsyncAutoResetEvent> cb = new ConcurrentBag<AsyncAutoResetEvent>();

    // Queue that publishers write to and consumers read from.
    static string queueName = "Test_Queue";

    /// <summary>
    /// Runs the publishers, starts the consumers, waits for Enter, then
    /// releases every consumer and waits for all of them to finish.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        Parallel.For(0, 10, i => Publisher());

        // Keep each consumer's Task. An async lambda handed to Parallel.For
        // binds to Action<int> (i.e. async void), which throws the Task away
        // and makes consumer failures impossible to observe. Calling
        // Consumer() directly still runs it synchronously up to its first
        // incomplete await, so every release event is registered in cb by the
        // time Parallel.For returns.
        var consumers = new Task[10];
        Parallel.For(0, consumers.Length, i => consumers[i] = Consumer());

        Console.ReadLine();

        foreach (var item in cb)
        {
            item.Set();
        }

        // Wait until every consumer has left its using blocks, and report any
        // consumer that failed instead of losing the exception.
        try
        {
            Task.WaitAll(consumers);
        }
        catch (AggregateException ex)
        {
            foreach (var inner in ex.Flatten().InnerExceptions)
            {
                Console.Error.WriteLine($"Consumer failed: {inner.Message}");
            }
        }
    }

    /// <summary>
    /// Builds the connection settings shared by publishers and consumers:
    /// host "localhost" with the guest/guest credentials.
    /// </summary>
    /// <returns>A new, configured <see cref="ConnectionFactory"/>.</returns>
    private static ConnectionFactory CreateConnectionFactory()
    {
        return new ConnectionFactory
        {
            HostName = "localhost",
            UserName = "guest",
            Password = "guest"
        };
    }

    /// <summary>
    /// Opens a connection, declares the queue and publishes a single
    /// "Hello World" message to it through the default ("") exchange.
    /// </summary>
    /// <returns>An already-completed task; all work is done synchronously.</returns>
    static Task Publisher()
    {
        ConnectionFactory connectionFactory = CreateConnectionFactory();
        string message = "Hello World";

        using (var connection = connectionFactory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queueName, false, false, false, null);
            channel.BasicPublish("", queueName, null, Encoding.UTF8.GetBytes(message));
            Console.WriteLine($"Sended message \"{message}\" in {queueName} Queue");
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Registers a release event in <c>cb</c>, subscribes to the queue and
    /// prints every received message until the release event is set; the
    /// channel and connection are disposed afterwards.
    /// </summary>
    /// <returns>A task that completes once the consumer has been released and has disposed its channel and connection.</returns>
    static async Task Consumer()
    {
        ConnectionFactory connectionFactory = CreateConnectionFactory();

        // Register the signal before connecting so Main can always find it.
        AsyncAutoResetEvent are = new AsyncAutoResetEvent();
        cb.Add(are);

        using (var connection = connectionFactory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine($"Comming message \"{message}\" from {queueName}");
            };

            // Second argument 'true' is passed exactly as in the original call
            // (presumably auto-acknowledge — confirm against the client version in use).
            channel.BasicConsume(queueName, true, consumer);

            // Hold the channel and connection open until Main signals shutdown.
            await are.WaitAsync().ConfigureAwait(false);
        }
    }
}
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Collections.Generic; | |
using System.Threading.Tasks; | |
namespace PubSubTest1 | |
{ | |
/// <summary>
/// Awaitable auto-reset event. Each call to <see cref="Set"/> releases at most
/// one waiter; if nobody is waiting, the signal is remembered (once) and
/// consumed by the next <see cref="WaitAsync"/> call.
/// </summary>
public class AsyncAutoResetEvent
{
    // Shared, already-completed task handed out when a signal is pending.
    private static readonly Task s_alreadyCompleted = Task.FromResult(true);

    // Waiters in arrival order; this object is also the lock guarding all state.
    private readonly Queue<TaskCompletionSource<bool>> _pendingWaiters = new Queue<TaskCompletionSource<bool>>();

    // True when Set was called while no waiter was queued.
    private bool _isSignaled;

    /// <summary>
    /// Waits for the event to be signaled.
    /// </summary>
    /// <returns>
    /// A completed task if a signal was pending (the signal is consumed);
    /// otherwise a task that completes when a later <see cref="Set"/> call
    /// dequeues this waiter.
    /// </returns>
    public Task WaitAsync()
    {
        lock (_pendingWaiters)
        {
            if (!_isSignaled)
            {
                var waiter = new TaskCompletionSource<bool>();
                _pendingWaiters.Enqueue(waiter);
                return waiter.Task;
            }

            // Consume the stored signal so the event resets automatically.
            _isSignaled = false;
            return s_alreadyCompleted;
        }
    }

    /// <summary>
    /// Signals the event: completes the oldest queued waiter if there is one,
    /// otherwise records the signal for the next waiter. Repeated signals with
    /// no waiters do not accumulate.
    /// </summary>
    public void Set()
    {
        TaskCompletionSource<bool> nextWaiter;

        lock (_pendingWaiters)
        {
            if (_pendingWaiters.Count == 0)
            {
                _isSignaled = true;
                return;
            }

            nextWaiter = _pendingWaiters.Dequeue();
        }

        // Completed after the lock is released, so nothing triggered by
        // SetResult runs while the lock is held.
        nextWaiter.SetResult(true);
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment