Last active
July 17, 2025 15:01
-
-
Save MirzaLeka/45e10ea8c738ec168898cd21090880d1 to your computer and use it in GitHub Desktop.
ASP.NET Worker Service for Apache ActiveMQ
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Apache.NMS; | |
using Apache.NMS.ActiveMQ; | |
namespace FromDLQToQueue
{
    /// <summary>
    /// Console utility that drains the ActiveMQ dead-letter queue and re-publishes
    /// every text message containing "pizza" to <c>my.queue</c>.
    /// </summary>
    public class Program
    {
        // Placeholder connection settings; supply real values from configuration
        // or the environment rather than committing them to source control.
        private static readonly string _brokerUri = "xxxxxxxx";
        private static readonly string _username = "yyyyyyyy";
        private static readonly string _password = "zzzzzzzzz";

        /// <summary>
        /// Connects to the broker, reads the DLQ until no message arrives within one
        /// second, re-sends the valid orders, then waits for a key press before exiting.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        public static void Main(string[] args)
        {
            var factory = new ConnectionFactory(_brokerUri);
            using (IConnection connection = factory.CreateConnection(_username, _password))
            {
                connection.Start();

                // IndividualAcknowledge lets each message be settled on its own. In
                // ClientAcknowledge mode an acknowledge call settles every message the
                // session has consumed so far, which would also remove the rejected
                // orders from the DLQ.
                using (ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge))
                {
                    IDestination deadLetterQueue = session.GetQueue("ActiveMQ.DLQ");
                    IDestination originalDestination = session.GetQueue("my.queue");

                    using (IMessageConsumer consumer = session.CreateConsumer(deadLetterQueue))
                    using (IMessageProducer producer = session.CreateProducer(originalDestination))
                    {
                        // One producer is reused for every re-published message.
                        producer.DeliveryMode = MsgDeliveryMode.Persistent;

                        while (true)
                        {
                            IMessage message = consumer.Receive(TimeSpan.FromSeconds(1));
                            if (message is null)
                            {
                                // Nothing arrived within the timeout: treat the DLQ as drained.
                                break;
                            }

                            if (message is not ITextMessage textMessage)
                            {
                                // Only text messages are inspected; others stay unacknowledged.
                                continue;
                            }

                            // log
                            Console.WriteLine($"Message: <{textMessage.Text}> was published at: <{textMessage.NMSTimestamp}>");

                            // A text message without a body cannot be a valid order.
                            bool isPizzaOrder = textMessage.Text?.Contains("pizza", StringComparison.Ordinal) == true;
                            if (isPizzaOrder)
                            {
                                // Send to original queue
                                producer.Send(textMessage);

                                // Remove only this successfully re-sent message from the DLQ
                                textMessage.Acknowledge();
                            }
                            else
                            {
                                // Call dispatcher or something
                                Console.WriteLine("Invalid pizza order!");
                            }
                        }
                    }
                }

                Console.ReadLine();
            }
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace AMQSubscriberWorker.Services
{
    /// <summary>
    /// Converts the raw text payload of a queue message into a <see cref="PizzaOrder"/>.
    /// </summary>
    public interface IOrderService
    {
        /// <summary>
        /// Attempts to turn the given payload into a <see cref="PizzaOrder"/>.
        /// </summary>
        /// <param name="pizzaOrder">The message text to deserialize.</param>
        /// <returns>The deserialized order, or <c>null</c> when no order could be produced.</returns>
        PizzaOrder? DeserializeOrder(string pizzaOrder);
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Text.Json; | |
namespace AMQSubscriberWorker.Services
{
    /// <summary>
    /// <see cref="IOrderService"/> implementation that reads orders from JSON text
    /// using <see cref="JsonSerializer"/>.
    /// </summary>
    public class OrderService : IOrderService
    {
        /// <summary>
        /// Deserializes a JSON payload into a <see cref="PizzaOrder"/>.
        /// </summary>
        /// <param name="pizzaOrder">The JSON text of the order.</param>
        /// <returns>
        /// The deserialized order, or <c>null</c> when the payload is null, blank,
        /// the JSON literal <c>null</c>, or not valid JSON for a <see cref="PizzaOrder"/>.
        /// </returns>
        public PizzaOrder? DeserializeOrder(string pizzaOrder)
        {
            // Guard explicitly so a missing payload is a normal "no order" result
            // instead of an exception that has to be caught.
            if (string.IsNullOrWhiteSpace(pizzaOrder))
            {
                return null;
            }

            try
            {
                // Deserialize already yields null for the JSON literal "null".
                return JsonSerializer.Deserialize<PizzaOrder>(pizzaOrder);
            }
            catch (JsonException)
            {
                // Malformed or mismatched JSON is an expected failure: report "no order".
                return null;
            }
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace AMQSubscriberWorker
{
    /// <summary>
    /// A pizza order as carried in the text body of a queue message.
    /// </summary>
    public class PizzaOrder
    {
        /// <summary>Identifier of the order.</summary>
        public Guid ID { get; set; }

        /// <summary>
        /// Name attached to the order. Defaults to an empty string so the
        /// non-nullable property is never null when the payload omits it.
        /// </summary>
        public string Name { get; set; } = string.Empty;

        /// <summary>Date and time the order was placed.</summary>
        public DateTime DateOrdered { get; set; }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using AMQSubscriberWorker.Services; | |
namespace AMQSubscriberWorker
{
    /// <summary>
    /// Entry point of the worker: wires up dependency injection and runs the host.
    /// </summary>
    public class Program
    {
        /// <summary>
        /// Builds the application host, registers the services and blocks until the host stops.
        /// </summary>
        /// <param name="args">Command-line arguments forwarded to the host builder.</param>
        public static void Main(string[] args)
        {
            var hostBuilder = Host.CreateApplicationBuilder(args);

            // The order service is scoped; the background worker is the hosted service.
            hostBuilder.Services
                .AddScoped<IOrderService, OrderService>()
                .AddHostedService<Worker>();

            hostBuilder.Build().Run();
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using AMQSubscriberWorker.Services; | |
using Apache.NMS; | |
using Apache.NMS.ActiveMQ; | |
using Apache.NMS.Policies; | |
namespace AMQSubscriberWorker
{
    /// <summary>
    /// Background service that consumes pizza orders from an ActiveMQ queue inside a
    /// transacted session: valid orders are committed, invalid ones are rolled back
    /// so the configured redelivery policy applies.
    /// </summary>
    public class Worker(ILogger<Worker> logger, IServiceProvider serviceProvider) : BackgroundService
    {
        private readonly IServiceProvider _serviceProvider = serviceProvider;
        private readonly ILogger<Worker> _logger = logger;

        // Placeholder connection settings; supply real values from configuration.
        private readonly string _brokerUri = "xxxxxxx";
        private readonly string _username = "yyyyyyyyy";
        private readonly string _password = "zzzzzzzzzzz";
        private readonly string _queueName = "qqqqqqqqq";

        // Null until ExecuteAsync has connected, and again after CloseSession.
        private IConnection? _connection;
        private ISession? _session;
        private IMessageConsumer? _consumer;

        /// <summary>
        /// Connects to the broker, subscribes <see cref="OnMessage"/> to the queue and
        /// then waits until the host requests shutdown.
        /// </summary>
        /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
        /// <returns>A task that completes once the service has stopped and released its resources.</returns>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            try
            {
                if (stoppingToken.IsCancellationRequested)
                {
                    return;
                }

                // five retries. retry every five seconds.
                var factory = new ConnectionFactory(_brokerUri)
                {
                    RedeliveryPolicy = new RedeliveryPolicy
                    {
                        MaximumRedeliveries = 5,
                        InitialRedeliveryDelay = 5000,
                        UseExponentialBackOff = false
                    }
                };

                _connection = factory.CreateConnection(_username, _password);
                _connection.Start();

                // set Acknowledgement Mode
                _session = _connection.CreateSession(AcknowledgementMode.Transactional);
                var destination = _session.GetQueue(_queueName);
                _consumer = _session.CreateConsumer(destination);
                _consumer.Listener += OnMessage;

                // Messages arrive through the listener; this only completes by cancellation.
                await Task.Delay(Timeout.Infinite, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                _logger.LogInformation("Operation was gracefully canceled.");
            }
            catch (Exception ex)
            {
                _logger.LogCritical(ex, "Critical Exception!");
            }
            finally
            {
                // Release broker resources on every exit path, including graceful shutdown.
                CloseSession();
            }
        }

        /// <summary>
        /// Closes the consumer, the session and the connection, in that order.
        /// A failure to close one of them is logged and does not prevent closing the rest.
        /// </summary>
        private void CloseSession()
        {
            TryClose(() => _consumer?.Close(), "consumer");
            TryClose(() => _session?.Close(), "session");
            TryClose(() => _connection?.Close(), "connection");

            _consumer = null;
            _session = null;
            _connection = null;
        }

        /// <summary>Runs a close action and logs, rather than propagates, any failure.</summary>
        private void TryClose(Action close, string resource)
        {
            try
            {
                close();
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Failed to close the ActiveMQ {Resource}.", resource);
            }
        }

        /// <summary>
        /// Listener callback: deserializes the text payload and commits the session
        /// when an order was produced, or rolls it back otherwise.
        /// </summary>
        /// <param name="message">The message delivered by the consumer.</param>
        private void OnMessage(IMessage message)
        {
            // Only text messages are processed here.
            if (message is not ITextMessage textMessage)
            {
                return;
            }

            // The order service is registered as scoped, so resolve it per message.
            using var scope = _serviceProvider.CreateScope();
            var orderService = scope.ServiceProvider.GetRequiredService<IOrderService>();
            var order = orderService.DeserializeOrder(textMessage.Text);

            if (order is not null)
            {
                _logger.LogInformation("Order received: {Name}, at: {Date}", order.Name, order.DateOrdered);

                // pass successful result
                _session?.Commit();
            }
            else
            {
                // retry failed result
                _session?.Rollback();
            }
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using AMQSubscriberWorker.Services; | |
using Apache.NMS; | |
using Apache.NMS.ActiveMQ; | |
using Apache.NMS.ActiveMQ.Commands; | |
using Microsoft.Extensions.Logging; | |
using System.Data.Common; | |
namespace AMQSubscriberWorker
{
    /// <summary>
    /// Background service that consumes pizza orders from an ActiveMQ queue in
    /// auto-acknowledge mode and logs every order that deserializes successfully.
    /// </summary>
    public class Worker(ILogger<Worker> logger, IServiceProvider serviceProvider) : BackgroundService
    {
        private readonly IServiceProvider _serviceProvider = serviceProvider;
        private readonly ILogger<Worker> _logger = logger;

        // Placeholder connection settings; supply real values from configuration.
        private readonly string _brokerUri = "ssl://b-<YOUR-OPENWIRE-AMQ-URI>.amazonaws.com:61617";
        private readonly string _username = "<YOUR-AMQ-USERNAME>";
        private readonly string _password = "<YOUR-AMQ-USER-PASSWORD>";
        private readonly string _queueName = "<YOUR-QUEUE>";

        // Null until ExecuteAsync has connected, and again after CloseSession.
        private IConnection? _connection;
        private Apache.NMS.ISession? _session;
        private IMessageConsumer? _consumer;

        /// <summary>
        /// Connects to the broker, subscribes <see cref="OnMessage"/> to the queue and
        /// then waits until the host requests shutdown.
        /// </summary>
        /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
        /// <returns>A task that completes once the service has stopped and released its resources.</returns>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            try
            {
                if (stoppingToken.IsCancellationRequested)
                {
                    return;
                }

                var factory = new ConnectionFactory(_brokerUri);
                _connection = factory.CreateConnection(_username, _password);
                _connection.Start();

                _session = _connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
                var destination = _session.GetQueue(_queueName);
                _consumer = _session.CreateConsumer(destination);
                _consumer.Listener += OnMessage;

                // Messages arrive through the listener; this only completes by cancellation.
                await Task.Delay(Timeout.Infinite, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                _logger.LogInformation("Operation was gracefully canceled.");
            }
            catch (Exception ex)
            {
                _logger.LogCritical(ex, "Critical Exception!");
            }
            finally
            {
                // Release broker resources on every exit path, including graceful shutdown.
                CloseSession();
            }
        }

        /// <summary>
        /// Closes the consumer, the session and the connection, in that order.
        /// A failure to close one of them is logged and does not prevent closing the rest.
        /// </summary>
        private void CloseSession()
        {
            TryClose(() => _consumer?.Close(), "consumer");
            TryClose(() => _session?.Close(), "session");
            TryClose(() => _connection?.Close(), "connection");

            _consumer = null;
            _session = null;
            _connection = null;
        }

        /// <summary>Runs a close action and logs, rather than propagates, any failure.</summary>
        private void TryClose(Action close, string resource)
        {
            try
            {
                close();
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Failed to close the ActiveMQ {Resource}.", resource);
            }
        }

        /// <summary>
        /// Listener callback: deserializes the text payload and logs the order
        /// when one was produced.
        /// </summary>
        /// <param name="message">The message delivered by the consumer.</param>
        private void OnMessage(IMessage message)
        {
            // Only text messages are processed here.
            if (message is not ITextMessage textMessage)
            {
                return;
            }

            // The order service is registered as scoped, so resolve it per message.
            using var scope = _serviceProvider.CreateScope();
            var orderService = scope.ServiceProvider.GetRequiredService<IOrderService>();
            var order = orderService.DeserializeOrder(textMessage.Text);

            if (order is not null)
            {
                _logger.LogInformation("Order received: {Name}, at: {Date}", order.Name, order.DateOrdered);
            }
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Apache.NMS; | |
using Apache.NMS.ActiveMQ; | |
using Apache.NMS.ActiveMQ.Commands; | |
using Microsoft.Extensions.Logging; | |
using System.Data.Common; | |
namespace AMQSubscriberWorker
{
    /// <summary>
    /// Background service that consumes text messages from an ActiveMQ queue in
    /// auto-acknowledge mode and logs their content.
    /// </summary>
    public class Worker(ILogger<Worker> logger) : BackgroundService
    {
        private readonly ILogger<Worker> _logger = logger;

        // Placeholder connection settings; supply real values from configuration.
        private readonly string _brokerUri = "ssl://b-<YOUR-OPENWIRE-AMQ-URI>.amazonaws.com:61617";
        private readonly string _username = "<YOUR-AMQ-USERNAME>";
        private readonly string _password = "<YOUR-AMQ-USER-PASSWORD>";
        private readonly string _queueName = "<YOUR-QUEUE>";

        // Null until ExecuteAsync has connected, and again after CloseSession.
        private IConnection? _connection;
        private Apache.NMS.ISession? _session;
        private IMessageConsumer? _consumer;

        /// <summary>
        /// Connects to the broker, subscribes <see cref="OnMessage"/> to the queue and
        /// then waits until the host requests shutdown.
        /// </summary>
        /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
        /// <returns>A task that completes once the service has stopped and released its resources.</returns>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            try
            {
                if (stoppingToken.IsCancellationRequested)
                {
                    return;
                }

                var factory = new ConnectionFactory(_brokerUri);
                _connection = factory.CreateConnection(_username, _password);
                _connection.Start();

                _session = _connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
                var destination = _session.GetQueue(_queueName);
                _consumer = _session.CreateConsumer(destination);
                _consumer.Listener += OnMessage;

                // Messages arrive through the listener; this only completes by cancellation.
                await Task.Delay(Timeout.Infinite, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                _logger.LogInformation("Operation was gracefully canceled.");
            }
            catch (Exception ex)
            {
                _logger.LogCritical(ex, "Critical Exception!");
            }
            finally
            {
                // Release broker resources on every exit path, including graceful shutdown.
                CloseSession();
            }
        }

        /// <summary>
        /// Closes the consumer, the session and the connection, in that order.
        /// A failure to close one of them is logged and does not prevent closing the rest.
        /// </summary>
        private void CloseSession()
        {
            TryClose(() => _consumer?.Close(), "consumer");
            TryClose(() => _session?.Close(), "session");
            TryClose(() => _connection?.Close(), "connection");

            _consumer = null;
            _session = null;
            _connection = null;
        }

        /// <summary>Runs a close action and logs, rather than propagates, any failure.</summary>
        private void TryClose(Action close, string resource)
        {
            try
            {
                close();
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Failed to close the ActiveMQ {Resource}.", resource);
            }
        }

        /// <summary>
        /// Listener callback: logs the body of every text message received.
        /// </summary>
        /// <param name="message">The message delivered by the consumer.</param>
        private void OnMessage(IMessage message)
        {
            // Only text messages are processed here.
            if (message is not ITextMessage textMessage)
            {
                return;
            }

            _logger.LogInformation("Message received: {Text}", textMessage.Text);
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment