Created
June 12, 2014 16:53
-
-
Save ashwanthkumar/0b626746ddf119968556 to your computer and use it in GitHub Desktop.
User-identifier normalization from the Big Data book by Nathan Marz, implemented in Scalding.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.twitter.scalding.{Tsv, Job, Args} | |
import scala.collection.immutable.TreeSet | |
/*
 * Let's assume we are reading a file of (a, b) lines, where each line means that node a and
 * node b are connected in an undirected graph. For simplicity, a and b are Ints. We iterate
 * until a fixed point is reached, at which point every node is mapped to the smallest node id
 * in its connected component.
 */
/**
 * One iteration of the fixed-point user-identifier normalization from Nathan Marz's "Big Data".
 *
 * Reads an undirected edge list `(n1, n2)` from `input`. It redirects every node's neighbourhood
 * onto the smallest id in that neighbourhood and writes the distinct resulting edges to
 * `<output-base-dir>/run-<iteration>`. Edges flagged as progress are also written to `progress`.
 * While that sink is non-empty, [[next]] schedules another iteration over this run's output.
 *
 * Arguments:
 *  - `input`           path of the Tsv edge list read by this iteration
 *  - `output-base-dir` directory under which each iteration writes `run-<iteration>`
 *  - `progress`        path of the Tsv holding progress edges (the same path for every iteration)
 *  - `iteration`       iteration number, defaults to "1"
 */
class FixedPointJob(args: Args) extends Job(args) {
  val input: String = args("input")
  val outputBaseDir: String = args("output-base-dir")
  val progressSink: String = args("progress")
  val iteration: Int = args.getOrElse("iteration", "1").toInt
  val sink: String = s"$outputBaseDir/run-$iteration"

  /** Emits the edge in both directions so that grouping by either endpoint sees it. */
  def bidirectionalEdge(tuple: (Int, Int)): Iterable[(Int, Int)] = {
    val (node1, node2) = tuple
    Iterable((node1, node2), (node2, node1))
  }

  /**
   * Redirects one node's neighbourhood onto its smallest id.
   *
   * @param edges `(node, neighbour)` pairs that all share the same `node` (the group key).
   *              It must be non-empty; `groupBy` never produces an empty group.
   * @return `(smallest, id, progress)` for every id in `{node} ∪ neighbours` except `smallest`.
   *         `progress` is true only when `node` is not the smallest id and the set holds more
   *         than two ids.
   */
  def iterateEdges(edges: Iterator[(Int, Int)]) = {
    val (grouped, first) = edges.next()
    val allIds = edges.foldLeft(TreeSet(grouped, first)) { case (soFar, (_, neighbour)) =>
      soFar + neighbour
    }
    val smallest = allIds.head
    // With only two ids, the single emitted edge is the input edge itself, so it is not progress.
    val progress = allIds.size > 2 && grouped != smallest
    (allIds - smallest).map(id => (smallest, id, progress))
  }

  /** This iteration's edges `(node1, node2)`, tagged with the `isNew` progress flag. */
  val iterationSoFar = {
    Tsv(input, fields = ('n1, 'n2))
      .flatMapTo(('n1, 'n2) -> ('b1, 'b2))(bidirectionalEdge)
      .groupBy('b2)(_.mapStream(('b2, 'b1) -> ('node1, 'node2, 'isNew))(iterateEdges))
      .project(('node1, 'node2, 'isNew))
  }

  iterationSoFar
    .filter('isNew)(identity[Boolean])
    .write(Tsv(progressSink))

  iterationSoFar
    .distinct(('node1, 'node2))
    .write(Tsv(sink))

  /**
   * Chains another iteration over this run's output while this run reported progress.
   *
   * Scalding invokes `next` once this job's flow has completed, so the progress sink holds this
   * iteration's output when it is read here.
   */
  override def next: Option[Job] = {
    // Read with the same (node1, node2, isNew) arity the progress sink is written with.
    // Only the head of the stream is forced by nonEmpty.
    val madeProgress = Tsv(progressSink).readAtSubmitter[(Int, Int, Boolean)].nonEmpty
    if (madeProgress) {
      val nextIteration = iteration + 1
      // `output-base-dir` and `progress` carry over unchanged. The next run derives its sink
      // path from them and the new iteration number, so no separate output argument is needed.
      val nextArgs = args + ("input" -> List(sink)) + ("iteration" -> List(nextIteration.toString))
      Some(clone(nextArgs))
    } else {
      None
    }
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.scalatest.FunSuite | |
import com.twitter.scalding.{FieldConversions, TupleConversions, Tsv, JobTest} | |
import scala.collection.mutable | |
import org.scalatest.matchers.ShouldMatchers | |
/** Runs the iterated [[FixedPointJob]] on the example graph from the book until it converges. */
class FixedPointJobTest extends FunSuite with TupleConversions with FieldConversions with ShouldMatchers {
  test("should work for the example in the book") {
    val exampleGraphInput = List(
      (1, 4), (4, 3), (4, 5), (5, 2), (5, 11)
    )
    val graphAfterIteration1 = List(
      (1, 4), (1, 3), (1, 5), (3, 4), (2, 4), (2, 5), (2, 11), (5, 11)
    )
    val graphAfterIteration2 = List(
      (1, 3), (1, 4), (1, 5), (1, 11), (1, 2), (2, 5), (2, 4), (2, 11)
    )
    val finalExpectedGraph = List(
      (1, 2), (1, 3), (1, 4), (1, 5), (1, 11)
    )

    /**
     * Asserts that one run wrote exactly `expectedGraph`.
     *
     * The comparison uses sorted lists, so missing, unexpected and duplicated edges all fail.
     * The previous check (no edge outside the expected list) also passed on an empty output.
     */
    def validateOutput(runString: String, expectedGraph: List[(Int, Int)])(buffer: mutable.Buffer[(Int, Int)]): Unit = {
      println(runString)
      buffer.toList.sorted should be(expectedGraph.sorted)
    }

    JobTest(new FixedPointJob(_))
      .arg("input", "input")
      .arg("output-base-dir", "output-base-dir")
      .arg("progress", "progress")
      .arg("iteration", "1")
      .source(Tsv("input", fields = ('n1, 'n2)), exampleGraphInput)
      // Later iterations read their input from these explicitly supplied sources, which hold
      // the expected output of the previous run.
      .source(Tsv("output-base-dir/run-1", fields = ('n1, 'n2)), graphAfterIteration1)
      .source(Tsv("output-base-dir/run-2", fields = ('n1, 'n2)), graphAfterIteration2)
      .source(Tsv("output-base-dir/run-3", fields = ('n1, 'n2)), finalExpectedGraph)
      // Read only if iteration 4 still reports progress and schedules a fifth run.
      .source(Tsv("output-base-dir/run-4", fields = ('n1, 'n2)), Iterable())
      .sink(Tsv("output-base-dir/run-1"))(validateOutput("Validating Run 1", graphAfterIteration1))
      .sink(Tsv("output-base-dir/run-2"))(validateOutput("Validating Run 2", graphAfterIteration2))
      .sink(Tsv("output-base-dir/run-3"))(validateOutput("Validating Run 3", finalExpectedGraph))
      // Iteration 4 re-derives the converged graph unchanged, which is the fixed point.
      .sink(Tsv("output-base-dir/run-4"))(validateOutput("Validating Run 4", finalExpectedGraph))
      .sink(Tsv("progress"))(doNothing)
      .run
      .finish
  }

  /** Sink callback that accepts any output without checking it. */
  def doNothing(buffer: mutable.Buffer[(Int, Int)]): Unit = {}
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Reference: page 167 of Big Data MEAP v18 (PDF), or Chapter 6.6 of the ePub edition.