Created
July 29, 2016 11:28
-
-
Save noelwelsh/657b81ccde86badadfa6b5988887a7a5 to your computer and use it in GitHub Desktop.
Pull-based event stream case study ala fs2
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Push vs pull evaluation | |
// Interpreters | |
// Reification | |
// Generalised algebraic data types | |
import java.util.concurrent.ArrayBlockingQueue | |
import scalaz.Applicative | |
import scalaz.syntax.applicative._ // for |@| | |
final case class Sink[A,B](source: EventStream[A], initialValue: B, f: (B, A) => B) { | |
def run: B = { | |
def loop(currentValue: B): B = | |
source.step match { | |
case Emit(a) => loop(f(currentValue,a)) | |
case Done => currentValue | |
} | |
loop(initialValue) | |
} | |
} | |
sealed trait Value[+A] extends Product with Serializable { | |
def map[B](f: A => B): Value[B] = | |
this match { | |
case Emit(a) => Emit(f(a)) | |
case Done => Done | |
} | |
} | |
object Value { | |
implicit object valueInstance extends Applicative[Value] { | |
override def ap[A, B](fa: => Value[A])(f: => Value[A => B]): Value[B] = | |
f match { | |
case Emit(f) => | |
fa match { | |
case Emit(a) => Emit(f(a)) | |
case Done => Done | |
} | |
case Done => Done | |
} | |
override def point[A](x: => A): Value[A] = Emit(x) | |
} | |
} | |
final case class Emit[A](a: A) extends Value[A] | |
final case object Done extends Value[Nothing] | |
sealed trait EventStream[A] extends Product with Serializable { | |
import EventStream._ | |
def map[B](f: A => B): EventStream[B] = | |
Map(this, f) | |
def zip[B](that: EventStream[B]): EventStream[(A,B)] = | |
Zip(this, that) | |
def foldLeft[B](initialValue: B)(f: (B,A) => B): Sink[A,B] = | |
Sink(this, initialValue, f) | |
def step: Value[A] | |
} | |
object EventStream { | |
final case class Map[A,B](source: EventStream[A], f: A => B) extends EventStream[B] { | |
def step: Value[B] = | |
source.step.map(f) | |
} | |
final case class Zip[A,B](left: EventStream[A], right: EventStream[B]) extends EventStream[(A,B)] { | |
def step: Value[(A,B)] = | |
(left.step |@| right.step).tupled | |
} | |
final case class Pure[A](a: A) extends EventStream[A] { | |
def step: Value[A] = | |
Emit(a) | |
} | |
final case class Callback[A](register: (Value[A] => Unit) => Unit) extends EventStream[A] { | |
val queue = new ArrayBlockingQueue[Value[A]](1) | |
var registered: Boolean = false | |
def step: Value[A] = { | |
if(!registered) { | |
register((a: Value[A]) => queue.put(a)) | |
registered = true | |
} | |
queue.take() | |
} | |
} | |
} | |
object EventStreamExample { | |
import EventStream._ | |
def run = { | |
val register = | |
(callback: Value[Int] => Unit) => { | |
Driver.run(100, callback) | |
} | |
val source = | |
Callback(register) | |
source.map(a => a * 2).foldLeft(0){_+_}.run | |
} | |
def pureDerp = | |
Pure(1).foldLeft(0){_ + _}.run | |
def diamondDerp = { | |
val register = | |
(callback: Value[Int] => Unit) => { | |
Driver.run(10, callback) | |
} | |
val source = Pure(1) //Callback(register) | |
val derp = source.map(identity) zip source.map(identity) | |
derp.foldLeft(()){ (_, tuple) => println(tuple) }.run | |
} | |
} | |
object Driver { | |
def run(limit: Int, callback: Value[Int] => Unit): Unit = { | |
new Thread { | |
var counter = 0 | |
override def run: Unit = { | |
while(counter < limit) { | |
println(s"Thread generating $counter") | |
callback(Emit(counter)) | |
counter = counter + 1 | |
} | |
callback(Done) | |
} | |
}.start() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment