Skip to content

Instantly share code, notes, and snippets.

@johnynek
Created February 19, 2013 23:23
Show Gist options
  • Save johnynek/4991205 to your computer and use it in GitHub Desktop.
Save johnynek/4991205 to your computer and use it in GitHub Desktop.
Example of computing click-rate in map/reduce style.
#!/bin/sh
exec scala -savecompiled "$0" "$@"
!#
/**
 * A toy, single-process map/reduce framework.
 *
 * `apply` runs three phases:
 *  1. map: each input record is expanded by `flatMapFn` into zero or more (key, value) pairs;
 *  2. shuffle: the pairs are grouped by key (in a distributed system, disjoint sets of keys
 *     would be handled by different workers);
 *  3. reduce: `reduceFn` is applied once per key to that key together with all of its values.
 *
 * @tparam T input record type
 * @tparam K key type emitted by the map phase
 * @tparam V value type emitted by the map phase
 * @tparam R per-key result type produced by the reduce phase
 * @param flatMapFn expands one input record into (key, value) pairs
 * @param reduceFn  combines a key and all of its values into a single result
 */
class MapReduce[T,K,V,R](flatMapFn: (T) => Iterable[(K,V)], reduceFn: ((K,Iterable[V])) => R) {
  /**
   * Runs the job over `input`.
   *
   * @param input the records to process
   * @return a strict map from every key emitted by the map phase to its reduced result
   */
  def apply(input: Iterable[T]): Map[K,R] = {
    // Map phase: apply the flatMap function to every record.
    val mapped: Iterable[(K,V)] = input.flatMap(flatMapFn)
    // Shuffle phase: group the pairs by key (in distributed systems, sets of keys are
    // handled by different workers).
    val shuffled: Map[K,Iterable[(K,V)]] = mapped.groupBy { _._1 }
    // Keep only the V values next to each key, so reduceFn receives (key, values).
    val values: Map[K, (K,Iterable[V])] = shuffled.map { case (k, kvs) => (k, (k, kvs.map { _._2 })) }
    // Reduce phase. Built strictly with `map` instead of `mapValues`: in Scala 2.12
    // `mapValues` returns a lazy view that re-runs reduceFn on every lookup, and in 2.13
    // it is deprecated and returns a MapView, which does not conform to Map[K,R].
    values.map { case (k, keyAndValues) => (k, reduceFn(keyAndValues)) }
  }
}
/**
 * Generates `size` synthetic click-log lines of the form "<page> <clicked>", e.g. "42 true".
 *
 * Each line pairs a page number drawn from [0, pages) with a random boolean click flag.
 * The sequence is fully determined by `seed`, so repeated calls with the same arguments
 * return the same lines.
 *
 * @param size  number of lines to generate; a non-positive value yields no lines
 * @param seed  seed for `java.util.Random` (defaults to 1, the previously hard-coded value)
 * @param pages number of distinct page ids (defaults to 100); must be positive, otherwise
 *              `Random.nextInt` throws IllegalArgumentException
 * @return the generated lines
 */
def generatePageNumClick(size: Int, seed: Long = 1L, pages: Int = 100): Iterable[String] = {
  import java.util.Random
  val rng = new Random(seed)
  // `until` (not `to`) so exactly `size` lines are produced; `0 to size` yielded size + 1.
  (0 until size).map { _ =>
    // Draw a random page, then a random click flag for it (same RNG call order as before).
    val page = rng.nextInt(pages)
    val clicked = rng.nextBoolean
    "%s %s".format(page, clicked)
  }
}
/**
 * Map step: parses one "<page> <clicked>" log line into a (page, click) key/value pair.
 *
 * The click flag becomes 1 when the token is "true" (case-insensitive, per
 * `java.lang.Boolean.parseBoolean`) and 0 for any other token. A line that does not consist
 * of exactly two whitespace-separated fields yields no pairs.
 *
 * @param line one raw input line
 * @return a single (page, 0 or 1) pair, or an empty result for a malformed line
 */
def mapFunction(line: String): Iterable[(String,Int)] =
  // Trim first: splitting a line with leading whitespace produces an empty first field,
  // which would make an otherwise well-formed line look malformed and get dropped.
  line.trim.split("""\s+""") match {
    case Array(page, clicked) =>
      val clickInt = if (java.lang.Boolean.parseBoolean(clicked)) 1 else 0
      Iterable((page, clickInt))
    case _ => Iterable.empty
  }
/**
 * Reduce step: counts a page's click flags and computes their mean (the page's click-rate).
 *
 * Count and mean are maintained together in one `foldLeft` pass; an empty list of flags
 * yields (0, 0.0).
 *
 * @param grouped the page key with all of its click flags; only the flags are used
 * @return (number of flags, mean flag value)
 */
def reduceFunction(grouped: (String, Iterable[Int])): (Int, Double) = {
  val (_, flags) = grouped
  flags.foldLeft((0, 0.0)) { case ((seen, mean), flag) =>
    val seenNow = seen + 1
    // Exercise: the update turns the old mean back into a sum (mean * seen) before dividing.
    // Can you rewrite it so that multiplication is not needed?
    (seenNow, (mean * seen + flag.toDouble) / seenNow)
  }
}
// Wire the map and reduce steps into a job that computes each page's click-rate.
val clickRateJob = new MapReduce(mapFunction _, reduceFunction _)
// Run the job over a synthetic click log generated with size 1000.
val input: Iterable[String] = generatePageNumClick(1000)
val result = clickRateJob.apply(input)
// Print one (page, (count, click-rate)) entry per line, highest click-rate first.
// Pages with equal rates are ordered by page id, so the output order does not depend on
// the Map's iteration order.
result
  .toList
  .sortBy { case (page, (_, rate)) => (-rate, page) }
  .foreach { println(_) }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment