Last active
December 15, 2015 16:51
-
-
Save Horusiath/e0c0bfc9eabbe096cb21 to your computer and use it in GitHub Desktop.
This is example of returning responses from actor refs in form of the enumerable.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static class EnumerablePatterns | |
{ | |
public static IEnumerable<T> Query<T>(this ICanTell self, object message = null) | |
{ | |
var provider = ResolveProvider(self); | |
if (provider == null) | |
throw new NotSupportedException("Unable to resolve the target Provider"); | |
var enumerable = new BlockingCollection<T>(); | |
var path = provider.TempPath(); | |
Action unregister = () => provider.UnregisterTempActor(path); | |
var queryable = new EnumerableActorRef<T>(enumerable, unregister, path); | |
provider.RegisterTempActor(queryable, path); | |
self.Tell(message ?? Start.Instance, queryable); | |
return enumerable.GetConsumingEnumerable(); | |
} | |
private static IActorRefProvider ResolveProvider(ICanTell self) | |
{ | |
IInternalActorRef internalRef; | |
if ((internalRef = self as IInternalActorRef) != null) | |
return internalRef.Provider; | |
ActorSelection selection; | |
if ((selection = self as ActorSelection) != null) | |
return ResolveProvider(selection.Anchor); | |
return null; | |
} | |
} | |
internal class EnumerableActorRef<T> : MinimalActorRef | |
{ | |
private readonly BlockingCollection<T> _enumerable; | |
private readonly Action _unregister; | |
private readonly ActorPath _path; | |
internal EnumerableActorRef(BlockingCollection<T> enumerable, Action unregister, ActorPath path) | |
{ | |
_enumerable = enumerable; | |
_unregister = unregister; | |
_path = path; | |
} | |
public override ActorPath Path { get { return _path; } } | |
public override IActorRefProvider Provider { get { throw new NotSupportedException(); } } | |
protected override void TellInternal(object message, IActorRef sender) | |
{ | |
if (message is ISystemMessage) | |
SendSystemMessage(message.AsInstanceOf<ISystemMessage>(), sender); | |
else if (message is Completed) | |
{ | |
_unregister(); | |
_enumerable.CompleteAdding(); | |
_enumerable.Dispose(); | |
} | |
else if (message is T) | |
_enumerable.Add((T)message); | |
} | |
} | |
public struct Completed | |
{ | |
public static readonly Completed Instance = new Completed(); | |
} | |
public struct Start | |
{ | |
public static readonly Start Instance = new Start(); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class MyActor : ReceiveActor | |
{ | |
public MyActor() | |
{ | |
ReceiveAny(_ => | |
{ | |
StartProducing(Sender); | |
}); | |
// in order to close enumerable, this actor should send Complete.Instance to the same sender, which received initial response | |
} | |
private void StartProducing(IActorRef sender) | |
{ | |
var sec = TimeSpan.FromSeconds(1); | |
var i = 1; | |
Context.System.Scheduler.Advanced.ScheduleRepeatedly(sec, sec, () => sender.Tell(i++)); | |
} | |
} | |
class Program | |
{ | |
static void Main(string[] args) | |
{ | |
using (var system = ActorSystem.Create("system")) | |
{ | |
var aref = system.ActorOf(Props.Create<MyActor>(), "child"); | |
var q = (from i in aref.Query<int>() | |
where i > 3 && i < 100 | |
select i + ", "); | |
// In the example, each iteration in foreach will run, when corresponding value will be send by an actor - | |
// in this case first time after few seconds (because we're filtering on 3 as lowerbound), and then after | |
// every 1 sec. But it will never end, as we don't tell enumerable to be closed at any time | |
foreach (var i in q) | |
Console.Write(i); | |
Console.ReadLine(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment