Skip to content

Instantly share code, notes, and snippets.

@oxbowlakes
Last active January 4, 2016 22:19
Show Gist options
  • Save oxbowlakes/8686567 to your computer and use it in GitHub Desktop.
Save oxbowlakes/8686567 to your computer and use it in GitHub Desktop.
For FPX 2014
// Requires Scala 2.10.x
// Requires scalaz 7.0.x built for Scala 2.10 (e.g. scalaz-core_2.10)
package fpx
// Furnace types (in reality, these come from a Java API and are real!)
/** Factory for [[FurnaceSession]]s. */
trait FurnaceService {

  /**
   * Creates a session for the supplied credentials.
   *
   * @param user     user name handed to the service
   * @param password password handed to the service
   * @return the session to run queries and inserts against
   */
  def createSession(user: String, password: String): FurnaceSession
}
/** Read-only view of a single Furnace event. */
trait FurnaceEvent {

  /** Numeric id of the event. */
  def id: Long

  /** Numeric transaction value; other code in this file keys "latest event" maps by it. */
  def txn: Long

  /** Currency attached to the event. */
  def ccy: java.util.Currency

  /**
   * Produces a builder for a follow-up event carrying the FX rate `d`.
   *
   * NOTE(review): this stand-in ignores `d` and always returns a fresh, empty
   * [[FurnaceEventBuilder]]; presumably the real API copies the rate into the builder.
   *
   * @param d FX rate to apply
   */
  def withFx(d: Double): FurnaceEventBuilder = new FurnaceEventBuilder
}
import java.util.Date
import scala.collection.generic.CanBuildFrom
/** A session against Furnace through which events are read and inserted. */
trait FurnaceSession {

  /**
   * Replays events for the given date.
   *
   * @param d date passed to the replay
   * @return an iterator over the replayed events
   */
  def replay(d: Date): Iterator[FurnaceEvent]

  /**
   * Inserts the event described by `b`.
   *
   * @return a future completed with a `Long` (a simplification of the real API)
   */
  def insert(b: FurnaceEventBuilder): scala.concurrent.Future[Long] // simplification

  /** Closes the session. Declared with an explicit `Unit` result instead of procedure syntax. */
  def close(): Unit
}
/** Stand-in for the builder passed to `FurnaceSession.insert`; it declares no members in this file. */
class FurnaceEventBuilder
//Now for the goodies
import scala.concurrent.Promise
import scala.concurrent.Future
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.Duration
import scala.concurrent.Await
import scalaz._
import Scalaz._
import Free._
/**
 * A computation that needs a [[FurnaceSession]] to produce an `A`.
 *
 * Running it yields the (possibly replaced) session together with the result,
 * wrapped in a `Trampoline` so that long `map`/`flatMap` chains are evaluated
 * by the trampoline rather than by nested calls.
 */
trait Furnace[+A] {
  import Furnace._

  /** Transforms the result with `f`, passing the session through untouched. */
  def map[B](f: A => B): Furnace[B] =
    furnace[B] { session =>
      this.apply(session).map { case (next, value) => (next, f(value)) }
    }

  /** Runs this computation, then runs the one chosen by `f` against the session it returned. */
  def flatMap[B](f: A => Furnace[B]): Furnace[B] =
    furnace[B] { session =>
      this.apply(session).flatMap { case (next, value) => f(value).apply(next) }
    }

  /** Describes one run against `rw`; nothing is evaluated until the trampoline is run. */
  protected def apply(rw: FurnaceSession): Trampoline[(FurnaceSession, A)]
}
/** Constructors for [[Furnace]] values, mixed into the `Furnace` companion. */
trait FurnaceFunctions {

  /**
   * Lifts a session-threading function into a [[Furnace]].
   *
   * @param f given a session, describes how to obtain the next session and a result
   * @return a `Furnace` whose run step defers the call to `f` behind a `Suspend`,
   *         so `f` is only invoked when the resulting trampoline is run
   */
  def furnace[A](f: FurnaceSession => Trampoline[(FurnaceSession, A)]): Furnace[A] =
    new Furnace[A] {
      // Wrapping in a thunk keeps construction of the trampoline lazy.
      protected def apply(rw: FurnaceSession) = Suspend(() => f(rw))
    }
}
/** Companion of [[Furnace]]: constructors, the interpreter and the scalaz `Monad` instance. */
object Furnace extends FurnaceFunctions { module =>

  /**
   * Runs `f` to completion and returns its result.
   *
   * The session used here comes from a stub service defined inline: its `replay`
   * yields no events and its `insert` returns an already-completed future of `1L`.
   * The session is closed whether or not running `f` throws.
   */
  def unsafePerformIO[A](f: Furnace[A]): A = {
    val service = new FurnaceService {
      def createSession(user: String, password: String): FurnaceSession = new FurnaceSession {
        def replay(d: Date): Iterator[FurnaceEvent] = Iterator.empty
        def insert(b: FurnaceEventBuilder): Future[Long] = Future.successful(1L)
        def close(): Unit = ()
      }
    }
    val session = service.createSession("foo", "bar")
    try f.apply(session).run._2
    finally session.close()
  }

  /**
   * Like [[apply]], but anything thrown by `f` is captured as the left side
   * (via `\/.fromTryCatch`) instead of propagating.
   */
  def eitherT[A](f: FurnaceSession => A): EitherT[Furnace, Throwable, A] =
    EitherT(apply(fs => \/.fromTryCatch(f(fs))))

  /** Lifts a plain function of the session; the session itself is returned unchanged. */
  def apply[A](f: FurnaceSession => A): Furnace[A] =
    furnace(rw => return_(rw -> f(rw)))

  /** Lifts a by-name value that does not need the session. */
  def point[A](a: => A): Furnace[A] = apply(_ => a)

  /** `Monad` instance; the explicit type keeps the implicit visible regardless of definition order. */
  implicit val FurnaceInstances: Monad[Furnace] = new Monad[Furnace] {
    def point[A](a: => A): Furnace[A] = module.point(a)
    def bind[A, B](fa: Furnace[A])(f: A => Furnace[B]): Furnace[B] = fa flatMap f
  }
}
//Our DSL for interacting with Furnace
/** Small DSL of session functions and syntax for working with lists of events. */
object dsl {

  /** Session function that replays events for `d` and collects them into a list. */
  def dateQuery(d: Date): FurnaceSession => List[FurnaceEvent] =
    session => session.replay(d).toList

  /** Session function that inserts `b` and returns the session's future. */
  def insert(b: FurnaceEventBuilder): FurnaceSession => Future[Long] =
    session => session.insert(b)

  /**
   * Indexes `events` by `txn`; when several events share a `txn`, the one that
   * appears last in the list wins.
   */
  private def latestByTxn(events: List[FurnaceEvent]): Map[Long, FurnaceEvent] =
    events.foldLeft(Map.empty[Long, FurnaceEvent]) { (acc, ev) => acc.updated(ev.txn, ev) }

  /** Adds `latest` to a `Furnace` that yields a list of events. */
  implicit class FurnaceEventsW(f: Furnace[List[FurnaceEvent]]) {

    /**
     * Keeps only the last event seen for each `txn`.
     *
     * @param excludeDeleted currently not consulted by the implementation
     *                       (NOTE(review): confirm whether filtering was intended)
     */
    def latest(excludeDeleted: Boolean = true): Furnace[Map[Long, FurnaceEvent]] =
      f.map(events => latestByTxn(events))
  }

  /** The same operation as `latest`, for any functor `F` whose content is a list of events. */
  final class ListOfEventFunctorOps[F[_], A](self: F[A], F: Functor[F]) {

    /**
     * Keeps only the last event seen for each `txn`, inside `F`.
     *
     * @param excludeDeleted currently not consulted by the implementation
     * @param A              evidence that the functor's content is a `List[FurnaceEvent]`
     */
    def latestF(excludeDeleted: Boolean = true)(implicit A: A =:= List[FurnaceEvent]): F[Map[Long, FurnaceEvent]] =
      F.map(self)(a => latestByTxn(A(a)))
  }

  // Result type is left inferred because it is path-dependent on the implicit `F`.
  implicit def listOfEventFunctorOps[FA](fa: FA)(implicit F: Unapply[Functor, FA]) =
    new ListOfEventFunctorOps[F.M, F.A](F(fa), F.TC)
}
//Futures typeclass instances
/** scalaz type class instances and syntax for `scala.concurrent.Future`. */
object fewchaz {

  object Implicits {

    /**
     * `Monad` and `Traverse` for `Future`, running callbacks on the supplied `ExecutionContext`.
     */
    class FutureInstance(implicit EC: ExecutionContext) extends Monad[Future] with Traverse[Future] {

      def point[A](a: => A): Future[A] = Future.successful(a)

      override def map[A, B](fa: Future[A])(f: A => B): Future[B] = fa map f

      def bind[A, B](fa: Future[A])(f: A => Future[B]): Future[B] = fa flatMap f

      /**
       * Traverses by waiting for `fa` to complete.
       *
       * WARNING: this blocks the calling thread with no timeout (`Duration.Inf`),
       * and a failed `fa` rethrows its exception from `Await.result`.
       */
      def traverseImpl[G[_], A, B](fa: Future[A])(f: A => G[B])(implicit A: Applicative[G]): G[Future[B]] =
        A.map(f(Await.result(fa, Duration.Inf)))(b => point(b))
    }

    /** Instance whose `ap` combines both futures with `zip` instead of going through `bind`. */
    object parallel {
      implicit def FutureInstances(implicit EC: ExecutionContext): FutureInstance =
        new FutureInstance {
          override def ap[A, B](fa: => Future[A])(fab: => Future[A => B]): Future[B] =
            (fa zip fab) map { case (a, f) => f(a) }
        }
    }

    /** Instance that keeps the default `ap` inherited from `Monad`. */
    object sequential {
      implicit def FutureInstances(implicit EC: ExecutionContext): FutureInstance =
        new FutureInstance
    }
  }

  /** Syntax for turning an `F[M[Future[B]]]` into an `F[Future[M[B]]]`. */
  final class SequencedFunctorOps[F[_], A](self: F[A], F: Functor[F]) {

    /**
     * Applies `Future.sequence` to the collection of futures held inside `F`.
     *
     * @param A   view of the functor's content as a collection of futures
     * @param cbf builder for the resulting collection
     */
    def sequencedF[B, M[_] <: TraversableOnce[_]](implicit EC: ExecutionContext, A: A => M[Future[B]], cbf: CanBuildFrom[M[Future[B]], B, M[B]]): F[Future[M[B]]] =
      F.map(self)(a => Future.sequence(A(a)))
  }

  // Result type is left inferred because it is path-dependent on the implicit `F`.
  implicit def sequencedFunctorOps[FA](fa: FA)(implicit F: Unapply[Functor, FA]) =
    new SequencedFunctorOps[F.M, F.A](F(fa), F.TC)
}
//Example usage of DSL
/**
 * Signals, in the example pipelines below, that some suspicious currency has no
 * entry in `correctFxRates`. It carries no message or cause.
 */
class MissingFxException extends Exception
import dsl._
import fewchaz._
import scala.concurrent.ExecutionContext.Implicits.global
/** Shared definitions for the example pipelines. */
trait Pipelines {

  /** Result of a correction run: a future stream of the values returned by each insert. */
  type CorrectedEvents = Future[Stream[Long]]

  /** Corrected FX rate per currency; empty in this example. */
  val correctFxRates: Map[java.util.Currency, Double] = Map.empty

  /**
   * Currencies whose FX rates look wrong among `xs`.
   *
   * This example implementation ignores `xs` and always returns the empty set.
   */
  def suspiciousFxRates(xs: Iterable[FurnaceEvent]): Set[java.util.Currency] = Set.empty
}
// Example 1 - query/update pipeline
/**
 * Example 1: query events, then re-insert corrected ones, with failures surfacing
 * only through the resulting `Future`.
 *
 * Everything in this object body, including the final `Await.result`, runs when
 * the object is first initialised.
 */
object Pipeline1 extends Pipelines {
import fewchaz.Implicits.sequential._
// For every event, inserts a copy built with `withFx(rates(e.ccy))`; the stream of
// per-insert futures is then sequenced into a single future of a stream.
def correct_(xs: Iterable[FurnaceEvent], rates: java.util.Currency => Double): Furnace[CorrectedEvents] = xs.toStream.traverseU(e => Furnace(insert(e.withFx(rates(e.ccy))))).map(_.sequenceU)
// Steps: (x) replay events for the current date and keep the last one per txn,
// (y) compute the suspicious currencies, (z) if every suspicious currency has a
// corrected rate, correct the events whose currency has one; otherwise yield a
// future failed with MissingFxException.
val pipeline: Furnace[CorrectedEvents] =
for {
x <- Furnace(dateQuery(new Date)).latest() //x is Map[Long, FurnaceEvent]
y <- suspiciousFxRates(x.values).point[Furnace]
z <- if (y forall correctFxRates.keySet)
correct_(x.values filter (correctFxRates.keySet contains _.ccy), correctFxRates)
else
Promise.failed(new MissingFxException).future.point[Furnace]
}
yield z
import scala.concurrent.duration._
// Runs the pipeline and blocks for up to five minutes on the resulting future;
// the awaited value is discarded.
Await.result(Furnace.unsafePerformIO(pipeline), atMost = 5.minutes)
}
// Example 2 - query/update pipeline with error handling
/**
 * Example 2: the same pipeline expressed in `EitherT[Furnace, Throwable, _]`, so
 * that errors are carried as left values instead of being thrown.
 *
 * The final `unsafePerformIO` call runs when the object is first initialised.
 */
object Pipeline2 extends Pipelines {
import fewchaz.Implicits.sequential._
// Error-aware variant of `suspiciousFxRates`; this example always returns an empty set on the right.
def suspiciousFxRates2(xs: Iterable[FurnaceEvent]): Throwable \/ Set[java.util.Currency] = Set.empty[java.util.Currency].right
// Inserts a corrected copy of every event through `Furnace.eitherT`, then
// sequences the per-insert futures into one future of a stream.
def correct_(xs: Iterable[FurnaceEvent], rates: java.util.Currency => Double): EitherT[Furnace, Throwable, CorrectedEvents] = {
xs.toStream.traverseU(e => Furnace.eitherT(insert(e.withFx(rates(e.ccy))))).map(_.sequenceU)
}
// Steps: (x) replay events for the current date, (y) keep the last event per txn,
// (z) compute suspicious currencies, (a) correct the events when every suspicious
// currency has a corrected rate, otherwise produce a left MissingFxException.
val pipeline: EitherT[Furnace, Throwable, CorrectedEvents] =
for {
x <- Furnace.eitherT(dateQuery(new Date))
y <- EitherT.right(x.pure[Furnace].latest())
z <- EitherT(suspiciousFxRates2(y.values).pure[Furnace])
a <- if (z forall correctFxRates.keySet)
correct_(y.values filter (correctFxRates.keySet contains _.ccy), correctFxRates)
else
EitherT.left((new MissingFxException).pure[Furnace])
}
yield a
// Runs the pipeline; the resulting disjunction (and any future inside it) is
// neither inspected nor awaited here.
Furnace.unsafePerformIO(pipeline.run)
}
// Example 3 - query/update pipeline with error handling and functor abstraction
/**
 * Example 3: like Example 2, but "latest per txn" is applied directly to the
 * `EitherT` value through the functor-generic `latestF` syntax from `dsl`.
 *
 * The final `unsafePerformIO` call runs when the object is first initialised.
 */
object Pipeline3 extends Pipelines {
import fewchaz.Implicits.sequential._
// Error-aware variant of `suspiciousFxRates`; this example always returns an empty set on the right.
def suspiciousFxRates3(xs: Iterable[FurnaceEvent]): Throwable \/ Set[java.util.Currency] = Set.empty[java.util.Currency].right
// Inserts a corrected copy of every event through `Furnace.eitherT`, then
// sequences the per-insert futures into one future of a stream.
def correct_(xs: Iterable[FurnaceEvent], rates: java.util.Currency => Double): EitherT[Furnace, Throwable, CorrectedEvents] = {
xs.toStream.traverseU(e => Furnace.eitherT(insert(e.withFx(rates(e.ccy))))).map(_.sequenceU)
}
// Steps: (x) replay events for the current date and keep the last one per txn,
// (y) compute suspicious currencies, (z) correct the events when every suspicious
// currency has a corrected rate, otherwise produce a left MissingFxException.
val pipeline: EitherT[Furnace, Throwable, CorrectedEvents] =
for {
x <- Furnace.eitherT(dateQuery(new Date)).latestF()
y <- EitherT(suspiciousFxRates3(x.values).pure[Furnace])
z <- if (y forall correctFxRates.keySet)
correct_(x.values filter (correctFxRates.keySet contains _.ccy), correctFxRates)
else
EitherT.left((new MissingFxException).pure[Furnace])
}
yield z
// Runs the pipeline; the resulting disjunction (and any future inside it) is
// neither inspected nor awaited here.
Furnace.unsafePerformIO(pipeline.run)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment