@MishaelRosenthal (created March 24, 2015)
An exercise for learning Algebird, monoids, monads, ScalaCheck, and more.
package core.sparkTest.examples

import com.twitter.algebird.Monoid

import scala.annotation.tailrec
import scala.collection.GenSeq
import scala.collection.immutable.Queue

/**
 * Created by mishael on 3/23/15.
 */
class MergeSortMonoid[T](implicit ord: Ordering[T]) extends Monoid[Queue[T]] {

  def build(value: T): Queue[T] = Queue(value)

  override def zero: Queue[T] = Queue()

  // TODO: ???
  override def plus(l: Queue[T], r: Queue[T]): Queue[T] = {
    // Merge two already-sorted queues by repeatedly moving the smaller head onto acc.
    @tailrec
    def plusRec(l: Queue[T], r: Queue[T], acc: Queue[T]): Queue[T] = {
      if (l.isEmpty)
        acc.enqueue(r)
      else if (r.isEmpty)
        acc.enqueue(l)
      else if (ord.lteq(l.head, r.head)) {
        val (head, tail) = l.dequeue
        plusRec(tail, r, acc.enqueue(head))
      } else {
        val (head, tail) = r.dequeue
        plusRec(l, tail, acc.enqueue(head))
      }
    }
    plusRec(l, r, Queue())
  }
}

object MergeSortMonoid {

  implicit def mergeSortMonoid[T](implicit ord: Ordering[T]): MergeSortMonoid[T] =
    new MergeSortMonoid[T]

  implicit class PimpSeq[T](val seq: GenSeq[T]) extends AnyVal {
    // Sort by lifting each element to a singleton queue and merging with plus;
    // for a parallel collection, aggregate merges the per-chunk results in parallel.
    def mergeSort(implicit monoid: MergeSortMonoid[T]): Seq[T] =
      seq.aggregate(monoid.zero)({ case (q, value) => monoid.plus(q, monoid.build(value)) }, monoid.plus)
  }
}
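A quick usage sketch (hypothetical REPL lines; assumes Algebird is on the classpath and the implicits above are in scope):

import core.sparkTest.examples.MergeSortMonoid._

List(3, 1, 2).mergeSort     // Queue(1, 2, 3): the input, sorted
List(3, 1, 2).par.mergeSort // same result, with chunks merged in parallel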
package core.sparkTest.examples

import core.SBLogger

import scala.util.Try

/**
 * Created by mishael on 3/23/15.
 */
trait Monad[M[_]] {
  def flatMap[T, U](as: M[T])(f: T => M[U]): M[U]
  def unit[T](x: T): M[T]

  // Only needed for testing
  def areEqual[T](l: M[T], r: M[T]): Boolean
}

object Monad extends SBLogger {

  implicit class RichM[M[_], T](m: M[T]) {
    def fMap[U](f: T => M[U])(implicit monad: Monad[M]): M[U] = monad.flatMap(m)(f)

    def ~=(other: M[T])(implicit monad: Monad[M]): Boolean = {
      logger.info(s"l: $m, r: $other")
      monad.areEqual(m, other)
    }
  }

  implicit object ListMonad extends Monad[List] {
    override def flatMap[T, U](as: List[T])(f: T => List[U]): List[U] = as.flatMap(f)
    override def unit[T](x: T): List[T] = List(x)
    override def areEqual[T](l: List[T], r: List[T]): Boolean = l == r
  }

  implicit object TryMonad extends Monad[Try] {
    override def flatMap[T, U](as: Try[T])(f: T => Try[U]): Try[U] = as.flatMap(f) // TODO: ???
    override def unit[T](x: T): Try[T] = Try(x) // TODO: ???
    // Compare by toString: two Failures carrying distinct but identical-looking
    // exceptions are not ==, so string equality is the pragmatic choice here.
    override def areEqual[T](l: Try[T], r: Try[T]): Boolean = l.toString == r.toString
  }
}
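The typeclass in action (a hypothetical REPL session; the implicit instances above must be in scope):

import core.sparkTest.examples.Monad._
import scala.util.Try

List(1, 2, 3).fMap(x => List(x, x * 10)) // List(1, 10, 2, 20, 3, 30)
Try(4).fMap(x => Try(16 / x))            // Success(4)
Try(0).fMap(x => Try(16 / x))            // Failure(java.lang.ArithmeticException: / by zero)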
package core.sparkTest.examples

import org.scalatest.FunSuite
import org.scalatest.prop.Checkers

import scala.util.Try

/**
 * Created by mishael on 3/23/15.
 *
 * Links can be found in:
 * https://docs.google.com/a/sparkbeyond.com/document/d/1ENnCprIDbndYQUkK-blOzeLksvl8ISJAaQUkFBTBKcE/edit?usp=sharing
 */
class MonadThroughExampleTest extends FunSuite with Checkers {

  import Monad._

  def monadLaws[M[_], A, B, C](f: A => M[B], g: B => M[C], x: A, m: M[A])(implicit monad: Monad[M]): Boolean = {
    val associativity = m.fMap(f).fMap(g) ~= m.fMap(x => f(x).fMap(g))
    val leftUnitLaw = monad.unit(x).fMap(f) ~= f(x)
    val rightUnitLaw = m.fMap(monad.unit) ~= m
    associativity && leftUnitLaw && rightUnitLaw
  }
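  // For intuition (illustrative numbers, not from the original exercise), take
  // m = List(1, 2), f = (x: Int) => List(x, x + 1), g = (y: Int) => List(y * 10):
  //   left unit:     unit(1).fMap(f) == List(1, 2) == f(1)
  //   right unit:    m.fMap(unit)    == List(1, 2) == m
  //   associativity: m.fMap(f).fMap(g) and m.fMap(x => f(x).fMap(g))
  //                  both evaluate to List(10, 20, 20, 30).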
  /**
   * Q: Fill in some f, g, x, m
   */
  test("Monad laws for ListMonad") {
    val f = (x: Int) => List.fill(x)(1) // TODO: ???
    val g = (x: Int) => List.fill(2)(x) // TODO: ???
    val x = 5 // TODO: ???
    val m = List(1, 2, 3) // TODO: ???
    assert(monadLaws(f, g, x, m))
  }

  /**
   * Q: Implement TryMonad.
   */
  test("Try Monad") {
    val f = (x: Int) => Try("SparkBeyond".charAt(x))
    val g = (x: Char) => Try(x match {
      case 'a' => "a is good"
      case _ => throw new MatchError(s"$x is bad")
    })
    val x = 2
    val m = Try(40)
    assert(monadLaws(f, g, x, m))
  }
}
package core.sparkTest.examples

import com.twitter.algebird.Operators._
import com.twitter.algebird._
import core.CommonImplicits._
import core.SBLogger
import core.common.Util
import org.apache.spark.{SparkContext, SparkConf}
import org.scalacheck.{Gen, Prop}
import org.scalatest.FunSuite
import org.scalatest.prop.Checkers

import HyperLogLog._

/**
 * Created by mishael on 3/23/15.
 *
 * Links can be found in:
 * https://docs.google.com/a/sparkbeyond.com/document/d/1ENnCprIDbndYQUkK-blOzeLksvl8ISJAaQUkFBTBKcE/edit?usp=sharing
 */
class MonoidsThroughExamplesTest extends FunSuite with Checkers with SBLogger {

  import core.sparkTest.examples.MergeSortMonoid._
  import MonoidsThroughExamplesTest._

  /**
   * Q: What is the expected result?
   */
  test("Max monoid on lists") {
    check {
      Prop.forAll(listGenerator(1, 1000)) {
        (list: List[Int]) =>
          val monoids = list.map(Max(_))
          val expected = list.max // TODO: ???
          val actual = monoids.monoidSum.get
          logger.info(s"Expected $expected, actual $actual")
          expected == actual
      }
    }
  }
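  // Why this works (an aside): Max(l) + Max(r) == Max(ord.max(l, r)), so summing
  // the wrapped values collapses to the maximum of the list. monoidSum comes from
  // core.CommonImplicits, which is not part of this gist; presumably it folds the
  // collection with the implicit Monoid, like list.map(Max(_)).reduce(_ + _).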
  /**
   * Q: Implement the actual.
   */
  test("Min monoid on RDD") {
    val expected = rddDoubles.min()
    val actual = rddDoubles.map(Min(_)).reduce(_ + _).get // TODO: ???
    logger.info(s"Expected $expected, actual $actual")
    assert(expected == actual)
  }

  /**
   * Q1: What is the expected result?
   * Q2: Find some number of bits that gives a good enough accuracy.
   */
  test("HyperLogLog monoid on RDD") {
    val expected = rddInts.distinct().count() // TODO: ???
    val bits = 15 // TODO: ???
    val monoid = new HyperLogLogMonoid(bits)
    val actual = monoid.sizeOf(rddInts.map(monoid(_)).reduce(_ + _)).estimate
    logger.info(s"Expected $expected, actual $actual")
    assert(expected.toDouble ~= actual)
  }
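  // Sanity check on the choice of bits (a back-of-the-envelope aside, not from
  // the original exercise): HyperLogLog's standard error is roughly
  // 1.04 / sqrt(2^bits), so bits = 15 gives about 1.04 / sqrt(32768) ≈ 0.57%,
  // comfortably within the 1% relative tolerance that ~= allows.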
  /**
   * Q1: Implement rddGaussians.
   * Q2: Implement momentsSum.
   * Q3: Find the expected mean and std.
   */
  test("Moments monoid for calculating std and mean") {
    val momentsRdd = rddGaussians.map(Moments(_)).cache()
    val momentsSum = momentsRdd.reduce(_ + _) // TODO: ???
    logger.info(s"mean ${momentsSum.mean}, std ${momentsSum.stddev}")
    val mean = 7.0 // TODO: ???
    val std = 2.0 // TODO: ???
    // Without assert, the boolean result would be silently discarded and the
    // test would always pass.
    assert((mean ~= momentsSum.mean) && (std ~= momentsSum.stddev))
  }
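  // The expected values above presumably mirror how largeFileOfGaussians.tsv was
  // generated (an assumption; the generator is not part of this gist), e.g. one
  // sample of 7.0 + 2.0 * Random.nextGaussian() per line.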
  /**
   * We wish to implement parallel merge sort using monoids.
   *
   * Q: Implement MergeSortMonoid.plus
   */
  test("Parallel merge sort.") {
    check {
      Prop.forAll(listGenerator(500, 1000)) {
        (list: List[Int]) =>
          val listPar = list.par
          val repeats = 10
          val speedUp = 2
          def expected = list.sorted
          def actual = list.mergeSort
          def actualPar = listPar.mergeSort
          val scalaTime = Util.microBenchmark(repeats){expected}
          val nonParallelTime = Util.microBenchmark(repeats){actual}
          val parallelTime = Util.microBenchmark(repeats){actualPar}
          logger.info(
            s"""Benchmarks, for seq of size ${list.size}
               |List: $list
               |Scala implementation: $scalaTime
               |Non-parallel implementation: $nonParallelTime
               |Parallel implementation: $parallelTime ms
             """.stripMargin)
          nonParallelTime >= parallelTime * speedUp &&
            expected == actual && expected == actualPar
      }
    }
  }
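  // Why a monoid gives merge sort "for free" (an aside): plus merges two sorted
  // queues in O(|l| + |r|), and aggregate over a parallel collection combines the
  // sorted per-chunk results with plus, which is exactly the divide-and-conquer
  // structure of an O(n log n) merge sort.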
}
object MonoidsThroughExamplesTest {

  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("Simple Application")

  implicit val sc = new SparkContext(conf)

  val baseDir = "data" / "datasets" / "sparkbeyondDatasets"
  val doublesPath = baseDir / "largeFileOfDoubles.tsv"
  val intPath = baseDir / "largeFileOfInts.tsv"
  val gaussiansPath = baseDir / "largeFileOfGaussians.tsv"

  lazy val rddDoubles = sc.textFile(doublesPath).map(_.toDouble).cache()
  lazy val rddInts = sc.textFile(intPath).map(_.toInt).cache()
  lazy val rddGaussians = sc.textFile(gaussiansPath).map(_.toDouble).cache() // TODO: ???

  // Approximate equality: the relative error is at most 1%.
  implicit class PimpDouble(val left: Double) extends AnyVal {
    def ~=(right: Double): Boolean =
      math.abs(left - right) <= math.min(math.abs(left), math.abs(right)) * 1e-2
  }

  def listGenerator(sizeLowerBound: Int, sizeUpperBound: Int, min: Int = -1000, max: Int = 1000) =
    for {
      k <- Gen.choose(sizeLowerBound, sizeUpperBound)
      l <- Gen.listOfN(k, Gen.choose(min, max))
    } yield l
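  // Example (illustrative output): listGenerator(1, 5).sample might yield
  // Some(List(812, -3, 40)): between 1 and 5 ints, each in [min, max].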
}