Skip to content

Instantly share code, notes, and snippets.

@lancearlaus
Last active August 29, 2015 14:21
Show Gist options
  • Save lancearlaus/9aa6314ce6405e42c419 to your computer and use it in GitHub Desktop.
Save lancearlaus/9aa6314ce6405e42c419 to your computer and use it in GitHub Desktop.
Blog code snippets
#!/usr/bin/env scalas
// NOTE: This is a self-encapsulated Scala script meant to be run with scalas
// See http://www.scala-sbt.org/0.13/docs/Scripts.html
/***
scalaVersion := "2.11.6"
resolvers += Resolver.url("typesafe-ivy-repo", url("http://typesafe.artifactoryonline.com/typesafe/releases"))(Resolver.ivyStylePatterns)
// Common dependency versions
val akkaVersion = "2.3.10"
val akkaHttpVersion = "1.0-RC2"
libraryDependencies ++= Seq(
"com.typesafe.scala-logging" %% "scala-logging" % "3.+",
"com.typesafe.akka" % "akka-stream-experimental_2.11" % "1.0-RC3",
"org.scalatest" %% "scalatest" % "2.2.4"
)
*/
import com.typesafe.scalalogging.StrictLogging
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.stage._
import akka.stream.scaladsl.FlowGraph.Implicits._
import akka.stream.scaladsl._
import org.scalatest._
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.prop.TableDrivenPropertyChecks._
import org.scalactic.Tolerance._
import org.scalactic.TripleEqualsSupport.Spread
import scala.collection.immutable._
/**
 * Implements a sliding window (step 1) on a stream of elements.
 *
 * Follows the same semantics as `IterableLike.sliding(size)`:
 *  - once `size` elements have been seen, every further element produces a new full window;
 *  - if upstream finishes after at least one but fewer than `size` elements, a single
 *    incomplete window holding those elements is emitted;
 *  - if upstream finishes without producing any element, no window is emitted.
 *
 * @param size number of elements per window; must be positive
 * @tparam T   type of the stream elements
 */
case class Sliding[T](size: Int) extends StatefulStage[T, Iterator[T]] {
  require(size > 0, "window size must be positive")

  // Elements of the window being assembled; never holds more than `size` elements.
  // A Vector keeps `size`, `tail` and `:+` cheap compared to the default List-backed Seq.
  private var buffer = Vector.empty[T]

  override def initial = new StageState[T, Iterator[T]] {
    override def onPush(elem: T, ctx: Context[Iterator[T]]): SyncDirective = {
      // Slide: drop the oldest element before appending once the window is full
      if (buffer.size == size) buffer = buffer.tail
      buffer = buffer :+ elem
      // Emit only full windows here; otherwise ask upstream for more elements
      if (buffer.size == size) ctx.push(buffer.iterator) else ctx.pull()
    }
  }

  override def onUpstreamFinish(ctx: Context[Iterator[T]]): TerminationDirective = {
    // Push the incomplete window if the stream didn't contain enough elements.
    // An empty stream must yield no window at all, matching Seq().sliding(size).
    if (buffer.nonEmpty && buffer.size < size) terminationEmit(Iterator.single(buffer.iterator), ctx)
    else super.onUpstreamFinish(ctx)
  }
}
// Model quotes as a simple map of field name to raw string value.
// The only keys read or written in this file are "Close" (input) and "SMA" (added output).
type Quote = Map[String, String]
/**
 * Builds a flow that decorates quotes with a simple moving average of their "Close" value.
 *
 * Each incoming quote is broadcast to two branches: one passes the quote through unchanged,
 * the other parses "Close" as a Double, groups the values with [[Sliding]] and averages
 * each window. The branches are zipped back together, so the i-th quote is paired with the
 * average of the i-th window, and the result is stored under "SMA" formatted with two decimals.
 * Because windows are only produced once enough values have arrived, the flow emits fewer
 * quotes than it receives when the input is longer than one window.
 *
 * A quote without a "Close" entry, or with a non-numeric one, fails inside the `extract` step.
 *
 * @param window number of closing prices per average (passed to [[Sliding]])
 * @return a flow from quotes to quotes carrying an additional "SMA" entry
 */
def movingAverage(window: Int = 3) = Flow() { implicit b =>
  val in = b.add(Broadcast[Quote](2))
  val extract = b.add(Flow[Quote].map(_("Close").toDouble))
  val sliding = b.add(Flow[Double].transform(() => new Sliding(window)))
  // Divide by the number of values actually present in the window rather than by `window`:
  // Sliding emits a shorter window when the stream ends early, and dividing that sum by
  // `window` would understate the average. An empty window maps to 0.0.
  val average = b.add(Flow[Iterator[Double]].map { values =>
    val (sum, count) = values.foldLeft((0.0, 0)) { case ((s, n), v) => (s + v, n + 1) }
    if (count == 0) 0.0 else sum / count
  })
  val zip = b.add(Zip[Quote, Double])
  val merge = b.add(Flow[(Quote, Double)].map { case (quote, sma) => quote + ("SMA" -> f"$sma%1.2f") })

  in ~> zip.in0
  in ~> extract ~> sliding ~> average ~> zip.in1
  zip.out ~> merge

  (in.in, merge.outlet)
}
// Test mix-in that creates an actor system and materializer before the suite runs
// and shuts the actor system down once the suite has finished.
trait AkkaStreamsImplicits extends BeforeAndAfterAll { this: Suite =>

  // Assigned in beforeAll(); left uninitialised until then
  implicit var system: ActorSystem = _
  implicit var materializer: FlowMaterializer = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    // Name the system after the suite class, with '$' replaced by '_'
    val systemName = this.getClass.getSimpleName.replace("$", "_")
    system = ActorSystem(systemName)
    materializer = ActorFlowMaterializer()
  }

  override def afterAll(): Unit = {
    system.shutdown()
    super.afterAll()
  }
}
// Table-driven check of movingAverage: each row feeds a list of quotes through the flow
// and compares the collected output, element by element, with the expected quotes.
class MovingAverageSpec extends FlatSpec with AkkaStreamsImplicits with Matchers with ScalaFutures {

  // Columns: window size, input quotes, quotes expected downstream (with "SMA" added)
  val tests = Table(
    ("window", "quotes", "expected"),
    (
      3,
      Seq(
        Map("Close" -> "10.00"),
        Map("Close" -> "9.00"),
        Map("Close" -> "10.00"),
        Map("Close" -> "8.00"),
        Map("Close" -> "12.00")
      ),
      Seq(
        Map("Close" -> "10.00", "SMA" -> "9.67"),
        Map("Close" -> "9.00", "SMA" -> "9.00"),
        Map("Close" -> "10.00", "SMA" -> "10.00")
      )
    )
  )

  "Moving average" should "be properly calculated for all scenarios" in {
    forAll(tests) { (window: Int, quotes: Seq[Quote], expected: Seq[Quote]) =>
      val flowUnderTest = movingAverage(window)
      // Collect everything the flow emits, in order, into a list
      val collected = Source(quotes).via(flowUnderTest).runWith(Sink.fold(List[Quote]())(_ :+ _))

      whenReady(collected) { result =>
        result should have size expected.size
        val pairs = result.zip(expected)
        pairs.foreach { case (actualQuote, expectedQuote) =>
          actualQuote shouldBe expectedQuote
        }
      }
    }
  }
}
// Run the test case when the script is executed.
// NOTE(review): `run` is presumably ScalaTest's runner brought in by `import org.scalatest._` — confirm.
run(new MovingAverageSpec)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment