Skip to content

Instantly share code, notes, and snippets.

@danslapman
Created June 26, 2019 11:30
Show Gist options
  • Save danslapman/4207d0c4794c6c65d3ae148f05672498 to your computer and use it in GitHub Desktop.
Save danslapman/4207d0c4794c6c65d3ae148f05672498 to your computer and use it in GitHub Desktop.
Acked replicateN
import scala.concurrent.{ExecutionContext, Future, Promise}
import cats.instances.future._
import cats.syntax.functor._
import com.timcharper.acked.AckTup
import shapeless.{Nat, Sized}
import shapeless.syntax.sized._
import shapeless.ops.nat.ToInt
object AckedReplicate {
def replicateN[T, N <: Nat](implicit size: ToInt[N]): AckTup[T] => Sized[List[AckTup[T]], N] = {
at: AckTup[T] =>
implicit val ec: ExecutionContext = SameThreadExecutionContext
val (p, t) = at
val promises: List[Promise[Unit]] = List.fill(size())(Promise[Unit])
val sizedPs: Sized[List[Promise[Unit]], N] = promises.sized[N].get // I know what I do, scalac
val futures = promises.map(_.future)
Future.sequence(futures).void.onComplete(p.tryComplete)
sizedPs.map(_ -> t)
}
}
import shapeless.nat.{_2, _3, _4, _6}
val bc = builder.add(UnzipWith(AckedReplicate.replicateN[XXX, _6].andThen(_.tupled)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment