Skip to content

Instantly share code, notes, and snippets.

@noelwelsh
Created July 29, 2016 11:28
Show Gist options
  • Save noelwelsh/657b81ccde86badadfa6b5988887a7a5 to your computer and use it in GitHub Desktop.
Save noelwelsh/657b81ccde86badadfa6b5988887a7a5 to your computer and use it in GitHub Desktop.
Pull-based event stream case study à la fs2
// Push vs pull evaluation
// Interpreters
// Reification
// Generalised algebraic data types
import java.util.concurrent.ArrayBlockingQueue
import scalaz.Applicative
import scalaz.syntax.applicative._ // for |@|
/**
 * A reified left fold over an [[EventStream]]: nothing is pulled from `source` until
 * `run` is called.
 *
 * @param source       the stream to pull values from
 * @param initialValue the accumulator before any element has been seen
 * @param f            combines the accumulator so far with the next emitted element
 */
final case class Sink[A,B](source: EventStream[A], initialValue: B, f: (B, A) => B) {
  /**
   * Steps `source` repeatedly, folding each emitted element into the accumulator, and
   * returns the accumulator as soon as a step yields `Done`.
   *
   * Does not return if `source` never yields `Done`.
   */
  def run: B =
    Iterator
      .continually(source.step)
      // Stop pulling at the first Done; that Done is the last step performed.
      .takeWhile {
        case Emit(_) => true
        case Done    => false
      }
      .collect { case Emit(element) => element }
      .foldLeft(initialValue)(f)
}
/**
 * The outcome of pulling once on a stream: either an element (`Emit`) or the
 * end-of-stream marker (`Done`).
 */
sealed trait Value[+A] extends Product with Serializable {
  /** Applies `f` to the carried element, if there is one; `Done` is returned unchanged. */
  def map[B](f: A => B): Value[B] =
    this match {
      case Done          => Done
      case Emit(payload) => Emit(f(payload))
    }
}
object Value {
  /**
   * scalaz `Applicative` instance for [[Value]]: `point` wraps a value in `Emit`, and `ap`
   * yields `Done` as soon as either the function or the argument is `Done`.
   */
  implicit object valueInstance extends Applicative[Value] {
    override def point[A](x: => A): Value[A] = Emit(x)

    // `f` is forced first; `fa` is only forced when `f` actually carries a function.
    override def ap[A, B](fa: => Value[A])(f: => Value[A => B]): Value[B] =
      f match {
        case Done    => Done
        case Emit(g) => fa.map(g)
      }
  }
}
/** A [[Value]] carrying one element `a` produced by a stream step. */
final case class Emit[A](a: A) extends Value[A]
/** The [[Value]] marking end of stream; `Sink.run` stops folding and returns when it sees it. */
final case object Done extends Value[Nothing]
/**
 * A pull-based stream of `A`s. The combinators only build a description (a node from the
 * `EventStream` companion, or a [[Sink]]); values are produced when `step` is called.
 */
sealed trait EventStream[A] extends Product with Serializable {
  /** Pulls the next [[Value]] from this stream. */
  def step: Value[A]

  /** Describes a left fold over this stream; call `run` on the result to execute it. */
  def foldLeft[B](initialValue: B)(f: (B,A) => B): Sink[A,B] =
    Sink[A, B](this, initialValue, f)

  /** Describes this stream with `f` applied to each element. */
  def map[B](f: A => B): EventStream[B] =
    EventStream.Map[A, B](this, f)

  /** Describes the pairing of this stream with `that`. */
  def zip[B](that: EventStream[B]): EventStream[(A,B)] =
    EventStream.Zip[A, B](this, that)
}
/** Concrete [[EventStream]] nodes: the reified `map`/`zip` operations and two sources. */
object EventStream {
  /** `source` with `f` applied to each emitted element; `Done` passes through unchanged. */
  final case class Map[A,B](source: EventStream[A], f: A => B) extends EventStream[B] {
    def step: Value[B] =
      source.step.map(f)
  }

  /**
   * Pairs a value stepped from `left` with one stepped from `right`, combined through the
   * `Applicative[Value]` instance, so the result is `Done` when either side is `Done`.
   *
   * NOTE(review): an element emitted by one side is dropped when the other side is `Done`.
   */
  final case class Zip[A,B](left: EventStream[A], right: EventStream[B]) extends EventStream[(A,B)] {
    def step: Value[(A,B)] =
      (left.step |@| right.step).tupled
  }

  /** A source that emits `a` on every step and never yields `Done`. */
  final case class Pure[A](a: A) extends EventStream[A] {
    def step: Value[A] =
      Emit(a)
  }

  /**
   * A source fed by a push-style producer.
   *
   * On the first `step`, `register` is invoked with a callback that puts every pushed
   * [[Value]] onto a one-slot blocking queue. Each `step` then takes one value from that
   * queue, blocking until the producer has supplied one.
   *
   * Once `Done` has been taken, every later `step` returns `Done` immediately. Previously a
   * further `step` would block forever on the empty queue, because a producer that has
   * signalled `Done` does not push again.
   *
   * `registered` and the end-of-stream flag are plain, unsynchronized vars, so `step` is
   * only suitable for a single consumer thread.
   *
   * @param register hands the push callback to the producer; called at most once per
   *                 successful registration
   */
  final case class Callback[A](register: (Value[A] => Unit) => Unit) extends EventStream[A] {
    // Capacity 1: the producer's `put` blocks until the consumer has taken the previous value.
    val queue = new ArrayBlockingQueue[Value[A]](1)
    var registered: Boolean = false
    // Becomes true when Done has been taken from the queue; makes Done sticky.
    private var finished: Boolean = false

    def step: Value[A] =
      if (finished) Done
      else {
        if (!registered) {
          register((a: Value[A]) => queue.put(a))
          // Only flagged after `register` returns, so a failed registration can be retried.
          registered = true
        }
        val next = queue.take()
        next match {
          case Done    => finished = true
          case Emit(_) => ()
        }
        next
      }
  }
}
/** Small demonstrations of the stream API, including two deliberately problematic ones. */
object EventStreamExample {
  import EventStream._

  /** Sums the doubled values pushed by a `Driver` thread that counts from 0 until 100. */
  def run: Int = {
    val doubled =
      Callback((emit: Value[Int] => Unit) => Driver.run(100, emit)).map(_ * 2)
    val summed = doubled.foldLeft(0)((total, next) => total + next)
    summed.run
  }

  /**
   * Folds over a `Pure` source. `Pure.step` always emits, so the fold never sees `Done`
   * and this call does not return.
   */
  def pureDerp: Int = {
    val ones = Pure(1)
    ones.foldLeft(0)((total, next) => total + next).run
  }

  /**
   * Zips one source with itself (a "diamond") and prints every pair. With the `Pure`
   * source used here the stream never ends, so this prints `(1,1)` indefinitely.
   */
  def diamondDerp: Unit = {
    // Only needed when `source` is switched to the commented-out Callback below.
    val register = (callback: Value[Int] => Unit) => Driver.run(10, callback)
    val source = Pure(1) //Callback(register)
    val leftBranch = source.map(identity)
    val rightBranch = source.map(identity)
    val paired = leftBranch.zip(rightBranch)
    paired.foldLeft(()) { (_, pair) => println(pair) }.run
  }
}
/** A toy push-style producer used to feed callback-based streams. */
object Driver {
  /**
   * Starts a new thread that pushes `Emit(0)` up to `Emit(limit - 1)` into `callback`,
   * logging each value before it is pushed, and finally pushes `Done`.
   *
   * Returns as soon as the thread has been started.
   *
   * @param limit    number of values to generate; nothing is emitted when it is not positive
   * @param callback receives every generated value and the terminating `Done`
   */
  def run(limit: Int, callback: Value[Int] => Unit): Unit = {
    val producer = new Thread {
      override def run: Unit = {
        (0 until limit).foreach { counter =>
          println(s"Thread generating $counter")
          callback(Emit(counter))
        }
        callback(Done)
      }
    }
    producer.start()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment