Last active
January 4, 2016 22:19
-
-
Save oxbowlakes/8686567 to your computer and use it in GitHub Desktop.
For FPX 2014
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Requires Scala 2.10.x
// Requires scalaz 7.0.x (built for Scala 2.10)
package fpx | |
// Furnace types (in reality, these come from a Java API and are real!)
/** Factory for [[FurnaceSession]]s. */
trait FurnaceService {

  /**
   * Creates a session from a user name and a password.
   *
   * @param user     user name handed to the service
   * @param password password handed to the service
   * @return the session to run queries and inserts against
   */
  def createSession(
    user: String,
    password: String
  ): FurnaceSession
}
/** A single event as exposed by Furnace. */
trait FurnaceEvent {

  /** Identifier of this event. */
  def id: Long

  /** Transaction number (presumably shared by all versions of one logical record -- confirm against the real API). */
  def txn: Long

  /** Currency attached to this event. */
  def ccy: java.util.Currency

  /**
   * Produces a builder for a corrected copy of this event.
   *
   * In this simplified model the rate `d` is ignored and an empty
   * [[FurnaceEventBuilder]] is returned.
   */
  def withFx(d: Double): FurnaceEventBuilder =
    new FurnaceEventBuilder
}
import java.util.Date | |
import scala.collection.generic.CanBuildFrom | |
/** A session against Furnace: supports replaying events, inserting new ones and closing. */
trait FurnaceSession {

  /**
   * Replays events for the given date.
   *
   * @param d the date to replay
   * @return an iterator over the replayed events
   */
  def replay(d: Date): Iterator[FurnaceEvent]

  /**
   * Inserts the event described by the builder.
   *
   * @param b the builder describing the event to insert
   * @return a future `Long` (a simplification of the real API, per the original author)
   */
  def insert(b: FurnaceEventBuilder): scala.concurrent.Future[Long] //simplification

  /**
   * Closes this session.
   *
   * Declared with an explicit `Unit` result instead of the deprecated
   * procedure syntax; the signature seen by callers is unchanged.
   */
  def close(): Unit
}
/**
 * Placeholder for the builder used to insert events.
 *
 * It carries no state in this simplified model; instances are created by
 * `FurnaceEvent.withFx` and consumed by `FurnaceSession.insert`.
 */
class FurnaceEventBuilder
// Now for the goodies
import scala.concurrent.Promise | |
import scala.concurrent.Future | |
import scala.concurrent.ExecutionContext | |
import scala.concurrent.duration.Duration | |
import scala.concurrent.Await | |
import scalaz._ | |
import Scalaz._ | |
import Free._ | |
/**
 * A computation that, given a [[FurnaceSession]], produces a session together
 * with a value of type `A`, wrapped in a scalaz `Trampoline`.
 *
 * @tparam A the type of the value produced
 */
trait Furnace[+A] {
  import Furnace._

  /**
   * Transforms the produced value with `f`, passing the session through unchanged.
   */
  def map[B](f: A => B): Furnace[B] =
    furnace { session =>
      this.apply(session) map { step => (step._1, f(step._2)) }
    }

  /**
   * Sequences a dependent computation: the session returned by this step is
   * the one handed to the `Furnace` produced by `f`.
   */
  def flatMap[B](f: A => Furnace[B]): Furnace[B] =
    furnace { session =>
      this.apply(session) flatMap { step => f(step._2).apply(step._1) }
    }

  /**
   * Runs this computation against `rw`.
   *
   * @param rw the session to run against
   * @return the (session, value) pair as a trampolined result
   */
  protected def apply(rw: FurnaceSession): Trampoline[(FurnaceSession, A)]
}
/** Constructor functions for [[Furnace]] values. */
trait FurnaceFunctions {

  /**
   * Lifts a session-to-trampoline function into a [[Furnace]].
   *
   * The call to `f` is wrapped in a `Suspend` thunk, so `f` is not invoked
   * when the resulting `Furnace` is applied, only when the trampoline is run.
   *
   * @param f function from a session to the trampolined (session, value) pair
   */
  def furnace[A](f: FurnaceSession => Trampoline[(FurnaceSession, A)]): Furnace[A] =
    new Furnace[A] {
      protected def apply(session: FurnaceSession) =
        Suspend(() => f(session))
    }
}
/**
 * Companion of [[Furnace]]: constructors, the `Monad` instance and the
 * interpreter that actually runs a computation.
 */
object Furnace extends FurnaceFunctions { module =>

  /**
   * Runs `f` against a freshly created session and returns its value.
   *
   * The service used here is a local stub: its sessions replay no events,
   * complete every insert immediately with `1L`, and do nothing on close.
   * The session is always closed, even when running `f` throws.
   *
   * @param f the computation to run
   * @return the value produced by `f`
   */
  def unsafePerformIO[A](f: Furnace[A]): A = {
    val service = new FurnaceService {
      def createSession(user: String, password: String): FurnaceSession =
        new FurnaceSession {
          def replay(d: Date): Iterator[FurnaceEvent] = Iterator.empty
          def insert(b: FurnaceEventBuilder): Future[Long] = Future.successful(1L)
          def close(): Unit = ()
        }
    }
    val session = service.createSession("foo", "bar")
    try f.apply(session).run._2 finally session.close()
  }

  /**
   * Lifts `f` into an `EitherT` over [[Furnace]]; what `f` throws is turned
   * into a left value by `\/.fromTryCatch`.
   */
  def eitherT[A](f: FurnaceSession => A): EitherT[Furnace, Throwable, A] =
    EitherT(apply(session => \/.fromTryCatch(f(session))))

  /**
   * Lifts a plain function of the session into a [[Furnace]]; the session is
   * passed through unchanged alongside the function's result.
   */
  def apply[A](f: FurnaceSession => A): Furnace[A] =
    furnace(session => return_((session, f(session))))

  /** Lifts a (lazily evaluated) value into a [[Furnace]] that ignores the session. */
  def point[A](a: => A): Furnace[A] = apply(_ => a)

  /**
   * `Monad` instance for [[Furnace]].
   *
   * The type is spelled out explicitly: implicit definitions without a
   * declared type are fragile under implicit resolution.
   */
  implicit val FurnaceInstances: Monad[Furnace] = new Monad[Furnace] {
    def point[A](a: => A): Furnace[A] = module.point(a)
    def bind[A, B](fa: Furnace[A])(f: A => Furnace[B]): Furnace[B] = fa flatMap f
  }
}
// Our DSL for interacting with Furnace
/** Small DSL of session functions and syntax helpers for working with Furnace. */
object dsl {

  /** Keeps, per transaction number, the last event seen in list order. */
  private def latestByTxn(events: List[FurnaceEvent]): Map[Long, FurnaceEvent] =
    events.foldLeft(Map.empty[Long, FurnaceEvent]) { (acc, event) =>
      acc.updated(event.txn, event)
    }

  /** Session function that replays date `d` and materialises the events as a list. */
  def dateQuery(d: Date): FurnaceSession => List[FurnaceEvent] =
    _.replay(d).toList

  /** Session function that inserts the event described by `b`. */
  def insert(b: FurnaceEventBuilder): FurnaceSession => Future[Long] =
    _.insert(b)

  /** Adds `latest` to a `Furnace` of event lists. */
  implicit class FurnaceEventsW(f: Furnace[List[FurnaceEvent]]) {

    /**
     * Indexes the events by transaction number, later events replacing earlier ones.
     *
     * @param excludeDeleted not consulted by this implementation
     */
    def latest(excludeDeleted: Boolean = true): Furnace[Map[Long, FurnaceEvent]] =
      f map latestByTxn
  }

  /** Adds `latestF` to any functor value whose content is a list of events. */
  final class ListOfEventFunctorOps[F[_], A](self: F[A], F: Functor[F]) {

    /**
     * Same indexing as `FurnaceEventsW.latest`, for an arbitrary functor `F`.
     *
     * @param excludeDeleted not consulted by this implementation
     * @param A evidence that the functor's content is a `List[FurnaceEvent]`
     */
    def latestF(excludeDeleted: Boolean = true)(implicit A: A =:= List[FurnaceEvent]): F[Map[Long, FurnaceEvent]] =
      F.map(self)(content => latestByTxn(A(content)))
  }

  /** Uses scalaz `Unapply` to find the functor shape of `fa` and attach `latestF`. */
  implicit def listOfEventFunctorOps[FA](fa: FA)(implicit F: Unapply[Functor, FA]) =
    new ListOfEventFunctorOps[F.M, F.A](F(fa), F.TC)
}
// Type class instances for Futures
/** scalaz type class instances and syntax for `scala.concurrent.Future`. */
object fewchaz {

  object Implicits {

    /**
     * `Monad` and `Traverse` for `Future`, running callbacks on the implicit
     * `ExecutionContext`.
     */
    class FutureInstance(implicit EC: ExecutionContext) extends Monad[Future] with Traverse[Future] {

      /** An already-completed future holding `a`. */
      def point[A](a: => A): Future[A] = Future.successful(a)

      override def map[A, B](fa: Future[A])(f: A => B): Future[B] = fa.map(f)

      def bind[A, B](fa: Future[A])(f: A => Future[B]): Future[B] = fa.flatMap(f)

      /**
       * Traverses the single value inside the future.
       *
       * NOTE: this blocks the calling thread, without a timeout, until `fa`
       * completes.
       */
      def traverseImpl[G[_], A, B](fa: Future[A])(f: A => G[B])
                                  (implicit A: Applicative[G]): G[Future[B]] = {
        val completed = Await.result(fa, Duration.Inf)
        A.map(f(completed))(value => point(value))
      }
    }

    /** Instance whose applicative `ap` combines both futures with `zip`. */
    object parallel {
      implicit def FutureInstances(implicit EC: ExecutionContext): FutureInstance =
        new FutureInstance {
          override def ap[A, B](fa: => Future[A])(fab: => Future[A => B]): Future[B] =
            fa.zip(fab).map(pair => pair._2(pair._1))
        }
    }

    /** Instance that keeps the default `ap` inherited from `FutureInstance`. */
    object sequential {
      implicit def FutureInstances(implicit EC: ExecutionContext): FutureInstance =
        new FutureInstance
    }
  }

  /** Adds `sequencedF` to any functor value whose content is a collection of futures. */
  final class SequencedFunctorOps[F[_], A](self: F[A], F: Functor[F]) {

    /**
     * Inside the functor, turns a collection of futures into a future of a
     * collection via `Future.sequence`.
     */
    def sequencedF[B, M[_] <: TraversableOnce[_]](implicit EC: ExecutionContext,
                                                  A: A => M[Future[B]],
                                                  cbf: CanBuildFrom[M[Future[B]], B, M[B]]): F[Future[M[B]]] =
      F.map(self)(content => Future.sequence(A(content)))
  }

  /** Uses scalaz `Unapply` to find the functor shape of `fa` and attach `sequencedF`. */
  implicit def sequencedFunctorOps[FA](fa: FA)(implicit F: Unapply[Functor, FA]) =
    new SequencedFunctorOps[F.M, F.A](F(fa), F.TC)
}
// Example usage of the DSL
/**
 * Error used by the example pipelines when a suspicious currency has no
 * corrected FX rate available.
 */
class MissingFxException extends Exception
import dsl._ | |
import fewchaz._ | |
import scala.concurrent.ExecutionContext.Implicits.global | |
/** Definitions shared by the example pipelines. */
trait Pipelines {

  /** Result of a correction run: the future results of all inserts. */
  type CorrectedEvents = Future[Stream[Long]]

  /** Known-good FX rates per currency; empty in this example. */
  val correctFxRates: Map[java.util.Currency, Double] =
    Map.empty

  /** Currencies whose rates look wrong; this stub ignores `xs` and reports none. */
  def suspiciousFxRates(xs: Iterable[FurnaceEvent]): Set[java.util.Currency] =
    Set.empty
}
// Example 1 - query/update pipeline
/** Example 1: query, check and re-insert, with failures carried inside the `Future`. */
object Pipeline1 extends Pipelines {
  import fewchaz.Implicits.sequential._

  /**
   * Re-inserts every event in `xs` with the rate looked up for its currency,
   * then sequences the per-event insert futures into a single future.
   */
  def correct_(xs: Iterable[FurnaceEvent], rates: java.util.Currency => Double): Furnace[CorrectedEvents] =
    xs.toStream
      .traverseU(event => Furnace(insert(event.withFx(rates(event.ccy)))))
      .map(_.sequenceU)

  /**
   * Loads the latest events for the current date, finds the suspicious
   * currencies, and either corrects the events that have a known-good rate
   * or yields a future failed with [[MissingFxException]].
   */
  val pipeline: Furnace[CorrectedEvents] =
    for {
      latestEvents <- Furnace(dateQuery(new Date)).latest() // Map[Long, FurnaceEvent]
      suspicious   <- suspiciousFxRates(latestEvents.values).point[Furnace]
      corrected    <-
        if (suspicious.forall(correctFxRates.contains))
          correct_(latestEvents.values.filter(event => correctFxRates.contains(event.ccy)), correctFxRates)
        else
          Future.failed(new MissingFxException).point[Furnace]
    } yield corrected

  import scala.concurrent.duration._

  // Runs the pipeline when this object is initialised and waits up to five minutes for it.
  Await.result(Furnace.unsafePerformIO(pipeline), atMost = 5.minutes)
}
// Example 2 - query/update pipeline with error handling
/** Example 2: the same pipeline with errors tracked in an `EitherT` over [[Furnace]]. */
object Pipeline2 extends Pipelines {
  import fewchaz.Implicits.sequential._

  /** Suspicious currencies as a disjunction; this stub ignores `xs` and always succeeds with none. */
  def suspiciousFxRates2(xs: Iterable[FurnaceEvent]): Throwable \/ Set[java.util.Currency] =
    \/-(Set.empty[java.util.Currency])

  /**
   * Re-inserts every event in `xs` with the rate looked up for its currency;
   * each insert goes through `Furnace.eitherT`, and the resulting futures are
   * sequenced into one.
   */
  def correct_(xs: Iterable[FurnaceEvent], rates: java.util.Currency => Double): EitherT[Furnace, Throwable, CorrectedEvents] =
    xs.toStream
      .traverseU(event => Furnace.eitherT(insert(event.withFx(rates(event.ccy)))))
      .map(_.sequenceU)

  /**
   * Query, index by transaction, find suspicious currencies, then correct or
   * stop with a left [[MissingFxException]].
   */
  val pipeline: EitherT[Furnace, Throwable, CorrectedEvents] =
    for {
      events       <- Furnace.eitherT(dateQuery(new Date))
      latestEvents <- EitherT.right(events.pure[Furnace].latest())
      suspicious   <- EitherT(suspiciousFxRates2(latestEvents.values).pure[Furnace])
      corrected    <-
        if (suspicious.forall(correctFxRates.contains))
          correct_(latestEvents.values.filter(event => correctFxRates.contains(event.ccy)), correctFxRates)
        else
          EitherT.left((new MissingFxException).pure[Furnace])
    } yield corrected

  // Runs the pipeline when this object is initialised; the result is discarded.
  Furnace.unsafePerformIO(pipeline.run)
}
// Example 3 - query/update pipeline with error handling and functor abstraction
/**
 * Example 3: as example 2, but the "latest" indexing is applied directly to
 * the `EitherT` value through the functor-generic `latestF` syntax.
 */
object Pipeline3 extends Pipelines {
  import fewchaz.Implicits.sequential._

  /** Suspicious currencies as a disjunction; this stub ignores `xs` and always succeeds with none. */
  def suspiciousFxRates3(xs: Iterable[FurnaceEvent]): Throwable \/ Set[java.util.Currency] =
    \/-(Set.empty[java.util.Currency])

  /**
   * Re-inserts every event in `xs` with the rate looked up for its currency;
   * each insert goes through `Furnace.eitherT`, and the resulting futures are
   * sequenced into one.
   */
  def correct_(xs: Iterable[FurnaceEvent], rates: java.util.Currency => Double): EitherT[Furnace, Throwable, CorrectedEvents] =
    xs.toStream
      .traverseU(event => Furnace.eitherT(insert(event.withFx(rates(event.ccy)))))
      .map(_.sequenceU)

  /**
   * Query and index in one step, find suspicious currencies, then correct or
   * stop with a left [[MissingFxException]].
   */
  val pipeline: EitherT[Furnace, Throwable, CorrectedEvents] =
    for {
      latestEvents <- Furnace.eitherT(dateQuery(new Date)).latestF()
      suspicious   <- EitherT(suspiciousFxRates3(latestEvents.values).pure[Furnace])
      corrected    <-
        if (suspicious.forall(correctFxRates.contains))
          correct_(latestEvents.values.filter(event => correctFxRates.contains(event.ccy)), correctFxRates)
        else
          EitherT.left((new MissingFxException).pure[Furnace])
    } yield corrected

  // Runs the pipeline when this object is initialised; the result is discarded.
  Furnace.unsafePerformIO(pipeline.run)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment