Skip to content

Instantly share code, notes, and snippets.

@OnurGumus
Created December 17, 2019 11:28
Show Gist options
  • Save OnurGumus/adaa3824f2c3ff3b16416cf060500fb4 to your computer and use it in GitHub Desktop.
Save OnurGumus/adaa3824f2c3ff3b16416cf060500fb4 to your computer and use it in GitHub Desktop.
akkling-streams
/// Builds a sink that runs the asynchronous `action` on every incoming element.
///
/// parallelCount - forwarded to `Flow.asyncMap` as its first argument.
/// action        - function invoked with each element; its async result is awaited
///                 with `do!` and then discarded.
///
/// The flow is terminated with `Sink.ignore`, combined using `Keep.right`.
let foreachAsync parallelCount action =
    // asyncMap has to emit a value per element, so every completed action
    // is replaced by the NotUsed placeholder instance.
    let runThenDiscard element =
        async {
            do! action element
            return Akka.NotUsed.Instance
        }

    Flow.empty
    |> Flow.asyncMap parallelCount runThenDiscard
    |> Flow.toMat Sink.ignore Keep.right
// Describes the stream graph inside the `graph b { ... }` computation expression.
//   b    - graph builder; edges are declared on it with `b.From ... =>> ...`.
//   sink - terminal stage that receives whatever `beforeSink` emits.
// `rows`, `combineResults` and the outer `someFlow` function are defined outside this snippet.
// NOTE(review): `anotherFlow` and `flow3` are elided placeholders (`..` / `...`) in this
// snippet, so the block is illustrative rather than compilable as shown.
let graphFlow b sink =
graph b {
//block elements: each let! binds one stage that is wired up further down
// Source built from `rows`, with a log stage named "Source".
let! source = (Source.ofSeq rows).Log("Source")
// Fan-out stage constructed with argument 2 (presumably the number of outputs - confirm);
// two edges leave it in the wiring section below.
let! broadcast = Broadcast 2
// Stage with two inputs (In0, In1) and one output (Out), built from `combineResults`.
let! zip = ZipWith.create combineResults
// The `someFlow` on the right-hand side is the outer function passed to asyncMap (first
// argument 10); this binding shadows that name for the rest of the block.
let! someFlow = (Flow.empty |> Flow.asyncMap 10 someFlow).Log("someFlow")
let! anotherFlow = ..
let! flow3 = ...
// Grouping stage configured with 40 and a 10-second TimeSpan (presumably max batch size
// and max wait time - confirm against Flow.groupedWithin), logged as "Before sink".
let! beforeSink = (Flow.empty
|> Flow.groupedWithin 40
(TimeSpan.FromSeconds 10.0)).Log("Before sink")
//glue the elements: every edge expression is piped to `ignore` to discard its result
// First branch: broadcast -> someFlow -> anotherFlow -> first zip input.
b.From broadcast =>> someFlow =>> anotherFlow =>> zip.In0 |> ignore
// Second branch: broadcast -> flow3 -> second zip input.
b.From broadcast =>> flow3 =>> zip.In1 |> ignore
// Feed the source into the broadcast stage.
b.From source =>> broadcast.In |> ignore
// Zipped output -> grouping stage -> the sink supplied by the caller.
b.From zip.Out =>> beforeSink =>> sink|> ignore
}
// Terminal sink: calls writeFinalResult for each element, passing 1 as the
// parallelCount argument of foreachAsync.
let sink = foreachAsync 1 writeFinalResult

// Build the graph around `sink` with graphFlow, turn it into a runnable graph,
// and run it using `mat`.
Graph.run mat (Graph.runnable (Graph.create1 graphFlow sink))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment