Skip to content

Instantly share code, notes, and snippets.

@medigor
Last active February 17, 2020 20:13
Show Gist options
  • Save medigor/f5d85f8deff92647ce8a4a1c7b2a3d27 to your computer and use it in GitHub Desktop.
Save medigor/f5d85f8deff92647ce8a4a1c7b2a3d27 to your computer and use it in GitHub Desktop.
module Tests
open System
open System.Text
open System.Threading.Tasks
open System.Diagnostics
open Xunit
open RabbitMQ.Client
open RabbitMQ.Client.Events
/// Host name of the RabbitMQ broker; passed as `HostName` to every `ConnectionFactory` in this module.
let hostname = "rabbit.host.ru"
/// Name of the exchange that both the producer and the consumers declare with type "topic".
let topicName = "topic1"
/// Routing key the producer attaches to every message it publishes.
let routingKey = "info.all"
/// Builds an async workflow that consumes messages from the durable queue `q<i>`
/// and completes once the delivery whose tag equals `number` has been acknowledged.
///
/// The workflow opens its own connection and channel to `hostname`, declares the
/// durable topic exchange `topicName`, declares the queue `q<i>` and binds it with
/// the catch-all pattern "#". Elapsed milliseconds are written to the console when
/// the first delivery arrives and when the workflow finishes.
///
/// number: delivery tag to wait for (the test passes the number of published messages).
/// i:      consumer index; used in the queue name and in the log output.
let consumer number i =
    async {
        // RunContinuationsAsynchronously: the code after the await disposes the channel and
        // connection. Without this flag that continuation may run inline inside the delivery
        // callback, i.e. on the client's own dispatch thread, which risks blocking it.
        let tcs = TaskCompletionSource<unit>(TaskCreationOptions.RunContinuationsAsynchronously)
        let watch = Stopwatch.StartNew()
        let factory = ConnectionFactory(HostName = hostname, DispatchConsumersAsync = true)
        use connection = factory.CreateConnection()
        use channel = connection.CreateModel()
        channel.ExchangeDeclare(topicName, "topic", true, false)
        let queueName =
            channel.QueueDeclare(queue = sprintf "q%i" i, durable = true, autoDelete = false, exclusive = false).QueueName
        channel.QueueBind(queue = queueName, exchange = topicName, routingKey = "#")
        let name = sprintf "consumer %i" i
        // Named differently from the enclosing function to avoid shadowing `consumer`.
        let eventConsumer = AsyncEventingBasicConsumer(channel)
        let receiver _ (x: BasicDeliverEventArgs) =
            // Acknowledge each delivery individually (multiple = false).
            channel.BasicAck(x.DeliveryTag, false)
            if x.DeliveryTag = 1UL then
                Console.WriteLine(sprintf "%i %s receive 1" watch.ElapsedMilliseconds name)
            if x.DeliveryTag = uint64 number then
                // TrySetResult never throws, even if the source was somehow completed already.
                tcs.TrySetResult() |> ignore
            Task.CompletedTask
        eventConsumer.add_Received(AsyncEventHandler(receiver))
        // The returned consumer tag is not needed; the subscription ends when the channel is disposed.
        channel.BasicConsume(queue = queueName, autoAck = false, consumer = eventConsumer) |> ignore
        do! Async.AwaitTask tcs.Task
        Console.WriteLine(sprintf "%i %s done!" watch.ElapsedMilliseconds name)
    }
/// Builds an async workflow that publishes `number` persistent messages to the
/// topic exchange `topicName` with routing key `routingKey`, then waits until the
/// broker has confirmed the message whose publish sequence number equals `number`.
///
/// The workflow opens its own connection and channel to `hostname` and enables
/// publisher confirms on the channel. Elapsed milliseconds are written to the
/// console after the publish loop and after the final confirm.
///
/// number: how many messages to publish.
///
/// The awaited task fails with InvalidOperationException if the broker negatively
/// acknowledges any message, instead of waiting forever for a confirm that may never come.
let producer number =
    async {
        // RunContinuationsAsynchronously: the code after the await disposes the channel and
        // connection. Without this flag that continuation may run inline inside the confirm
        // callback, i.e. on the client's own callback thread, which risks blocking it.
        let tcs = TaskCompletionSource<unit>(TaskCreationOptions.RunContinuationsAsynchronously)
        let factory = ConnectionFactory(HostName = hostname)
        use connection = factory.CreateConnection()
        use channel = connection.CreateModel()
        channel.ExchangeDeclare(topicName, "topic", true, false)
        let properties = channel.CreateBasicProperties(Persistent = true)
        let lastSeqNo = uint64 number
        let acker (x: BasicAckEventArgs) =
            if x.DeliveryTag = lastSeqNo then
                tcs.TrySetResult() |> ignore
        let nacker (x: BasicNackEventArgs) =
            // A nack means the broker did not take responsibility for the message;
            // surface that as a failure rather than hanging.
            let error = InvalidOperationException(sprintf "broker nacked message %i" x.DeliveryTag)
            tcs.TrySetException(error) |> ignore
        // Handlers are attached before confirms are enabled so no confirm can be missed.
        channel.BasicAcks.Add(acker)
        channel.BasicNacks.Add(nacker)
        channel.ConfirmSelect()
        // Fixed delay before publishing — presumably to give the parallel consumers time to
        // declare and bind their queues; TODO confirm.
        do! Async.Sleep 500
        let watch = Stopwatch.StartNew()
        for i = 1 to number do
            let body = Encoding.UTF8.GetBytes(sprintf "Hello from F#! %O %i" DateTime.Now channel.NextPublishSeqNo)
            channel.BasicPublish(topicName, routingKey = routingKey, basicProperties = properties, body = body)
        Console.WriteLine(sprintf "%i producer wait!" watch.ElapsedMilliseconds)
        do! Async.AwaitTask tcs.Task
        Console.WriteLine(sprintf "%i producer done!" watch.ElapsedMilliseconds)
    }
/// Runs five consumers (indices 1 to 5) and one producer in parallel, each handling
/// one million messages, and returns the combined async computation.
[<Fact>]
let ``Producer + Consumer`` () =
    let number = 1_000_000
    // Consumers come first and the producer last, so the result array keeps the same order.
    let consumers = List.init 5 (fun idx -> consumer number (idx + 1))
    (consumers @ [ producer number ])
    |> Async.Parallel
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment