Created
May 25, 2019 05:26
-
-
Save OnurGumus/016118ebd227db8fb7892f1ee9d6477a to your computer and use it in GitHub Desktop.
A small prototype showing how Akka Streams can drop elements
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Akka; | |
using Akka.Actor; | |
using Akka.Configuration; | |
using Akka.Streams; | |
using Akka.Streams.Dsl; | |
using Akka.Streams.Implementation; | |
using Akka.Streams.Implementation.Stages; | |
using Akka.Streams.Stage; | |
using System; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace ConsoleApp87 | |
{ | |
/// <summary>
/// Two-input fan-in stage that combines one element from each inlet with
/// <c>Zipper</c> and emits the result. Unlike a stage that pulls both inlets
/// eagerly, this one pulls <c>In0</c> first and requests an element from
/// <c>In1</c> only after <c>In0</c> has delivered one.
/// </summary>
public class ZipWithO<TIn0, TIn1, TOut> : GraphStage<FanInShape<TIn0, TIn1, TOut>>
{
    /// <summary>Creates the stage around the given combining function.</summary>
    /// <param name="zipper">Function applied to one element of each input.</param>
    public ZipWithO(Func<TIn0, TIn1, TOut> zipper)
    {
        Zipper = zipper;
        InitialAttributes = Attributes.CreateName("ZipWith");
        Shape = new FanInShape<TIn0, TIn1, TOut>("ZipWith");
        Out = Shape.Out;
        In0 = Shape.In0;
        In1 = Shape.In1;
    }

    /// <summary>Function used to combine the two grabbed elements.</summary>
    public Func<TIn0, TIn1, TOut> Zipper { get; }

    public Inlet<TIn0> In0 { get; }

    public Inlet<TIn1> In1 { get; }

    public Outlet<TOut> Out { get; }

    public sealed override FanInShape<TIn0, TIn1, TOut> Shape { get; }

    protected sealed override Attributes InitialAttributes { get; }

    protected sealed override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
        => new Logic(Shape, this);

    private sealed class Logic : OutGraphStageLogic
    {
        private readonly ZipWithO<TIn0, TIn1, TOut> _stage;

        // Remembers that an upstream finished while its last element was still
        // waiting to be emitted; without it, signalling completion would take
        // one extra pull.
        private bool _willShutDown;

        // Raised by the inlet count on every downstream pull and lowered by one
        // on every arriving element; an output is emitted when it reaches zero.
        private int _pending;

        public Logic(Shape shape, ZipWithO<TIn0, TIn1, TOut> stage) : base(shape)
        {
            _stage = stage;

            SetHandler(
                stage.In0,
                onPush: () =>
                {
                    // The second input is requested only once the first has arrived.
                    Pull(stage.In1);
                    RegisterArrival();
                },
                onUpstreamFinish: () =>
                {
                    if (!IsAvailable(stage.In0))
                    {
                        CompleteStage();
                    }

                    _willShutDown = true;
                });

            SetHandler(
                stage.In1,
                onPush: RegisterArrival,
                onUpstreamFinish: () =>
                {
                    if (!IsAvailable(stage.In1))
                    {
                        CompleteStage();
                    }

                    _willShutDown = true;
                });

            SetHandler(stage.Out, this);
        }

        /// <summary>Starts the cycle by requesting the first element from <c>In0</c>.</summary>
        public override void PreStart()
        {
            Pull(_stage.In0);
            // Pull(_stage.In1);
        }

        /// <summary>
        /// Records downstream demand. Emits immediately when both elements
        /// arrived before this pull (the counter was negative by the inlet count).
        /// </summary>
        public override void OnPull()
        {
            _pending += _stage.Shape.Inlets.Length;
            EmitWhenBalanced();
        }

        public override string ToString() => "ZipWith2";

        // Counts one arrived element and emits when demand and arrivals balance.
        private void RegisterArrival()
        {
            _pending--;
            EmitWhenBalanced();
        }

        private void EmitWhenBalanced()
        {
            if (_pending == 0)
            {
                PushAll();
            }
        }

        // Grabs both inputs, pushes the zipped value, then either completes the
        // stage (an upstream already finished) or starts the next cycle on In0.
        private void PushAll()
        {
            var first = Grab(_stage.In0);
            var second = Grab(_stage.In1);
            Push(_stage.Out, _stage.Zipper(first, second));

            if (_willShutDown)
            {
                CompleteStage();
                return;
            }

            Pull(_stage.In0);
        }
    }
}
/// <summary>
/// Pass-through flow stage that keeps its inlet permanently pulled. An element
/// is forwarded only if downstream demand is outstanding when it arrives;
/// otherwise it is never grabbed and therefore never reaches the outlet.
/// </summary>
class Map2<TIn> : GraphStage<FlowShape<TIn, TIn>>
{
    private sealed class Logic : GraphStageLogic
    {
        private readonly Map2<TIn> _map;

        // True from a downstream pull until the push that satisfies it.
        private bool _demand;

        public Logic(Map2<TIn> map) : base(map.Shape)
        {
            _map = map;

            SetHandler(map.In, onPush: () =>
            {
                if (_demand)
                {
                    Push(map.Out, Grab(map.In));
                    _demand = false;
                }

                // Re-pull in both cases. Without demand the element was not
                // grabbed, so it is simply not forwarded.
                Pull(map.In);
            });

            // Only record the demand here. The inlet is already pulled at this
            // point (PreStart and every onPush pull it), so no upstream pull is
            // issued from this handler; Akka Streams does not allow pulling a
            // port that has already been pulled.
            SetHandler(map.Out, onPull: () => _demand = true);
        }

        /// <summary>Requests the first element as soon as the stage starts.</summary>
        public override void PreStart()
        {
            Pull(_map.In);
        }
    }

    public Map2()
    {
        Shape = new FlowShape<TIn, TIn>(In, Out);
    }

    public Inlet<TIn> In { get; } = new Inlet<TIn>("Map2.in");

    public Outlet<TIn> Out { get; } = new Outlet<TIn>("Map2.out");

    public override FlowShape<TIn, TIn> Shape { get; }

    protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this);
}
/// <summary>
/// Console host for the stream experiments. <c>Main</c> runs exactly one of
/// <c>Loop</c>, <c>Trugger</c> or <c>Take</c>; switch experiments by changing
/// which call is commented out.
/// </summary>
class Program
{
    // Upper bound on how long Main waits for the actor system to shut down.
    private static readonly TimeSpan ShutdownTimeout = TimeSpan.FromSeconds(10);

    /// <summary>
    /// Creates the actor system and materializer, starts the selected
    /// experiment, waits for a key press and then terminates the actor system.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        var s = @"akka {
loglevel = DEBUG
stream.materializer.max-input-buffer-size = 1
} ";
        var c = ConfigurationFactory.ParseString(s);
        var sys = ActorSystem.Create("test", c);
        try
        {
            var m = sys.Materializer();
            // Loop(m);
            //Trugger(m);
            Take(m);
            Console.ReadKey();
        }
        finally
        {
            // Shut the actor system down instead of abandoning it on exit;
            // the wait is bounded so a stuck shutdown cannot hang the process.
            sys.Terminate().Wait(ShutdownTimeout);
        }
    }

    /// <summary>
    /// Runs a graph with a feedback cycle: a one-second tick feeds the first
    /// zip input, while the second zip input comes from a concat of a
    /// single-element start source followed by the broadcast zip output.
    /// </summary>
    /// <param name="m">Materializer used to run the graph.</param>
    public static void Loop(ActorMaterializer m)
    {
        var ticks = Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), 1).Log("Tick");

        RunnableGraph.FromGraph(GraphDsl.Create(ticks, (b, source) =>
        {
            var zip = b.Add(ZipWith.Apply<int, int, int>(Keep.Right));
            var broadcast = b.Add(new Broadcast<int>(2));
            var concat = b.Add(new Concat<int, int>(2).Async());
            var start = Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), 1).Log("start").Take(1);
            var print = Flow.Create<int>().Select(s =>
            {
                Console.WriteLine(s);
                return s;
            });
            var sink = Sink.Ignore<int>().MapMaterializedValue(_ => NotUsed.Instance);

            // The wiring order is kept as is: broadcast and concat ports are
            // assigned in the order the connections are made.
            b.From(source).To(zip.In0);
            b.From(zip.Out).Via(print).Via(broadcast).To(sink);
            // b.From(start);
            b.From(start).Via(concat).To(zip.In1);
            b.From(broadcast).To(concat);
            return ClosedShape.Instance;
        })).Run(m);
    }

    /// <summary>
    /// Zips two actor-backed sources with <c>ZipWithO</c>, routing the second
    /// one through <c>Map2</c>, and prints every zipped pair. Afterwards it
    /// sends an increasing counter to the second source every second and to
    /// the first source on every fifth iteration. This method never returns.
    /// </summary>
    /// <param name="m">Materializer used to run the graph.</param>
    private static void Trugger(ActorMaterializer m)
    {
        var s1 = Source.ActorRef<int>(1, OverflowStrategy.DropBuffer).Log("before s1");
        var s2 = Source.ActorRef<int>(1, OverflowStrategy.DropBuffer).Log("before s2");
        // .Select(element =>
        //{
        //    Console.WriteLine("s2-" + element);
        //    return element;
        //});
        var z2 = new ZipWithO<int, int, (int, int)>((ii, jj) => (ii, jj));

        // NOTE: this flow is built but is not connected to the graph below.
        var aggregateFlow = Flow.FromGraph(GraphDsl.Create(Sink.Aggregate<int, int>(0, (sum, ij) => sum + ij), (b, aggregate) =>
        {
            var outlet = b.From(b.MaterializedValue)
                .Via(Flow.Create<Task<int>>().SelectAsync(4, x => x))
                .Out;
            return new FlowShape<int, int>(aggregate.Inlet, outlet);
        }));

        var (a1, a2) = RunnableGraph.FromGraph(GraphDsl.Create(s1, s2, (a, b) => (a, b), (builder, sourceShape1, sourceShape2) =>
        {
            //var zipper = builder.Add(ZipWith.Apply<int, int, (int, int)>((ii, jj) => (ii, jj)).WithAttributes(Attributes.CreateInputBuffer(0, 0)));
            var zipper = builder.Add(z2);
            builder.From(sourceShape1).To(zipper.In0);
            var dropper = new Map2<int>();
            var dropperz = Flow.FromGraph(dropper);
            builder.From(sourceShape2).Via(dropperz).To(zipper.In1);
            builder.From(zipper.Out).To(Sink.ForEach<(int, int)>(j => Console.WriteLine($"test: {j}")));
            // Every port is connected, so the graph is closed.
            return ClosedShape.Instance;
        })).Run(m);

        Thread.Sleep(1000);
        int i = 0;
        while (true)
        {
            i++;
            Thread.Sleep(1000);
            a2.Tell(i);
            if (i % 5 == 0)
            {
                a1.Tell(i);
            }
        }
    }

    /// <summary>
    /// Runs a one-second tick source limited by a <c>TakeWhile</c> flag and a
    /// 1000-second <c>TakeWithin</c> window, keeps elements equal to 1 and
    /// writes each of them to the console.
    /// </summary>
    /// <param name="m">Materializer used to run the stream.</param>
    public static void Take(ActorMaterializer m)
    {
        // Captured by the TakeWhile predicate; nothing in this method changes it.
        var canContinue = true;
        var source = Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), 1)
            .TakeWhile(x => canContinue)
            .TakeWithin(TimeSpan.FromSeconds(1000))
            .Where(x => x == 1);
        var sink = Sink.ForEach<int>(x => Console.WriteLine(x));
        source.ToMaterialized(sink, Keep.Right).Run(m);
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment