Skip to content

Instantly share code, notes, and snippets.

@loosechainsaw
Created February 9, 2015 13:30
Show Gist options
  • Save loosechainsaw/d61b9bb41e3ecdf4eb12 to your computer and use it in GitHub Desktop.
Save loosechainsaw/d61b9bb41e3ecdf4eb12 to your computer and use it in GitHub Desktop.
Example of Scatter Gather in Akka.NET
using System;
using Akka.Actor;
using Akka;
using Akka.Remote;
using Akka.Routing;
using System.Linq;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Actors
{
public class Hotel
{
public Hotel (string country, string city, string name)
{
Country = country;
City = city;
Name = name;
}
public string Country{ get; private set; }
public string City{ get; private set; }
public string Name{ get; private set; }
public override string ToString ()
{
return string.Format ("[Hotel: Country={0}, City={1}, Name={2}]", Country, City, Name);
}
}
public class Query
{
public Query (string country, string city)
{
Country = country;
City = city;
}
public string Country{ get; private set; }
public string City{ get; private set; }
}
public class Results
{
public Results (string country, string city, IEnumerable<Hotel> hotels)
{
Country = country;
City = city;
Hotels = hotels;
}
public string Country{ get; private set; }
public string City{ get; private set; }
public IEnumerable<Hotel> Hotels{ get; private set; }
}
public class AggregatorActor : ReceiveActor
{
private List<Hotel> Results;
private int seen = 0;
public AggregatorActor (ActorRef original, int waitFor)
{
Results = new List<Hotel> ();
Receive<Results> (x => {
if (++seen == waitFor) {
original.Tell (x);
Self.Tell (PoisonPill.Instance);
}
});
}
}
public class CategoryActor : ReceiveActor
{
public CategoryActor (string country, IEnumerable<Hotel> hotels, bool top = false)
{
var listOfActors = new List<ActorRef> ();
if (top) {
var countries = hotels.Select (x => x.Country).Distinct ().ToList ();
countries.ForEach (c => {
var hotelsInCountry = hotels.Where (x => x.Country == c).ToList ();
listOfActors.Add (Context.ActorOf (Props.Create<CategoryActor> (c, hotelsInCountry, false), c));
});
} else {
var cities = hotels.Where (x => x.Country == country).Select (x => x.City).Distinct ().ToList ();
cities.ForEach (city => {
var hotelsInCity = hotels.Where (x => x.Country == country && x.City == city).ToList ();
listOfActors.Add (Context.ActorOf (Props.Create<TopicActor> (country, city, hotelsInCity), city));
});
}
Receive<Query> (q => {
var sender = Sender;
var aggregator = Context.ActorOf (Props.Create<AggregatorActor> (sender, listOfActors.Count), "Aggregator");
listOfActors.ForEach (x => {
x.Tell (q, aggregator);
});
});
}
}
public class TopicActor : ReceiveActor
{
public TopicActor (string country, string city, IEnumerable<Hotel> hotels)
{
this.country = country;
this.city = city;
this.hotels = hotels;
Receive<Query> (q => {
var sender = Sender;
sender.Tell (new Results (this.country, this.city, this.hotels.Where (x => x.Country == country && x.City == city).ToList ()));
});
}
private string country;
private string city;
private IEnumerable<Hotel> hotels;
}
class MainClass
{
public static List<Hotel> GetHotels ()
{
var hotels = new List<Hotel> ();
hotels.Add (new Hotel ("America", "NYC", "Hilton"));
hotels.Add (new Hotel ("America", "NYC", "Plaza"));
hotels.Add (new Hotel ("America", "NYC", "Pierre"));
hotels.Add (new Hotel ("America", "Vegas", "Caesars"));
hotels.Add (new Hotel ("America", "Vegas", "Mandalay"));
hotels.Add (new Hotel ("America", "Vegas", "Belagio"));
hotels.Add (new Hotel ("Australia", "Perth", "Burswood"));
hotels.Add (new Hotel ("Australia", "Perth", "Hyatt"));
hotels.Add (new Hotel ("Australia", "Perth", "Hilton"));
hotels.Add (new Hotel ("Australia", "Perth", "Duxton"));
hotels.Add (new Hotel ("Australia", "Perth", "Richardson"));
return hotels;
}
public static void Main (string[] args)
{
using (var system = ActorSystem.Create ("System")) {
var top = system.ActorOf (Props.Create<CategoryActor> (String.Empty, GetHotels (), true), "TopLevelCategory");
Thread.Sleep (2300);
var f = top.Ask<Results> (new Query ("America", "Vegas"));
f.ContinueWith (x => {
x.Result.Hotels.ToList ().ForEach (y => Console.WriteLine (y));
});
Console.ReadKey ();
system.Shutdown ();
}
}
}
}
@rogeralsing
Copy link

Looks good, some typo though:

Receive<Query>(q =>
{
   var sender = Sender;
   sender.Tell(new Results(country, city, hotels.Where(x => x.Country == q.Country && x.City == q.City).ToList()));
});

You should compare to q.Country and q.City, the original code compares to the inparam country.

@rogeralsing
Copy link

Also, using ReceiveActor you can move the class member state into the constructor instead if you like.
e.g.
Result and seen can be local variables inside the AggregateActor constructor.

@rogeralsing
Copy link

and in AggregatorActor, I take it that Results should get any incoming result appended to it?
And once it reaches waitFor, it returns all of the results, not just the last result (?)

@loosechainsaw
Copy link
Author

Thanks rodger :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment