An exercise for learning Algebird, Monoids, Monads, ScalaCheck, and more.
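For orientation, everything below builds on the Monoid contract: an associative plus with a zero identity. A minimal sketch (the intAddition name is illustrative, not part of the files below):

    import com.twitter.algebird.Monoid

    // Laws: plus(zero, x) == x, plus(x, zero) == x,
    // and plus(a, plus(b, c)) == plus(plus(a, b), c).
    val intAddition = new Monoid[Int] {
      override def zero: Int = 0
      override def plus(l: Int, r: Int): Int = l + r
    }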
MergeSortMonoid.scala

package core.sparkTest.examples

import com.twitter.algebird.Monoid

import scala.annotation.tailrec
import scala.collection.GenSeq
import scala.collection.immutable.Queue

/**
 * Created by mishael on 3/23/15.
 */
class MergeSortMonoid[T](implicit ord: Ordering[T]) extends Monoid[Queue[T]] {

  def build(value: T) = Queue(value)

  override def zero: Queue[T] = Queue()

  // TODO: ???
  override def plus(l: Queue[T], r: Queue[T]): Queue[T] = {
    @tailrec
    def plusRec(l: Queue[T], r: Queue[T], acc: Queue[T]): Queue[T] = {
      if (l.isEmpty)
        acc.enqueue(r)
      else if (r.isEmpty)
        acc.enqueue(l)
      else if (ord.lteq(l.head, r.head)) {
        val (head, tail) = l.dequeue
        plusRec(tail, r, acc.enqueue(head))
      } else {
        val (head, tail) = r.dequeue
        plusRec(l, tail, acc.enqueue(head))
      }
    }
    plusRec(l, r, Queue())
  }
}
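// Merging two sorted queues is associative, and zero (the empty queue) is its
// identity, so plus satisfies the Monoid laws. That is what makes the
// aggregate call below safe over parallel collections: partial results can be
// combined in any grouping.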
object MergeSortMonoid {

  implicit def mergeSortMonoid[T](implicit ord: Ordering[T]) = new MergeSortMonoid[T]

  implicit class PimpSeq[T](val seq: GenSeq[T]) extends AnyVal {
    def mergeSort(implicit monoid: MergeSortMonoid[T]): Seq[T] =
      seq.aggregate(monoid.zero)({ case (q, value) => monoid.plus(q, monoid.build(value)) }, monoid.plus)
  }
}
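A quick usage sketch, assuming the implicits above are in scope (the results in the comments follow from the definitions; they are not output captured from the gist):

    import core.sparkTest.examples.MergeSortMonoid._

    List(3, 1, 2).mergeSort      // Queue(1, 2, 3): a singleton queue per element, then merged
    List(3, 1, 2).par.mergeSort  // same result; aggregate merges per-partition runs with plus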
Monad.scala

package core.sparkTest.examples

import core.SBLogger

import scala.util.Try

/**
 * Created by mishael on 3/23/15.
 */
trait Monad[M[_]] {
  def flatMap[T, U](as: M[T])(f: T => M[U]): M[U]
  def unit[T](x: T): M[T]

  // Only needed for testing
  def areEqual[T](l: M[T], r: M[T]): Boolean
}

object Monad extends SBLogger {

  implicit class RichM[M[_], T](m: M[T]) {
    def fMap[U](f: T => M[U])(implicit monad: Monad[M]): M[U] = monad.flatMap(m)(f)

    def ~=(other: M[T])(implicit monad: Monad[M]): Boolean = {
      logger.info(s"l: $m, r: $other")
      monad.areEqual(m, other)
    }
  }

  implicit object ListMonad extends Monad[List] {
    override def flatMap[T, U](as: List[T])(f: T => List[U]): List[U] = as.flatMap(f)
    override def unit[T](x: T): List[T] = List(x)
    override def areEqual[T](l: List[T], r: List[T]): Boolean = l == r
  }

  implicit object TryMonad extends Monad[Try] {
    override def flatMap[T, U](as: Try[T])(f: T => Try[U]): Try[U] = as.flatMap(f) // TODO: ???
    override def unit[T](x: T): Try[T] = Try(x) // TODO: ???
    override def areEqual[T](l: Try[T], r: Try[T]): Boolean = l.toString == r.toString
  }
}
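A usage sketch, assuming the implicits above are in scope. Note that TryMonad.areEqual compares toString representations because two Failures wrapping distinct exception instances are never ==:

    import core.sparkTest.examples.Monad._
    import scala.util.Try

    List(1, 2).fMap(x => List(x, x))        // List(1, 1, 2, 2), via ListMonad
    Try(1).fMap(x => Try(x + 1)) ~= Try(2)  // true, via TryMonad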
MonadThroughExampleTest.scala

package core.sparkTest.examples

import org.scalatest.FunSuite
import org.scalatest.prop.Checkers

import scala.util.Try

/**
 * Created by mishael on 3/23/15.
 *
 * Links can be found in:
 * https://docs.google.com/a/sparkbeyond.com/document/d/1ENnCprIDbndYQUkK-blOzeLksvl8ISJAaQUkFBTBKcE/edit?usp=sharing
 */
class MonadThroughExampleTest extends FunSuite with Checkers {

  import Monad._

  def monadLaws[M[_], A, B, C](f: A => M[B], g: B => M[C], x: A, m: M[A])(implicit monad: Monad[M]) = {
    val associativity = m.fMap(f).fMap(g) ~= m.fMap(a => f(a).fMap(g))
    val leftUnitLaw = monad.unit(x).fMap(f) ~= f(x)
    val rightUnitLaw = m.fMap(monad.unit) ~= m
    associativity && leftUnitLaw && rightUnitLaw
  }

  /**
   * Q: Fill in some f, g, x, m
   */
  test("Monad laws for ListMonad") {
    val f = (x: Int) => List.fill(x)(1) // TODO: ???
    val g = (x: Int) => List.fill(2)(x) // TODO: ???
    val x = 5 // TODO: ???
    val m = List(1, 2, 3) // TODO: ???
    assert(monadLaws(f, g, x, m))
  }

  /**
   * Q: Implement TryMonad.
   */
  test("Try Monad") {
    val f = (x: Int) => Try("SparkBeyond".charAt(x))
    val g = (x: Char) => Try(x match {
      case 'a' => "a is good"
      case _ => throw new MatchError(s"$x is bad")
    })
    val x = 2
    val m = Try(40)
    assert(monadLaws(f, g, x, m))
  }
}
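Since the suite already mixes in Checkers, the same laws can also be property-checked over generated inputs rather than a single hand-picked f, g, x, m. A sketch (it additionally needs import org.scalacheck.{Gen, Prop}; the generator ranges are illustrative):

    test("Monad laws for ListMonad, property-checked") {
      check {
        Prop.forAll(Gen.choose(0, 5), Gen.listOf(Gen.choose(-10, 10))) { (x, m) =>
          val f = (n: Int) => List.fill(n)(1)
          val g = (n: Int) => List(n, n)
          monadLaws(f, g, x, m)
        }
      }
    }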
MonoidsThroughExamplesTest.scala

package core.sparkTest.examples

import com.twitter.algebird.Operators._
import com.twitter.algebird._
import core.CommonImplicits._
import core.SBLogger
import core.common.Util
import org.apache.spark.{SparkContext, SparkConf}
import org.scalacheck.{Gen, Prop}
import org.scalatest.FunSuite
import org.scalatest.prop.Checkers

import HyperLogLog._

/**
 * Created by mishael on 3/23/15.
 *
 * Links can be found in:
 * https://docs.google.com/a/sparkbeyond.com/document/d/1ENnCprIDbndYQUkK-blOzeLksvl8ISJAaQUkFBTBKcE/edit?usp=sharing
 */
class MonoidsThroughExamplesTest extends FunSuite with Checkers with SBLogger {

  import core.sparkTest.examples.MergeSortMonoid._
  import MonoidsThroughExamplesTest._

  /**
   * Q: What is the expected result?
   */
  test("Max monoid on lists") {
    check {
      Prop.forAll(listGenerator(1, 1000)) {
        (list: List[Int]) =>
          val monoids = list.map(Max(_))
          val expected = list.max // TODO: ???
          val actual = monoids.monoidSum.get
          logger.info(s"Expected $expected, actual $actual")
          expected == actual
      }
    }
  }

  /**
   * Q: Implement the actual.
   */
  test("Min monoid on RDD") {
    val expected = rddDoubles.min()
    val actual = rddDoubles.map(Min(_)).reduce(_ + _).get // TODO: ???
    logger.info(s"Expected $expected, actual $actual")
    assert(expected == actual)
  }

  /**
   * Q1: What is the expected result?
   * Q2: Find some number of bits that gives a good enough accuracy.
   */
  test("HyperLogLog monoid on RDD") {
    val expected = rddInts.distinct().count() // TODO: ???
    val bits = 15 // TODO: ???
    val monoid = new HyperLogLogMonoid(bits)
    val actual = monoid.sizeOf(rddInts.map(monoid(_)).reduce(_ + _)).estimate
    logger.info(s"Expected $expected, actual $actual")
    assert(expected.toDouble ~= actual)
  }
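  // A HyperLogLog sketch with 2^bits registers has a standard error of roughly
  // 1.04 / sqrt(2^bits); bits = 15 gives about 0.6%, comfortably inside the 1%
  // relative tolerance that ~= (defined in the companion object) allows.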
  /**
   * Q1: Implement rddGaussians.
   * Q2: Implement momentsSum.
   * Q3: Find the expected mean and std.
   */
  test("Moments monoid for calculating std and mean") {
    val momentsRdd = rddGaussians.map(Moments(_)).cache()
    val momentsSum = momentsRdd.reduce(_ + _) // TODO: ???
    logger.info(s"mean ${momentsSum.mean}, std ${momentsSum.stddev}")
    val mean = 7.0 // TODO: ???
    val std = 2.0 // TODO: ???
    assert((mean ~= momentsSum.mean) && (std ~= momentsSum.stddev))
  }
  /**
   * We wish to implement parallel merge sort using monoids.
   *
   * Q: Implement MergeSortMonoid.plus
   */
  test("Parallel merge sort.") {
    check {
      Prop.forAll(listGenerator(500, 1000)) {
        (list: List[Int]) =>
          val listPar = list.par
          val repeats = 10
          val speedUp = 2
          def expected = list.sorted
          def actual = list.mergeSort
          def actualPar = listPar.mergeSort
          val scalaTime = Util.microBenchmark(repeats){expected}
          val nonParallelTime = Util.microBenchmark(repeats){actual}
          val parallelTime = Util.microBenchmark(repeats){actualPar}
          logger.info(
            s"""Benchmarks, for seq of size ${list.size}
               |List: $list
               |Scala implementation: $scalaTime ms
               |Non-parallel implementation: $nonParallelTime ms
               |Parallel implementation: $parallelTime ms
             """.stripMargin)
          nonParallelTime >= parallelTime * speedUp &&
            expected == actual && expected == actualPar
      }
    }
  }
}
| } | |
| object MonoidsThroughExamplesTest{ | |
| val conf = new SparkConf() | |
| .setMaster("local[*]") | |
| .setAppName("Simple Application") | |
| implicit val sc = new SparkContext(conf) | |
| val baseDir = "data" / "datasets" / "sparkbeyondDatasets" | |
| val doublesPath = baseDir / "largeFileOfDoubles.tsv" | |
| val intPath = baseDir / "largeFileOfInts.tsv" | |
| val gaussiansPath = baseDir / "largeFileOfGaussians.tsv" | |
| lazy val rddDoubles = sc.textFile(doublesPath).map(_.toDouble).cache() | |
| lazy val rddInts = sc.textFile(intPath).map(_.toInt).cache() | |
| lazy val rddGaussians = sc.textFile(gaussiansPath).map(_.toDouble).cache() // TODO: ??? | |
| implicit class PimpDouble(val left: Double) extends AnyVal{ | |
| def ~= (right: Double): Boolean = { | |
| math.abs(left - right) <= math.min(math.abs(left), math.abs(right)) * 1e-2 | |
| } | |
| } | |
| def listGenerator(sizeLowerBound: Int, sizeUpperBound: Int, min: Int = -1000, max: Int = 1000) = | |
| for { | |
| k <- Gen.choose(sizeLowerBound, sizeUpperBound) | |
| l <- Gen.listOfN(k, Gen.choose(min, max)) | |
| } yield l | |
| } |
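The tests above assume three input files exist under data/datasets/sparkbeyondDatasets. One hypothetical way to produce the Gaussian file, consistent with the mean 7.0 and std 2.0 the Moments test expects (a sketch, not part of the original gist; the row count is arbitrary):

    import java.io.PrintWriter
    import scala.util.Random

    val out = new PrintWriter("data/datasets/sparkbeyondDatasets/largeFileOfGaussians.tsv")
    try (1 to 1000000).foreach(_ => out.println(7.0 + 2.0 * Random.nextGaussian()))
    finally out.close()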