Last active
August 29, 2015 14:26
-
-
Save lancearlaus/b43b7acb8a3aada51701 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env scalas | |
// NOTE: This is a self-encapsulated Scala script meant to be run with scalas | |
// See http://www.scala-sbt.org/0.13/docs/Scripts.html | |
/*** | |
scalaVersion := "2.11.6" | |
resolvers += Resolver.url("typesafe-ivy-repo", url("http://typesafe.artifactoryonline.com/typesafe/releases"))(Resolver.ivyStylePatterns) | |
libraryDependencies ++= Seq( | |
"com.typesafe.scala-logging" %% "scala-logging" % "3.+", | |
"com.typesafe.akka" % "akka-stream-experimental_2.11" % "1.0", | |
"org.scalatest" %% "scalatest" % "2.2.4" | |
) | |
*/ | |
import com.typesafe.scalalogging.StrictLogging | |
import akka.actor.ActorSystem | |
import akka.stream._ | |
import akka.stream.stage._ | |
import akka.stream.scaladsl.FlowGraph.Implicits._ | |
import akka.stream.scaladsl._ | |
import org.scalatest._ | |
import org.scalatest.concurrent.ScalaFutures | |
import org.scalatest.prop.TableDrivenPropertyChecks._ | |
import org.scalactic.Tolerance._ | |
import org.scalactic.TripleEqualsSupport.Spread | |
import scala.collection.immutable._ | |
import scala.util.Random | |
// Creates an unbounded source of random ints with a known seed (for repeatability) | |
def randomSource(seed: Int) = Source(() => { | |
val random = new Random(seed) | |
Iterator.continually(random.nextInt) | |
}) | |
// Transform a source of integers into a normalized source of doubles where | |
// each element emitted is in the range of 0 to 1 | |
// Note that the incoming source must be both finite and support multiple subscribers | |
def normalize(in: Source[Int, Unit]): Source[Double, Unit] = { | |
// Fold over the input source to create a new source that emits a single element | |
// which is the range of integers over the entire stream | |
val fold = in.fold((Int.MaxValue, Int.MinValue)) { | |
(range, n) => range match { | |
case (l, u) => (l.min(n), u.max(n)) | |
} | |
} | |
// Transform the single element range source into an unbounded source | |
// that continually emits the same element | |
val range = fold.map(r => Source.repeat(r)).flatten(FlattenStrategy.concat) | |
// Create a stage that normalizes each value | |
val normalize = Flow[(Int, (Int, Int))].map { | |
case (n, (min, max)) if (min == max) => 1.0 | |
case (n, (min, max)) => (n.toDouble - min.toDouble) / (max.toDouble - min.toDouble) | |
} | |
// Create the final source using a flow that combines the prior constructs | |
Source(in, range, Zip[Int, (Int, Int)], normalize)((mat, _, _, _) => mat) { | |
implicit b => (in, range, zip, normalize) => | |
in ~> zip.in0 | |
range ~> zip.in1 | |
zip.out ~> normalize | |
normalize.outlet | |
} | |
} | |
// Create/destroy an actor system for testing | |
trait AkkaStreamsImplicits extends BeforeAndAfterAll { this: Suite => | |
implicit var system: ActorSystem = _ | |
implicit var materializer: Materializer = _ | |
override def beforeAll = { | |
super.beforeAll | |
system = ActorSystem(this.getClass.getSimpleName.replace("$", "_")) | |
materializer = ActorMaterializer() | |
} | |
override def afterAll = { | |
system.shutdown | |
super.afterAll | |
} | |
} | |
class NormalizeSpec extends FlatSpec with AkkaStreamsImplicits with Matchers with ScalaFutures { | |
val seed = 42 | |
"Normalize" should "properly calculate for constant stream" in { | |
val value = 5 | |
val size = 100 | |
val expected = Seq.fill(size)(1.0) | |
val constants = Source.repeat(value).take(size) | |
val normalized = normalize(constants) | |
val future = normalized.runWith(Sink.fold(List[Double]())(_ :+ _)) | |
whenReady(future) { result => | |
//println(s"result: $result") | |
result should have size expected.size | |
result.zip(expected).foreach { case (actual, expected) => | |
actual shouldBe expected | |
} | |
} | |
} | |
it should "properly calculate for random stream" in { | |
val size = 100 | |
val randoms = randomSource(seed).take(size) | |
val normalized = normalize(randoms) | |
val future = normalized.runWith(Sink.fold(List[Double]())(_ :+ _)) | |
whenReady(future) { result => | |
//println(s"result: $result") | |
result should have size size | |
result should contain (0.0) | |
result should contain (1.0) | |
result.exists(_ < 0.0) shouldBe false | |
result.exists(_ > 1.0) shouldBe false | |
} | |
} | |
} | |
// Run the test case | |
run(new NormalizeSpec) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment