Last active
November 15, 2019 00:35
-
-
Save tomwadeson/82f3e5996cf81ca0cc2e9a8eaea45f3a to your computer and use it in GitHub Desktop.
Streaming generation of arbitrary (and optionally distinct) data, with a scary-looking `toIterator` to integrate with Gatling
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package newbeeper.performance | |
import cats.effect.concurrent.{MVar, Ref} | |
import cats.effect.syntax.concurrent._ | |
import cats.effect.{Concurrent, ConcurrentEffect, Sync} | |
import cats.syntax.flatMap._ | |
import cats.syntax.functor._ | |
import fs2.Stream | |
import org.scalacheck.Arbitrary | |
object Random { | |
implicit final class StreamOps[F[_], A](private val stream: Stream[F, A]) { | |
def unsafeToIterator(implicit F: ConcurrentEffect[F]): F[Iterator[A]] = | |
toIterator.map(_.map(F.toIO(_).unsafeRunSync())) | |
def toIterator(implicit F: Concurrent[F]): F[Iterator[F[A]]] = | |
for { | |
cell <- MVar[F].empty[A] | |
_ <- stream.evalMap(cell.put).compile.drain.start | |
} yield Iterator.continually(cell.take) | |
} | |
def of[F[_], A](implicit F: Sync[F], A: Arbitrary[A]): Stream[F, A] = | |
Stream.eval(F.delay(A.arbitrary.sample)).repeat.unNone | |
def distinct[F[_], A](implicit F: Sync[F], A: Arbitrary[A]): Stream[F, A] = | |
distinctBy[F, A, A](identity) | |
def distinctBy[F[_], A, B](f: A => B)(implicit F: Sync[F], A: Arbitrary[A]): Stream[F, A] = | |
for { | |
seen <- Stream.eval(Ref[F].of(Set.empty[B])) | |
filtered <- of[F, A].evalFilter(a => { val b = f(a); seen.modify(seen => (seen + b, !seen.contains(b))) }) | |
} yield filtered | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment