-
-
Save ahjohannessen/ad827c0ec0967b102c5e to your computer and use it in GitHub Desktop.
Generic Akka stream operations for carrying extra information around operations
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.stream.scaladsl.Flow | |
import scala.concurrent.{ ExecutionContext, Future } | |
import scalaz._ | |
import scalaz.std.scalaFuture.futureInstance // IntelliJ lies | |
/**
 * Flowz builds Akka Stream `Flow`s whose elements are values wrapped in a container `M`,
 * delegating the per-element work to the matching scalaz type class instance for `M`.
 *
 * Fix the container type `M` and the inner input type `A` up front and let the compiler
 * infer the output type:
 * ```
 * val process = Flowz[Option, Int].map(i => (i * 2).toString) // Flow[Option[Int], Option[String], Unit]
 *
 * source ~> process ~> sink
 * ```
 *
 * @tparam M The container type
 * @tparam A The inner type
 */
class Flowz[M[_], A] private () {

  /** Transforms the value inside each incoming `M` with `f`, keeping the container. */
  def map[B](f: A => B)(implicit ev: Functor[M]): Flow[M[A], M[B], Unit] =
    Flow[M[A]].map(ma => ev.map(ma)(f))

  /**
   * Asynchronously transforms the value inside each incoming `M` with `f`.
   *
   * `parallelism` is handed straight to the underlying `Flow.mapAsync`; the `ExecutionContext`
   * is needed to obtain the scalaz `Applicative[Future]` used by the traversal.
   */
  def mapAsync[B](parallelism: Int)(f: A => Future[B])
                 (implicit ev: Traverse[M], ex: ExecutionContext): Flow[M[A], M[B], Unit] =
    Flow[M[A]].mapAsync(parallelism)(ma => ev.traverse(ma)(f))

  /**
   * Same as [[mapAsync]] but built on `Flow.mapAsyncUnordered`, so results are not guaranteed
   * to be emitted in the order the inputs arrived.
   */
  def mapAsyncUnordered[B](parallelism: Int)(f: A => Future[B])
                          (implicit ev: Traverse[M], ex: ExecutionContext): Flow[M[A], M[B], Unit] =
    Flow[M[A]].mapAsyncUnordered(parallelism)(ma => ev.traverse(ma)(f))

  /** Applies the function wrapped in `f` to the value wrapped in each incoming `M` */
  def ap[B](f: => M[A => B])(implicit ev: Apply[M]): Flow[M[A], M[B], Unit] =
    Flow[M[A]].map(ev.ap(_)(f))

  /** Wraps each incoming plain value in an `M` */
  def point(implicit ev: Applicative[M]): Flow[A, M[A], Unit] =
    Flow[A].map(ev.point(_))

  /** Flat maps `f` over the value inside each incoming `M` */
  def bind[B](f: A => M[B])(implicit ev: Bind[M]): Flow[M[A], M[B], Unit] =
    Flow[M[A]].map(ev.bind(_)(f))

  /**
   * Maps `f` over the contents of each incoming `M` and combines the results with the
   * `Monoid[B]`, emitting a bare `B` (the `M` container is lost).
   */
  def foldMap[B](f: A => B)(implicit ev: Foldable[M], F: Monoid[B]): Flow[M[A], B, Unit] =
    Flow[M[A]].map(ev.foldMap(_)(f))

  /** Left-folds each incoming `M` starting from `z` and emits the resulting accumulator `B` */
  def foldLeft[B](z: B)(f: (B, A) => B)(implicit ev: Foldable[M]): Flow[M[A], B, Unit] =
    Flow[M[A]].map(ev.foldLeft(_, z)(f))

  /** Right-folds each incoming `M` starting from `z` and emits the resulting accumulator `B` */
  def foldRight[B](z: B)(f: (A, => B) => B)(implicit ev: Foldable[M]): Flow[M[A], B, Unit] =
    Flow[M[A]].map(ev.foldRight(_, z)(f))

  /**
   * Runs `f` (which yields a `G[B]`) over the contents of each incoming `M` and flips the two
   * containers, so the stream carries `G[M[B]]`.
   */
  def traverse[G[_], B](f: A => G[B])(implicit ev: Traverse[M], G: Applicative[G]): Flow[M[A], G[M[B]], Unit] = // scalastyle:ignore
    Flow[M[A]].map(ev.traverse(_)(f))

  /** Given a stream of `M[G[A]]`, flips `M` and `G` to produce a stream of `G[M[A]]` */
  def sequence[G[_]](implicit ev: Traverse[M], G: Applicative[G]): Flow[M[G[A]], G[M[A]], Unit] =
    Flow[M[G[A]]].map(ev.sequence(_))

  // <editor-fold desc="aliases">

  /** Alias for [[bind]] */
  @inline
  def flatMap[B](f: A => M[B])(implicit ev: Bind[M]): Flow[M[A], M[B], Unit] = bind[B](f)

  // </editor-fold>
}
object Flowz {

  /**
   * Builds a [[Flowz]] for the container `M` and element type `A`.
   *
   * The class constructor is private, so this factory is the way to obtain an instance.
   */
  def apply[M[_], A]: Flowz[M, A] = new Flowz[M, A]
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment