Skip to content

Instantly share code, notes, and snippets.

@petersondrew
Created January 7, 2015 21:01
Show Gist options
  • Save petersondrew/e958a00abf95b31b9366 to your computer and use it in GitHub Desktop.
Save petersondrew/e958a00abf95b31b9366 to your computer and use it in GitHub Desktop.
Masstransit channel leak on respond to temporary queue
using System;
using Contracts;
using MassTransit;
namespace Client
{
internal static class Program
{
static void Main()
{
// By using a temporary queue, and re-launching the client many times while leaving the server running
// we will see that we leak one channel each time the client closes until the server is stopped.
// Tested against RabbitMQ 3.3.0/Erlang R16B03-1/Win32, RabbitMQ 3.4.2/Erlang R16B03-1/Win32, and RabbitMQ 3.4.2/Erlang 17.4/Win32
var bus = ServiceBusFactory.New(sbc =>
{
sbc.UseRabbitMq();
sbc.ReceiveFrom("rabbitmq://127.0.0.1/*?temporary=true");
sbc.SetCreateMissingQueues(true);
sbc.DisablePerformanceCounters();
});
Console.WriteLine("Press any key to send a message, ESC to cancel");
while (true)
{
var input = Console.ReadKey(true);
if (input.Key == ConsoleKey.Escape) break;
bus.PublishRequest(new HelloMessage {Message = "Hi!", CorrelationId = NewId.NextGuid()},
configurator => configurator.Handle<ConfirmationMessage>(message =>
{
Console.WriteLine("Got read receipt");
}));
}
bus.Dispose();
}
}
}
using System;
using MassTransit;
namespace Contracts
{
public class ConfirmationMessage : CorrelatedBy<Guid>
{
public Guid CorrelationId { get; set; }
}
}
using System;
using Contracts;
using MassTransit;
namespace Server
{
public class Consumer : Consumes<HelloMessage>.Context
{
public void Consume(IConsumeContext<HelloMessage> context)
{
Console.WriteLine(context.Message.Message);
context.Respond(new ConfirmationMessage{CorrelationId = context.Message.CorrelationId});
}
}
}
using System;
using MassTransit;
namespace Contracts
{
public class HelloMessage : CorrelatedBy<Guid>
{
public Guid CorrelationId { get; set; }
public string Message { get; set; }
}
}
using System;
using MassTransit;
namespace Server
{
internal static class Program
{
static void Main()
{
var bus = ServiceBusFactory.New(sbc =>
{
sbc.UseRabbitMq();
sbc.ReceiveFrom("rabbitmq://127.0.0.1/server");
sbc.SetCreateMissingQueues(true);
sbc.DisablePerformanceCounters();
sbc.Subscribe(subs => subs.Consumer<Consumer>());
});
Console.WriteLine("Press any key to quit");
Console.ReadKey(true);
bus.Dispose();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment