@ponkin
Created August 18, 2015 20:24
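import org.apache.spark.rdd.RDD

// Recursive joins: extend every path by one edge per step.
// Assumes `index: RDD[(String, String)]` is in scope, holding edges as (fromVertex, toVertex).
def stepOver(prevStep: RDD[(String, String)], iteration: Int = 1): RDD[(String, String)] = {
  // prevStep holds (initialVertex, currentVertex); swap so both RDDs are keyed by the vertex to extend from.
  // flatMap (not flatMapValues) drops the join key, so the result is RDD[(String, String)] as declared.
  val currStep = index.cogroup(prevStep.map(_.swap))
    .flatMap { case (_, (nextVertices, initialVertices)) =>
      for (i <- nextVertices.iterator; ps <- initialVertices.iterator)
        yield (ps, i) // ps - initial vertex, i - next vertex in path
    }
    .setName(s"Step_$iteration")
    .persist()
  val count = currStep.count() // action: materializes this step before deciding whether to recurse
  // Stop when no path can be extended, or after 25 steps (a cap that also guards against cycles).
  if (count == 0 || iteration == 25) currStep
  else currStep union stepOver(currStep, iteration + 1)
}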
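To make the semantics of `stepOver` easier to check without a Spark cluster, here is a minimal local sketch of the same step logic using plain Scala collections. It is a hypothetical analogue, not part of the gist: `edges` stands in for `index`, `stepOverLocal`, `nextsBySource` and `maxSteps` are illustrative names, and a `groupBy` lookup replaces `cogroup`.

```scala
// Hypothetical local analogue of stepOver: Scala collections instead of RDDs.
// `edges` plays the role of `index`: (fromVertex, toVertex) pairs.
object StepOverLocal {
  // Group edge targets by source vertex (the "index" side of the cogroup).
  def nextsBySource(edges: Seq[(String, String)]): Map[String, Seq[String]] =
    edges.groupBy(_._1).map { case (src, es) => src -> es.map(_._2) }

  // prevStep holds (initialVertex, currentVertex) pairs. Each step extends every
  // path by one edge; recursion stops when nothing can be extended or when
  // iteration reaches maxSteps (mirroring the cap of 25 in stepOver).
  def stepOverLocal(
      edges: Seq[(String, String)],
      prevStep: Seq[(String, String)],
      iteration: Int = 1,
      maxSteps: Int = 25): Seq[(String, String)] = {
    val nexts = nextsBySource(edges)
    val currStep = for {
      (initial, current) <- prevStep
      next <- nexts.getOrElse(current, Seq.empty)
    } yield (initial, next)
    if (currStep.isEmpty || iteration == maxSteps) currStep
    else currStep ++ stepOverLocal(edges, currStep, iteration + 1, maxSteps)
  }

  def main(args: Array[String]): Unit = {
    val edges = Seq("a" -> "b", "b" -> "c", "c" -> "d")
    // Seed with the edges themselves, i.e. every one-edge path.
    val reachable = stepOverLocal(edges, edges)
    println(reachable.distinct.sorted)
  }
}
```

As with `stepOver`, the result is the union of the newly produced steps only, so the seed pairs themselves are not included unless a longer path reproduces them. On a cyclic graph the step count never drops to zero, which is why the iteration cap is what actually ends the recursion.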