Last active
March 4, 2018 11:44
-
-
Save rogeralsing/d05e4b7cc64a3cfff3b8 to your computer and use it in GitHub Desktop.
Durable Messagebus integration with Akka and Azure Servicebus
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Linq; | |
using System.Threading.Tasks; | |
using Akka.Actor; | |
using Akka.Routing; | |
using Microsoft.ServiceBus.Messaging; | |
namespace ConsoleApplication13 | |
{ | |
public class MyBusinessActor : ReceiveActor | |
{ | |
public MyBusinessActor() | |
{ | |
//here is where you should receive your business messages | |
//apply domain logic, store to DB etc. | |
Receive<string>(s => | |
{ | |
Console.WriteLine("{0} Processed {1}", Self.Path, s); | |
//reply to the sender that everything went well | |
//in this example, we pass back the message we received in a built in `Success` message | |
//you can send back a Status.Failure incase of exceptions if you desire too | |
//or just let it fail by timeout as we do in this example | |
Sender.Tell(new Status.Success(s)); | |
}); | |
} | |
} | |
internal class Program | |
{ | |
private static void Main(string[] args) | |
{ | |
CreateMessages(); | |
using (var system = ActorSystem.Create("MySystem")) | |
{ | |
//spin up our workers | |
//this should be done via config, but here we use a hardcoded setup for simplicity | |
//Do note that the workers can be spread across multiple servers using Akka.Remote or Akka.Cluster | |
var businessActor = | |
system.ActorOf(Props.Create<MyBusinessActor>().WithRouter(new ConsistentHashingPool(10))); | |
//start the message processor | |
ProcessMessages(businessActor); | |
//wait for user to end the application | |
Console.ReadLine(); | |
} | |
} | |
private static async void ProcessMessages(IActorRef myBusinessActor) | |
{ | |
//set up a azure SB subscription client | |
//(or use a Queue client, or whatever client your specific MQ supports) | |
var subscriptionClient = SubscriptionClient.Create("service1", "service1"); | |
while (true) | |
{ | |
//fetch a batch of messages | |
var batch = await subscriptionClient.ReceiveBatchAsync(100, TimeSpan.FromSeconds(1)); | |
//transform the messages into a list of tasks | |
//the tasks will either be successful and ack the MQ message | |
//or they will timeout and do nothing | |
var tasks = ( | |
from res in batch | |
let importantMessage = res.GetBody<string>() | |
let ask = myBusinessActor | |
.Ask<Status.Success>(new ConsistentHashableEnvelope(importantMessage, | |
importantMessage.GetHashCode()),TimeSpan.FromSeconds(1)) | |
let done = ask.ContinueWith(t => | |
{ | |
if (t.IsCanceled) | |
{ | |
Console.WriteLine("Failed to ack {0}", importantMessage); | |
} | |
else | |
{ | |
res.Complete(); | |
Console.WriteLine("Completed {0}", importantMessage); | |
} | |
},TaskContinuationOptions.None) | |
select done).ToList(); | |
//wait for all messages to either succeed or timeout | |
await Task.WhenAll(tasks); | |
Console.WriteLine("All messages acked"); | |
//continue with the next batch | |
} | |
} | |
//dummy method only used to prefill the msgqueue with data for this example | |
private static void CreateMessages() | |
{ | |
var client = TopicClient.Create("service1"); | |
for (var i = 0; i < 100; i++) | |
{ | |
client.SendAsync(new BrokeredMessage("hello" + i) | |
{ | |
MessageId = Guid.NewGuid().ToString() | |
}); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment