Created
May 8, 2017 11:07
-
-
Save 0xYUANTI/c4dbface673e1a1dc9524f355631bb71 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import fs2.interop.scalaz._
import fs2.{Scheduler, Strategy, Stream, time}
import scala.concurrent.duration._
import scalaz.concurrent.{Task => ZTask}
// Porting some code from scalaz.concurrent.chan to fs2.
// We need to call a few external REST APIs every N seconds and process the results.
/** Polls three producers on a shared timer, merges their events into a single
  * stream and prints every event as it arrives.
  *
  * Each producer is evaluated once per timer tick; the merged stream runs until
  * the process is stopped.
  */
object App {

  // The implicits carry explicit types and are declared ahead of their first
  // use: Scala 2 does not consider an un-annotated implicit that appears
  // textually after the call site, which made the `merge` calls in `main`
  // unable to find them.

  /** Thread pool (2 daemon threads) used by fs2 for asynchronous stream steps. */
  implicit val strategy: Strategy = Strategy.fromFixedDaemonPool(2)

  /** Scheduler (2 daemon threads) that drives the `awakeEvery` timer. */
  implicit val scheduler: Scheduler = Scheduler.fromFixedDaemonPool(2)

  /** Events emitted by the producers. */
  sealed trait Event
  final case class A() extends Event
  final case class B() extends Event
  final case class C() extends Event

  /** Entry point: wires the producers to the timer and runs the merged stream.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val prod1 = everyMinute.evalMap(_ => producer1)
    val prod2 = everyMinute.evalMap(_ => producer2)
    val prod3 = everyMinute.evalMap(_ => producer3)

    val stream = prod1.merge(prod2).merge(prod3).map(processEvent)

    // main loop: blocks the calling thread while the stream runs
    stream.run.unsafePerformSync
  }

  /** Handler applied to every event of the merged stream; prints the event. */
  def processEvent: Event => Unit = println

  /** Timer stream that triggers the producers.
    *
    * NOTE(review): despite its name this ticks every 10 seconds, not every
    * minute. The interval is kept as-is; confirm which one is intended.
    */
  def everyMinute: Stream[ZTask, FiniteDuration] = time.awakeEvery[ZTask](10.seconds)

  /** Task that prints the current thread and yields an [[A]] event. */
  def producer1: ZTask[Event] = produce(A())

  /** Task that prints the current thread and yields a [[B]] event. */
  def producer2: ZTask[Event] = produce(B())

  /** Task that prints the current thread and yields a [[C]] event. */
  def producer3: ZTask[Event] = produce(C())

  /** Builds a task that prints the thread it runs on, then yields `event`. */
  private def produce(event: => Event): ZTask[Event] =
    ZTask { println(java.lang.Thread.currentThread); event }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment