Skip to content

Instantly share code, notes, and snippets.

@OnurGumus
Created May 25, 2019 05:26
Show Gist options
  • Save OnurGumus/016118ebd227db8fb7892f1ee9d6477a to your computer and use it in GitHub Desktop.
Save OnurGumus/016118ebd227db8fb7892f1ee9d6477a to your computer and use it in GitHub Desktop.
A throwaway prototype showing how Akka.NET Streams can drop elements
using Akka;
using Akka.Actor;
using Akka.Configuration;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Streams.Implementation;
using Akka.Streams.Implementation.Stages;
using Akka.Streams.Stage;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp87
{
/// <summary>
/// Fan-in stage with two inputs that combines one element from each input into a single
/// output element by calling <see cref="Zipper"/>.
/// </summary>
/// <remarks>
/// Only <see cref="In0"/> is pulled when the stage starts and after each emission;
/// <see cref="In1"/> is pulled from inside the push handler of <see cref="In0"/>.
/// </remarks>
/// <typeparam name="TIn0">Element type of the first input.</typeparam>
/// <typeparam name="TIn1">Element type of the second input.</typeparam>
/// <typeparam name="TOut">Element type produced by the zipper function.</typeparam>
public class ZipWithO<TIn0, TIn1, TOut> : GraphStage<FanInShape<TIn0, TIn1, TOut>>
{
    private sealed class Logic : OutGraphStageLogic
    {
        private readonly ZipWithO<TIn0, TIn1, TOut> _stage;

        // Without this field the completion signaling would take one extra pull
        private bool _willShutDown;

        // Raised by the number of inlets on every downstream pull and lowered by one for
        // every element pushed by an input; a combined element is emitted when it hits zero.
        private int _pending;

        public Logic(Shape shape, ZipWithO<TIn0, TIn1, TOut> stage) : base(shape)
        {
            _stage = stage;
            SetHandler(_stage.In0, onPush: HandleIn0Push, onUpstreamFinish: HandleIn0Finish);
            SetHandler(_stage.In1, onPush: HandleIn1Push, onUpstreamFinish: HandleIn1Finish);
            SetHandler(_stage.Out, this);
        }

        /// <summary>Downstream asked for an element: account for one element per inlet.</summary>
        public override void OnPull()
        {
            _pending += _stage.Shape.Inlets.Length;
            EmitIfNothingPending();
        }

        /// <summary>Requests the first element from <c>In0</c> only.</summary>
        public override void PreStart()
        {
            // In1 is not pulled here; it is pulled once In0 has delivered an element.
            Pull(_stage.In0);
        }

        public override string ToString() => "ZipWith2";

        // An element arrived on In0: only now request the matching element from In1.
        private void HandleIn0Push()
        {
            Pull(_stage.In1);
            RegisterArrival();
        }

        private void HandleIn1Push() => RegisterArrival();

        private void HandleIn0Finish() => RegisterUpstreamFinish(IsAvailable(_stage.In0));

        private void HandleIn1Finish() => RegisterUpstreamFinish(IsAvailable(_stage.In1));

        // Counts one delivered input element and emits when the counter reaches zero.
        private void RegisterArrival()
        {
            _pending -= 1;
            EmitIfNothingPending();
        }

        // Completes immediately when the finished input has no element waiting to be grabbed;
        // in every case remembers that the stage has to shut down after the next emission.
        private void RegisterUpstreamFinish(bool hasAvailableElement)
        {
            if (!hasAvailableElement)
            {
                CompleteStage();
            }

            _willShutDown = true;
        }

        private void EmitIfNothingPending()
        {
            if (_pending == 0)
            {
                PushAll();
            }
        }

        // Grabs one element from each input, pushes the zipped result, then either completes
        // the stage (an upstream already finished) or starts the next round by pulling In0.
        private void PushAll()
        {
            var first = Grab(_stage.In0);
            var second = Grab(_stage.In1);
            Push(_stage.Out, _stage.Zipper(first, second));

            if (_willShutDown)
            {
                CompleteStage();
                return;
            }

            Pull(_stage.In0);
        }
    }

    /// <summary>Creates the stage around the given combining function.</summary>
    /// <param name="zipper">Function applied to one element from each input.</param>
    public ZipWithO(Func<TIn0, TIn1, TOut> zipper)
    {
        Zipper = zipper;
        InitialAttributes = Attributes.CreateName("ZipWith");
        Shape = new FanInShape<TIn0, TIn1, TOut>("ZipWith");
        Out = Shape.Out;
        In0 = Shape.In0;
        In1 = Shape.In1;
    }

    /// <summary>Outlet carrying the combined elements.</summary>
    public Outlet<TOut> Out { get; }

    /// <summary>First input; the only one pulled at start-up and after each emission.</summary>
    public Inlet<TIn0> In0 { get; }

    /// <summary>Second input; pulled each time <see cref="In0"/> pushes an element.</summary>
    public Inlet<TIn1> In1 { get; }

    protected sealed override Attributes InitialAttributes { get; }

    public sealed override FanInShape<TIn0, TIn1, TOut> Shape { get; }

    /// <summary>Function used to combine one element from each input.</summary>
    public Func<TIn0, TIn1, TOut> Zipper { get; }

    protected sealed override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
        => new Logic(Shape, this);
}
/// <summary>
/// Pass-through flow stage that keeps its inlet pulled at all times. An element is forwarded
/// only when downstream demand is outstanding at the moment it arrives; an element arriving
/// without demand is never grabbed or forwarded, the inlet is simply pulled again.
/// </summary>
/// <typeparam name="TIn">Element type flowing through the stage.</typeparam>
class Map2<TIn> : GraphStage<FlowShape<TIn, TIn>>
{
    private sealed class Logic : GraphStageLogic
    {
        private readonly Map2<TIn> _map;

        // True between a downstream pull and the push that satisfies it.
        private bool _demand;

        public Logic(Map2<TIn> map) : base(map.Shape)
        {
            _map = map;

            SetHandler(map.In, onPush: () =>
            {
                if (_demand)
                {
                    Push(map.Out, Grab(map.In));
                    _demand = false;
                }

                // Re-pull unconditionally. When there was no demand the element was not
                // grabbed, so this stage never forwards it.
                Pull(map.In);
            });

            SetHandler(map.Out, onPull: () =>
            {
                // Only record the demand. The inlet is already pulled at this point
                // (PreStart pulls it and every push handler invocation pulls it again),
                // so issuing another Pull from here would pull the same port twice.
                _demand = true;
            });
        }

        /// <summary>Starts pulling upstream immediately, before any downstream demand.</summary>
        public override void PreStart()
        {
            Pull(_map.In);
        }
    }

    public Map2()
    {
        Shape = new FlowShape<TIn, TIn>(In, Out);
    }

    /// <summary>Input port of the stage.</summary>
    public Inlet<TIn> In { get; } = new Inlet<TIn>("Map2.in");

    /// <summary>Output port of the stage.</summary>
    public Outlet<TIn> Out { get; } = new Outlet<TIn>("Map2.out");

    public override FlowShape<TIn, TIn> Shape { get; }

    protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this);
}
/// <summary>
/// Console host for three Akka.NET Streams experiments (<see cref="Loop"/>,
/// <see cref="Trugger"/> and <see cref="Take"/>); <c>Main</c> runs one of them.
/// </summary>
class Program
{
    /// <summary>
    /// Creates the actor system with DEBUG logging and a stream input buffer of size 1,
    /// runs the selected experiment, waits for a key press and then shuts the system down.
    /// </summary>
    static void Main(string[] args)
    {
        var s = @"akka {
loglevel = DEBUG
stream.materializer.max-input-buffer-size = 1
} ";
        var c = ConfigurationFactory.ParseString(s);
        var sys = ActorSystem.Create("test", c);
        try
        {
            var m = sys.Materializer();
            // Loop(m);
            //Trugger(m);
            Take(m);
            Console.ReadKey();
        }
        finally
        {
            // Stop the running streams and release the actor system instead of leaving
            // it alive until the process is torn down. Main is synchronous, so block here.
            sys.Terminate().Wait();
        }
    }

    /// <summary>
    /// Runs a closed graph containing a feedback cycle: the zip output is printed and
    /// broadcast; one broadcast branch ends in an ignoring sink, the other is fed back
    /// through a concat stage (which first receives a single element from <c>start</c>)
    /// into the second input of the zip.
    /// </summary>
    /// <param name="m">Materializer used to run the graph.</param>
    public static void Loop(ActorMaterializer m)
    {
        var ticks = Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), 1).Log("Tick");

        RunnableGraph.FromGraph(GraphDsl.Create(ticks, (b, source) =>
        {
            var zip = b.Add(ZipWith.Apply<int, int, int>(Keep.Right));
            var broadcast = b.Add(new Broadcast<int>(2));
            var concat = b.Add(new Concat<int, int>(2).Async());
            var start = Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), 1).Log("start").Take(1);
            var print = Flow.Create<int>().Select(s =>
            {
                Console.WriteLine(s);
                return s;
            });
            var sink = Sink.Ignore<int>().MapMaterializedValue(_ => NotUsed.Instance);

            b.From(source).To(zip.In0);
            b.From(zip.Out).Via(print).Via(broadcast).To(sink);
            // b.From(start);
            b.From(start).Via(concat).To(zip.In1);
            b.From(broadcast).To(concat);
            return ClosedShape.Instance;
        })).Run(m);
    }

    /// <summary>
    /// Zips two actor-backed sources with <see cref="ZipWithO{TIn0, TIn1, TOut}"/>, routing
    /// the second source through <see cref="Map2{TIn}"/>, and prints every zipped pair.
    /// After materialization it sends an increasing counter to the second source every
    /// second and to the first source on every fifth iteration. This method never returns.
    /// </summary>
    /// <param name="m">Materializer used to run the graph.</param>
    private static void Trugger(ActorMaterializer m)
    {
        var s1 = Source.ActorRef<int>(1, OverflowStrategy.DropBuffer).Log("before s1");
        var s2 = Source.ActorRef<int>(1, OverflowStrategy.DropBuffer).Log("before s2");
        // .Select(element =>
        //{
        //  Console.WriteLine("s2-" + element);
        //  return element;
        //});
        var z2 = new ZipWithO<int, int, (int, int)>((ii, jj) => (ii, jj));

        // NOTE: aggregateFlow is built but not connected to the graph that is run below.
        var aggregateFlow = Flow.FromGraph(GraphDsl.Create(Sink.Aggregate<int, int>(0, (sum, ij) => sum + ij), (b, aggregate) =>
        {
            var outlet = b.From(b.MaterializedValue)
                .Via(Flow.Create<Task<int>>().SelectAsync(4, x => x))
                .Out;
            return new FlowShape<int, int>(aggregate.Inlet, outlet);
        }));

        var (a1, a2) = RunnableGraph.FromGraph(GraphDsl.Create(s1, s2, (a, b) => (a, b), (builder, sourceShape1, sourceShape2) =>
        {
            //var zipper = builder.Add(ZipWith.Apply<int, int, (int, int)>((ii, jj) => (ii, jj)).WithAttributes(Attributes.CreateInputBuffer(0, 0)));
            var zipper = builder.Add(z2);
            builder.From(sourceShape1).To(zipper.In0);
            var dropper = new Map2<int>();
            var dropperz = Flow.FromGraph(dropper);
            builder.From(sourceShape2).Via(dropperz).To(zipper.In1);
            builder.From(zipper.Out).To(Sink.ForEach<(int, int)>(j => Console.WriteLine($"test: {j}")));
            // Every port is connected, so the graph is closed and exposes no ports.
            return ClosedShape.Instance;
        })).Run(m);

        Thread.Sleep(1000);
        int i = 0;
        while (true)
        {
            i++;
            Thread.Sleep(1000);
            a2.Tell(i);
            if (i % 5 == 0)
            {
                a1.Tell(i);
            }
        }
    }

    /// <summary>
    /// Runs a one-second tick source limited by <c>TakeWhile</c> and <c>TakeWithin</c>
    /// (1000 seconds), keeps only elements equal to 1 and writes each one to the console.
    /// </summary>
    /// <param name="m">Materializer used to run the stream.</param>
    public static void Take(ActorMaterializer m)
    {
        // Captured by the TakeWhile predicate; nothing in this method ever sets it to false.
        var canContinue = true;
        var source = Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), 1)
            .TakeWhile(x => canContinue)
            .TakeWithin(TimeSpan.FromSeconds(1000))
            .Where(x => x == 1);
        var sink = Sink.ForEach<int>(x => Console.WriteLine(x));
        source.ToMaterialized(sink, Keep.Right).Run(m);
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment