Skip to content

Instantly share code, notes, and snippets.

@MirzaLeka
Last active July 17, 2025 15:01
Show Gist options
  • Save MirzaLeka/45e10ea8c738ec168898cd21090880d1 to your computer and use it in GitHub Desktop.
Save MirzaLeka/45e10ea8c738ec168898cd21090880d1 to your computer and use it in GitHub Desktop.
ASP .NET Worker Service for Apache ActiveMQ
using Apache.NMS;
using Apache.NMS.ActiveMQ;
namespace FromDLQToQueue
{
public class Program
{
private static readonly string _brokerUri = "xxxxxxxx";
private static readonly string _username = "yyyyyyyy";
private static readonly string _password = "zzzzzzzzz";
public static void Main(string[] args)
{
var factory = new ConnectionFactory(_brokerUri);
using (IConnection connection = factory.CreateConnection(_username, _password))
{
connection.Start();
using (ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge))
{
IDestination currentDestination = session.GetQueue("ActiveMQ.DLQ");
using (IMessageConsumer consumer = session.CreateConsumer(currentDestination))
{
while (true)
{
IMessage message = consumer.Receive(TimeSpan.FromSeconds(1));
if (message == null) break;
if (message is ITextMessage textMessage)
{
// log
Console.WriteLine($"Message: <{textMessage.Text}> was published at: <{textMessage.NMSTimestamp}>");
if (textMessage.Text.Contains("pizza"))
{
IDestination originalDestination = session.GetQueue("my.queue");
// Set delivery mode
using var producer = session.CreateProducer(originalDestination);
producer.DeliveryMode = MsgDeliveryMode.Persistent;
// Send to original queue
producer.Send(textMessage);
// Remove Successful message from DLQ
session.Acknowledge();
}
else
{
// Call dispatcher or something
Console.WriteLine("Invalid pizza order!");
}
}
}
}
}
Console.ReadLine();
}
}
}
}
namespace AMQSubscriberWorker.Services
{
public interface IOrderService
{
PizzaOrder? DeserializeOrder(string pizzaOrder);
}
}
using System.Text.Json;
namespace AMQSubscriberWorker.Services
{
public class OrderService : IOrderService
{
public PizzaOrder? DeserializeOrder(string pizzaOrder)
{
try
{
var order = JsonSerializer.Deserialize<PizzaOrder>(pizzaOrder);
if (order is null)
{
return null;
}
return order;
}
catch(Exception)
{
return null;
}
}
}
}
namespace AMQSubscriberWorker
{
public class PizzaOrder
{
public Guid ID { get; set; }
public string Name { get; set; }
public DateTime DateOrdered { get; set; }
}
}
using AMQSubscriberWorker.Services;
namespace AMQSubscriberWorker
{
public class Program
{
public static void Main(string[] args)
{
var builder = Host.CreateApplicationBuilder(args);
// Registering the OrderService
builder.Services.AddScoped<IOrderService, OrderService>();
// Registering the background service
builder.Services.AddHostedService<Worker>();
var host = builder.Build();
host.Run();
}
}
}
using AMQSubscriberWorker.Services;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.Policies;
namespace AMQSubscriberWorker
{
public class Worker(ILogger<Worker> logger, IServiceProvider serviceProvider) : BackgroundService
{
private readonly IServiceProvider _serviceProvider = serviceProvider;
private readonly ILogger<Worker> _logger = logger;
private readonly string _brokerUri = "xxxxxxx";
private readonly string _username = "yyyyyyyyy";
private readonly string _password = "zzzzzzzzzzz";
private readonly string _queueName = "qqqqqqqqq";
private IConnection _connection;
private ISession _session;
private IMessageConsumer _consumer;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
while (!stoppingToken.IsCancellationRequested)
{
var factory = new ConnectionFactory(_brokerUri);
// five retries. retry every five seconds.
var redeliveryPolicy = new RedeliveryPolicy
{
MaximumRedeliveries = 5,
InitialRedeliveryDelay = 5000,
UseExponentialBackOff = false
};
// set policy
factory.RedeliveryPolicy = redeliveryPolicy;
_connection = factory.CreateConnection(_username, _password);
_connection.Start();
// set Acknowledgement Mode
_session = _connection.CreateSession(AcknowledgementMode.Transactional);
var destination = _session.GetQueue(_queueName);
_consumer = _session.CreateConsumer(destination);
_consumer.Listener += OnMessage;
await Task.Delay(Timeout.Infinite, stoppingToken);
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("Operation was gracefully canceled.");
}
catch (Exception ex)
{
_logger.LogCritical(ex, "Critical Exception!");
CloseSession();
}
}
private void CloseSession()
{
_consumer?.Close();
_session?.Close();
_connection?.Close();
}
private void OnMessage(IMessage message)
{
if (message is ITextMessage textMessage)
{
var text = textMessage.Text;
using var scope = _serviceProvider.CreateScope();
var orderService = scope.ServiceProvider.GetRequiredService<IOrderService>();
var order = orderService.DeserializeOrder(text);
if (order is not null)
{
_logger.LogInformation("Order received: {Name}, at: {Date}", order.Name, order.DateOrdered);
// pass successful result
_session.Commit(); // there is also commitAsync();
} else
{
// retry failed result
_session.Rollback(); // there is also rollbackAsync();
}
}
}
}
}
using AMQSubscriberWorker.Services;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.ActiveMQ.Commands;
using Microsoft.Extensions.Logging;
using System.Data.Common;
namespace AMQSubscriberWorker
{
public class Worker(ILogger<Worker> logger, IServiceProvider serviceProvider) : BackgroundService
{
private readonly IServiceProvider _serviceProvider = serviceProvider;
private readonly ILogger<Worker> _logger = logger;
private readonly string _brokerUri = "ssl://b-<YOUR-OPENWIRE-AMQ-URI>.amazonaws.com:61617";
private readonly string _username = "<YOUR-AMQ-USERNAME>";
private readonly string _password = "<YOUR-AMQ-USER-PASSWORD>";
private readonly string _queueName = "<YOUR-QUEUE>";
private IConnection _connection;
private Apache.NMS.ISession _session;
private IMessageConsumer _consumer;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
while (!stoppingToken.IsCancellationRequested)
{
var factory = new ConnectionFactory(_brokerUri);
_connection = factory.CreateConnection(_username, _password);
_connection.Start();
_session = _connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
var destination = _session.GetQueue(_queueName);
_consumer = _session.CreateConsumer(destination);
_consumer.Listener += OnMessage;
await Task.Delay(Timeout.Infinite, stoppingToken);
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("Operation was gracefully canceled.");
}
catch (Exception ex)
{
_logger.LogCritical(ex, "Critical Exception!");
CloseSession();
}
}
private void CloseSession()
{
_consumer?.Close();
_session?.Close();
_connection?.Close();
}
private void OnMessage(IMessage message)
{
if (message is ITextMessage textMessage)
{
var text = textMessage.Text;
using var scope = _serviceProvider.CreateScope();
var orderService = scope.ServiceProvider.GetRequiredService<IOrderService>();
var order = orderService.DeserializeOrder(text);
if (order is not null)
{
_logger.LogInformation("Order received: {Name}, at: {Date}", order.Name, order.DateOrdered);
}
}
}
}
}
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.ActiveMQ.Commands;
using Microsoft.Extensions.Logging;
using System.Data.Common;
namespace AMQSubscriberWorker
{
public class Worker(ILogger<Worker> logger) : BackgroundService
{
private readonly ILogger<Worker> _logger = logger;
private readonly string _brokerUri = "ssl://b-<YOUR-OPENWIRE-AMQ-URI>.amazonaws.com:61617";
private readonly string _username = "<YOUR-AMQ-USERNAME>";
private readonly string _password = "<YOUR-AMQ-USER-PASSWORD>";
private readonly string _queueName = "<YOUR-QUEUE>";
private IConnection _connection;
private Apache.NMS.ISession _session;
private IMessageConsumer _consumer;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
while (!stoppingToken.IsCancellationRequested)
{
var factory = new ConnectionFactory(_brokerUri);
_connection = factory.CreateConnection(_username, _password);
_connection.Start();
_session = _connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
var destination = _session.GetQueue(_queueName);
_consumer = _session.CreateConsumer(destination);
_consumer.Listener += OnMessage;
await Task.Delay(Timeout.Infinite, stoppingToken);
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("Operation was gracefully canceled.");
}
catch (Exception ex)
{
_logger.LogCritical(ex, "Critical Exception!");
CloseSession();
}
}
private void CloseSession()
{
_consumer?.Close();
_session?.Close();
_connection?.Close();
}
private void OnMessage(IMessage message)
{
if (message is ITextMessage textMessage)
{
var text = textMessage.Text;
_logger.LogInformation("Message received: {Text}", text);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment