Skip to content

Instantly share code, notes, and snippets.

@iravid
Created September 5, 2018 17:59
Show Gist options
  • Save iravid/be1d89e3e7108e779b87d8000167a2e0 to your computer and use it in GitHub Desktop.
Save iravid/be1d89e3e7108e779b87d8000167a2e0 to your computer and use it in GitHub Desktop.
def batch[F[_]: Concurrent, T, U](f: T => U, bound: Int)(implicit U: Monoid[U]): Pipe[F, T, U] =
input =>
Stream.eval(Queue.bounded[F, U](1)).flatMap { queue =>
val combiner = input.evalMapAccumulate((U.empty, 0)) {
case ((acc, currBound), t) =>
val u = f(t)
queue.offer1(u).flatMap { stored =>
if (stored) ((U.empty, 0), ()).pure[F]
else {
if (currBound < bound) ((U.combine(acc, u), currBound + 1), ()).pure[F]
else queue.enqueue1(acc) >> ((u, 1), ()).pure[F]
}
}
}
combiner.drain.concurrently(queue.dequeue)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment