Created
December 1, 2012 22:17
-
-
Save johnynek/4185596 to your computer and use it in GitHub Desktop.
Sketch of scala graph algorithm API
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Run with: scala Graph.scala
// Prints the demo graph, its reverse, and the result of each example algorithm.
// Takeaway:
// A simple API can easily express PageRank, connected components, and cosine/Jaccard similarity.
// See the examples below.
import scala.annotation.tailrec | |
import scala.collection.parallel.immutable.ParIterable | |
/**
 * Read-only view of a directed graph stored as an adjacency map.
 *
 * Each node maps to a pair `(out-neighbours, in-neighbours)`. Construction
 * operations (`+`, `reverse`) are left to the concrete subtype `G` so they
 * can return that subtype.
 */
abstract class Graph[G<:Graph[_,N],N](m: Map[N,(Set[N],Set[N])]) {
  // Abstract methods:
  /** Returns a graph that also contains node `n`. */
  def +(n: N): G
  /** Returns a graph that also contains the directed edge `fromTo._1 -> fromTo._2`. */
  def +(fromTo: (N,N)): G
  /** Returns the graph with every edge direction flipped. */
  def reverse: G

  /** Number of nodes (keys of the adjacency map). */
  def size: Int = m.size

  /** All nodes, in the adjacency map's iteration order. */
  def nodes: Iterator[N] = m.keysIterator

  /** All nodes as a parallel collection. */
  def nodesPar: ParIterable[N] = m.par.map { case (node, _) => node }

  def contains(n: N): Boolean = m.contains(n)

  /** Targets of the edges leaving `n`; empty when `n` is not in the graph. */
  def outEdges(n: N): Set[N] = m.get(n) match {
    case Some((outs, _)) => outs
    case None => Set.empty[N]
  }

  def outDegree(n: N): Int = outEdges(n).size

  /** Sources of the edges entering `n`; empty when `n` is not in the graph. */
  def inEdges(n: N): Set[N] = m.get(n) match {
    case Some((_, ins)) => ins
    case None => Set.empty[N]
  }

  def inDegree(n: N): Int = inEdges(n).size

  /** One `src -> dst` line per edge, joined with newlines. */
  override def toString =
    m.iterator.flatMap { case (src, (outs, _)) =>
      outs.iterator.map { dst => src.toString + " -> " + dst.toString }
    }.mkString("\n")
}
/**
 * Immutable directed graph that additionally carries one state value of type
 * `S` per node.
 *
 * @param m adjacency map: node -> (out-neighbours, in-neighbours)
 * @param s per-node state; a node may be absent until `setState` assigns it
 */
class GraphWithState[N,S] private (m: Map[N,(Set[N],Set[N])], s: Map[N,S])
    extends Graph[GraphWithState[N,S], N](m) {

  /** Adds an isolated node; returns `this` when the node already exists. */
  override def +(n: N): GraphWithState[N,S] =
    if (m.contains(n)) this
    else new GraphWithState[N,S](m.updated(n, (Set.empty[N], Set.empty[N])), s)

  /**
   * Adds the directed edge `src -> dst`, creating either endpoint if needed.
   * State is left untouched, so new endpoints have no state yet.
   */
  override def +(fromTo: (N,N)): GraphWithState[N,S] = {
    val (src, dst) = fromTo
    val withOut: Map[N,(Set[N],Set[N])] =
      m.updated(src, (outEdges(src) + dst, inEdges(src)))
    // Read dst's adjacency from the already-updated map: for a self-loop
    // (src == dst) reading the old adjacency would overwrite the out-edge
    // that was just recorded.
    val (dstOuts, dstIns) = withOut.getOrElse(dst, (Set.empty[N], Set.empty[N]))
    new GraphWithState[N,S](withOut.updated(dst, (dstOuts, dstIns + src)), s)
  }

  /** Swaps every node's out- and in-neighbour sets; state is kept as is. */
  override def reverse: GraphWithState[N,S] =
    new GraphWithState[N,S](m.map { case (node, (outs, ins)) => (node, (ins, outs)) }, s)

  /** State of `n`. Throws `NoSuchElementException` when `n` has no state. */
  def state(n: N): S = s(n)

  /** Returns a graph in which node `ns._1` has state `ns._2`. */
  def setState(ns: (N,S)): GraphWithState[N,S] =
    new GraphWithState[N,S](m, s + ns)

  /** Replaces all state: every node `n` gets state `st(n)`. */
  def setState[S2](st: (N) => S2): GraphWithState[N,S2] =
    new GraphWithState[N,S2](m, nodes.map { n => (n, st(n)) }.toMap)

  /**
   * For each node: one `src -> dst` line per out-edge followed by a
   * `(node)=state` line. A node without state is shown as `(node)=<unset>`
   * rather than making `toString` throw.
   */
  override def toString =
    m.flatMap { case (node, (outs, _)) =>
      val stateLine = s.get(node) match {
        case Some(st) => "(" + node + ")=" + st
        case None => "(" + node + ")=<unset>"
      }
      outs.map { n2 => node.toString + " -> " + n2.toString }.iterator ++ Iterator(stateLine)
    }.mkString("\n")
}
/** Factories and synchronous, state-machine style graph algorithms. */
object GraphWithState {
  /**
   * Implicitly builds a graph from an edge list. No node state is assigned;
   * call `setState` before reading state.
   *
   * The state type is a free type parameter. It was previously named `Unit`,
   * which shadowed `scala.Unit` and made the signature misleading.
   */
  implicit def fromEdgeList[N,S](m: Iterable[(N,N)]): GraphWithState[N,S] =
    m.foldLeft(empty[N,S]) { (g, pair) => g + pair }

  /** The graph with no nodes, no edges and no state. */
  def empty[N,S]: GraphWithState[N,S] =
    new GraphWithState[N,S](Map.empty[N,(Set[N],Set[N])], Map.empty[N,S])

  /**
   * Runs one synchronous step: `fn` is evaluated for every node against the
   * states of the *input* graph, then all proposed states are applied.
   *
   * @param fn (node, its state, in-neighbours with state, out-neighbours with
   *           state) => `Some(newState)` to update, `None` to keep the state
   * @return (whether any node proposed a new state, the updated graph)
   */
  def algoStep[N,S](graph: GraphWithState[N,S])
      (fn: (N, S, Iterator[(N,S)], Iterator[(N,S)]) => Option[S]):
      (Boolean, GraphWithState[N,S]) = {
    /* This is a clearly a map-reduce calculation, so we could easily
     * write a general scalding job that represents this same approach
     */
    def withState(ns: Set[N]): Iterator[(N,S)] =
      ns.iterator.map { n => (n, graph.state(n)) }

    val proposals = graph.nodesPar.map { node =>
      (node, fn(node, graph.state(node),
        withState(graph.inEdges(node)), withState(graph.outEdges(node))))
    }
    proposals.foldLeft((false, graph)) { (result, nodeOpt) =>
      val (node, newStateO) = nodeOpt
      newStateO match {
        case Some(news) => (true, result._2.setState(node -> news))
        case None => result
      }
    }
  }

  /** Repeats `algoStep` until a step proposes no state change. */
  @tailrec
  def algo[N,S](graph: GraphWithState[N,S])
      (fn: (N, S, Iterator[(N,S)], Iterator[(N,S)]) => Option[S]): GraphWithState[N,S] = {
    val (changed, newG) = algoStep(graph)(fn)
    if (changed) algo(newG)(fn) else newG
  }

  //////////////////////////////////
  // Here are some actual implementation of algorithms:
  //////////////////////////////////

  /**
   * Iterative PageRank. Node state is `(mass, outDegree)`; every node starts
   * with mass 1.0.
   *
   * @param jumpProb  random-jump probability (default 0.1, the former constant)
   * @param tolerance a node stops updating once its mass changes by less than
   *                  this amount (default 0.0001, the former constant)
   */
  def pageRank[N](graph: GraphWithState[N,_],
      jumpProb: Double = 0.1,
      tolerance: Double = 0.0001): GraphWithState[N,(Double,Int)] = {
    algo(graph.setState { n => (1.0, graph.outDegree(n)) }) {
      (node, prevMass, in, _) =>
        // An in-neighbour has at least one out-edge (the one to this node),
        // so its degree is never zero here.
        val inflow = in.map { case (_, (mass, degree)) => mass / degree }.sum
        val newMass = inflow * (1.0 - jumpProb) + jumpProb
        if (scala.math.abs(prevMass._1 - newMass) < tolerance) None
        else Some((newMass, prevMass._2))
    }
  }

  /**
   * Weakly connected components: each node ends up labelled with the smallest
   * node (by `Ordering[N]`) reachable when edge direction is ignored.
   */
  def components[N:Ordering](graph: GraphWithState[N,_]): GraphWithState[N,N] = {
    val ord = implicitly[Ordering[N]]
    algo(graph.setState { n => n }) {
      (node, minNode, ins, outs) =>
        val candidate = (ins ++ outs).foldLeft(minNode) { (oldMin, ns) => ord.min(oldMin, ns._2) }
        if (candidate != minNode) Some(candidate) else None
    }
  }

  /** Drives every node's `CosineStateMachine`, starting from `InitState`, until no node changes. */
  def cosineOut(graph: GraphWithState[Int,_]): GraphWithState[Int,CosineStateMachine] = {
    algo[Int,CosineStateMachine](graph.setState { _ => InitState }) { (node, ns, ins, outs) =>
      ns.next(node, ins, outs)
    }
  }
}
// cosine(i,j) = <i,j>/sqrt(<i,i><j,j>)
/**
 * Per-node state of the multi-step cosine-similarity computation.
 * Each step maps the node's current state to its next one.
 */
sealed abstract class CosineStateMachine {
  /**
   * Computes this node's next state.
   *
   * @param id  the node this state belongs to
   * @param in  in-neighbours paired with their current states
   * @param out out-neighbours paired with their current states
   * @return `Some(next)` to advance, `None` when there is nothing to change
   */
  def next(
      id: Int,
      in: Iterator[(Int,CosineStateMachine)],
      out: Iterator[(Int,CosineStateMachine)]): Option[CosineStateMachine]
}
/** Starting state: the node knows nothing yet. */
object InitState extends CosineStateMachine {
  /** Counts the out-neighbours and moves to `InitialState` holding that count. */
  override def next(id: Int, in: Iterator[(Int,CosineStateMachine)],
      out: Iterator[(Int,CosineStateMachine)]) = {
    val outCount = out.foldLeft(0) { (seen, _) => seen + 1 }
    Some(InitialState(outCount))
  }
}
/** State after the first step: the node knows its own out-degree. */
case class InitialState(outdegree: Int) extends CosineStateMachine {
  // Collect every in-neighbour together with its (id, out-degree) state.
  override def next(id: Int, in: Iterator[(Int,CosineStateMachine)],
      out: Iterator[(Int,CosineStateMachine)]) = {
    // All in-neighbours must themselves be in InitialState at this point.
    val inNeighbours = in.map {
      case (nid, ready: InitialState) => (nid, ready)
      case (_, _) => sys.error("Invalid state")
    }.toList
    Some(Step1State(this, inNeighbours))
  }
}
/*
 * This is the heavy lift step
 */
/**
 * @param initState this node's own `InitialState` (its out-degree)
 * @param insInit   this node's in-neighbours with their `InitialState`s
 */
case class Step1State(initState: InitialState, insInit: Iterable[(Int, InitialState)])
    extends CosineStateMachine {
  // Each out-neighbour's state now holds the list of nodes that point to it.
  /**
   * Builds, for every node sharing at least one out-neighbour with this one,
   * the pair (that node's out-degree, number of shared out-neighbours).
   * Fails with `sys.error` if an out-neighbour is not in `Step1State`.
   */
  override def next(id: Int, in: Iterator[(Int,CosineStateMachine)],
      out: Iterator[(Int,CosineStateMachine)]) = {
    // Every out-neighbour contributes one overlap with each of its in-neighbours.
    val overlaps = out.flatMap { idCsm => idCsm._2 match {
        case Step1State(_, neighbourIns) =>
          neighbourIns.map { case (nid, InitialState(outdeg)) => (nid, outdeg) }
        case _ => sys.error("Invalid state after step1")
      }
    }.toList

    // Use a strict `map` instead of `mapValues`: `mapValues` returns a lazy view
    // (re-evaluated on every lookup in Scala <= 2.12, and a `MapView` that is
    // not a `Map` in 2.13), which is unsuitable for state that is kept around.
    val result: Map[Int,(Int,Int)] = overlaps.groupBy { _._1 }.map { case (nid, iddegs) =>
      // Keep the degree (it is the same in every entry of a group) and add up
      // the number of entries, i.e. the number of shared out-neighbours.
      // `groupBy` never produces an empty group, so `head` is safe.
      (nid, (iddegs.head._2, iddegs.size))
    }
    Some(Step2State(initState, result))
  }
}
/**
 * Final state.
 *
 * @param initState this node's own out-degree
 * @param m         other node id -> (that node's out-degree, overlap count)
 */
case class Step2State(initState: InitialState, m: Map[Int, (Int, Int)]) extends
    CosineStateMachine {
  // Last step: nothing further to compute.
  override def next(id: Int, in: Iterator[(Int,CosineStateMachine)],
      out: Iterator[(Int,CosineStateMachine)]) = None

  /** Cosine similarity with `otherId`; 0.0 when there is no recorded overlap. */
  def cosine(otherId: Int): Double = m.get(otherId) match {
    case Some((degree, overlap)) =>
      val norm = scala.math.sqrt(degree) * scala.math.sqrt(initState.outdegree)
      overlap.toDouble / norm
    case None => 0.0
  }
}
import GraphWithState._

// Demo driver: builds a small graph and prints the result of each algorithm.
val g = (empty[Int,Double] + (1 -> 2) + (2 -> 3) + (3 -> 1) + (2 -> 1)).setState(_ => 0.0)
println("===========")
println(g)
println("===========")
println(g.reverse)
println("===========")
println(pageRank(g))
println("===========")
// The edge lists below are converted to graphs by the implicit `fromEdgeList`.
println(components(List(1 -> 2, 3 -> 2, 4 -> 0)))
println("===========")
val cs = cosineOut(List(1 -> 2, 3 -> 2, 1 -> 4))
println(cs)
// Pattern match instead of an unchecked `asInstanceOf` downcast, so an
// unexpected final state fails with a descriptive message.
cs.state(1) match {
  case finished: Step2State => println(finished.cosine(3))
  case other => sys.error("Expected node 1 to finish in Step2State but found: " + other)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment