Skip to content

Instantly share code, notes, and snippets.

@felipehummel
Created April 10, 2013 14:48
Show Gist options
  • Save felipehummel/5355285 to your computer and use it in GitHub Desktop.
Save felipehummel/5355285 to your computer and use it in GitHub Desktop.
Steps Graph
package com.busk.stepsgraph
import akka.dispatch.{ ExecutionContext, Promise, Future, Await }
import java.util.concurrent.{ Executors, ExecutorService}
import akka.util.duration._
import collection.mutable.HashMap
/**
 * Factory helpers for assembling [[Step]]s into a dependency graph.
 *
 * Every `step` overload takes a name, between one and six dependency steps,
 * and a function whose parameters line up positionally with the results of
 * those dependencies. With more than one dependency the function is tupled so
 * that it can be stored as the single-argument `f` of the resulting [[Step]].
 */
object StepsGraph {

  /** Builds a step fed by the result of one dependency. */
  def step[R, U](stepname: String, depStep: Step[_, U])(f: U => R): Step[U, R] =
    Step[U, R](stepname, Seq(depStep), f)

  /** Builds a step fed by the results of two dependencies, in argument order. */
  def step[R, U, T](stepname: String, depStep: Step[_, U], depStep2: Step[_, T])
                   (f: (U, T) => R): Step[(U, T), R] = {
    val dependencies: Seq[Step[_, _]] = Seq(depStep, depStep2)
    Step[(U, T), R](stepname, dependencies, f.tupled)
  }

  /** Builds a step fed by the results of three dependencies, in argument order. */
  def step[R, U, T, V](stepname: String, depStep: Step[_, U], depStep2: Step[_, T], depStep3: Step[_, V])
                      (f: (U, T, V) => R): Step[(U, T, V), R] = {
    val dependencies: Seq[Step[_, _]] = Seq(depStep, depStep2, depStep3)
    Step[(U, T, V), R](stepname, dependencies, f.tupled)
  }

  /** Builds a step fed by the results of four dependencies, in argument order. */
  def step[R, U, T, V, X](stepname: String, depStep: Step[_, U], depStep2: Step[_, T],
                          depStep3: Step[_, V], depStep4: Step[_, X])
                         (f: (U, T, V, X) => R): Step[(U, T, V, X), R] = {
    val dependencies: Seq[Step[_, _]] = Seq(depStep, depStep2, depStep3, depStep4)
    Step[(U, T, V, X), R](stepname, dependencies, f.tupled)
  }

  /** Builds a step fed by the results of five dependencies, in argument order. */
  def step[R, U, T, V, X, Y](stepname: String, depStep: Step[_, U], depStep2: Step[_, T],
                             depStep3: Step[_, V], depStep4: Step[_, X], depStep5: Step[_, Y])
                            (f: (U, T, V, X, Y) => R): Step[(U, T, V, X, Y), R] = {
    val dependencies: Seq[Step[_, _]] = Seq(depStep, depStep2, depStep3, depStep4, depStep5)
    Step[(U, T, V, X, Y), R](stepname, dependencies, f.tupled)
  }

  /** Builds a step fed by the results of six dependencies, in argument order. */
  def step[R, U, T, V, X, Y, Z](stepname: String, depStep: Step[_, U], depStep2: Step[_, T],
                                depStep3: Step[_, V], depStep4: Step[_, X], depStep5: Step[_, Y],
                                depStep6: Step[_, Z])
                               (f: (U, T, V, X, Y, Z) => R): Step[(U, T, V, X, Y, Z), R] = {
    val dependencies: Seq[Step[_, _]] =
      Seq(depStep, depStep2, depStep3, depStep4, depStep5, depStep6)
    Step[(U, T, V, X, Y, Z), R](stepname, dependencies, f.tupled)
  }

  /** Creates the parameter step that stands for the graph's input of type `T`. */
  def graphParam[T]: GraphParameter[T] = new GraphParameter[T]

  /**
   * Packs dependency results into the argument shape a step function expects:
   * a single element is returned as-is, two to seven elements become a tuple
   * of matching arity.
   *
   * @throws IllegalArgumentException if `seq` is empty or has more than seven elements
   */
  private[stepsgraph] def seqToTuple(seq: Seq[Any]): Any = seq match {
    case Seq(only) => only
    case Seq(x1, x2) => (x1, x2)
    case Seq(x1, x2, x3) => (x1, x2, x3)
    case Seq(x1, x2, x3, x4) => (x1, x2, x3, x4)
    case Seq(x1, x2, x3, x4, x5) => (x1, x2, x3, x4, x5)
    case Seq(x1, x2, x3, x4, x5, x6) => (x1, x2, x3, x4, x5, x6)
    case Seq(x1, x2, x3, x4, x5, x6, x7) => (x1, x2, x3, x4, x5, x6, x7)
    case _ =>
      val message = "Empty or too long Seq as parameter for seqToTuple: " + seq
      throw new IllegalArgumentException(message)
  }
}
/**
 * A named unit of computation inside a steps graph.
 *
 * Identity is deliberately narrower than the generated case-class identity:
 * two steps are equal when they share a name and an equal dependency list;
 * the function `f` is not compared. The hash code is derived from the name
 * alone, which stays consistent with that notion of equality.
 *
 * @param stepName  name used for display and for identity
 * @param dependsOn steps whose results are fed into `f`
 * @param f         the computation, from the (possibly tupled) dependency results to this step's result
 */
case class Step[I, R](stepName: String, dependsOn: Seq[Step[_,_]], f: I => R) {

  /** Runs this step's function on `arg`. */
  def apply(arg: I): R = f(arg)

  override def toString: String = "Step(" + stepName + ")"

  override def hashCode: Int = stepName.hashCode

  override def equals(other: Any): Boolean = other match {
    // The function component is intentionally ignored.
    case Step(otherName, otherDeps, _) => otherName == stepName && otherDeps == dependsOn
    case _ => false
  }
}
/**
 * The step that stands for a graph's input value: it has no dependencies and
 * returns its argument unchanged.
 *
 * Every instance carries the name "PARAMETER" and an empty dependency list,
 * so under [[Step]]'s name-and-dependencies equality any two parameters
 * compare equal.
 */
class GraphParameter[I]
  extends Step[I, I]("PARAMETER", Seq.empty, (input: I) => input)
/**
 * Description of a graph: the parameter step that represents its input of
 * type `I`, plus the steps that belong to the graph.
 *
 * @param param the parameter step standing for the graph input
 * @param steps the steps of the graph, in the order they were listed
 */
class StepsGraph[I](val param: GraphParameter[I], val steps: Seq[Step[_,_]])
/** An executable form of a steps graph that takes an input of type `I`. */
trait CompiledStepsGraph[I] {
/** Runs the graph for `arg` and returns a handle for looking up step results. */
def apply(arg: I): StepsResult[I]
}
/**
 * Results of running a graph for one input of type `I`.
 */
trait StepsResult[I] {
  /** The input value the graph was run with. */
  def param: I

  /**
   * Looks up the result produced by `step`.
   *
   * The type parameters are named `A` and `O` so that they do not shadow the
   * trait's own `I`: a step's input type is unrelated to the graph input type.
   *
   * @tparam A input type of the step
   * @tparam O result type of the step
   */
  def apply[A, O](step: Step[A, O]): O
}
// Background reading on topological ordering of dependency graphs:
// http://architects.dzone.com/articles/algorithm-week-topological
// http://stackoverflow.com/questions/11291926/topological-sorting-of-lists-related-to-a-graph-in-scala
/**
 * Eager, single-threaded executor: running the graph computes every listed
 * step, in listing order, before any result is handed back.
 */
class SequentialSteps[I](graph: StepsGraph[I]) extends CompiledStepsGraph[I] {

  /** The steps of the underlying graph. */
  def steps: Seq[Step[_,_]] = graph.steps

  /**
   * Executes all steps for `arg` and returns a lookup over the computed values.
   * Looking up a step that was never computed fails with the map's
   * missing-key exception.
   */
  def apply(arg: I): StepsResult[I] = {
    val computed = collection.mutable.HashMap[Step[_,_], Any]()
    // Seed the table so steps depending on the parameter find the input value.
    computed(graph.param) = arg
    for (current <- steps) executeStep(arg, current, computed)

    new StepsResult[I] {
      def param = arg
      def apply[R, S](step: Step[R, S]): S = computed(step).asInstanceOf[S]
    }
  }

  /**
   * Returns the result of `step`, computing it (and, recursively, any of its
   * dependencies not yet in `results`) on first request and recording every
   * newly computed value in `results`.
   *
   * @param arg     the graph input; only forwarded to recursive calls
   * @param step    the step whose result is wanted
   * @param results table of already computed results, updated in place
   */
  protected def executeStep[T, R](arg: I, step: Step[T, R], results: HashMap[Step[_,_], Any]): R =
    if (results.contains(step)) {
      results(step).asInstanceOf[R]
    } else {
      val missing = step.dependsOn.filterNot(results.contains)
      missing.foreach(dep => executeStep(arg, dep, results))
      // Gather dependency results in declaration order and pack them into the
      // single value or tuple that the step function takes.
      val inputs: Seq[Any] = step.dependsOn.map(results)
      val output = step(StepsGraph.seqToTuple(inputs).asInstanceOf[T])
      results(step) = output
      output
    }
}
/**
 * On-demand executor: nothing runs when the graph is applied to an input;
 * a step (and whatever it depends on) is computed the first time its result
 * is requested.
 *
 * Computed values are kept for the lifetime of the returned [[StepsResult]],
 * so asking for several steps that share dependencies runs each step at most
 * once per input, as [[SequentialSteps]] does.
 */
class LazySteps[I](graph: StepsGraph[I]) extends SequentialSteps[I](graph) {
  override def apply(arg: I): StepsResult[I] = {
    new StepsResult[I] {
      // One table per result object, shared by all lookups, so work done for
      // one query is reused by later ones instead of being recomputed.
      private val cache = collection.mutable.HashMap[Step[_,_], Any]()
      cache += (graph.param -> arg)

      def param = arg

      // Lookups now share mutable state, so serialize them; the previous
      // per-call table made concurrent lookups independent of each other.
      def apply[R, S](step: Step[R, S]): S = cache.synchronized {
        executeStep(arg, step, cache)
      }
    }
  }
}
/**
 * Small term-frequency / inverse-document-frequency helpers used by the demo
 * graph in `Steps`.
 */
object TFIDF {

  /**
   * Counts how many times each distinct term occurs.
   *
   * @return one `(term, count)` pair per distinct term, in no particular order
   */
  def countTerms(terms: Array[String]): Seq[(String, Int)] =
    // Explicit `.toSeq`: the postfix-operator form needs the
    // `scala.language.postfixOps` feature on newer compilers.
    terms.groupBy(identity).map { case (term, occurs) => (term, occurs.size) }.toSeq

  /** Same counts as [[countTerms]], as a map from term to count. */
  def getCounts(terms: Array[String]): Map[String, Int] = countTerms(terms).toMap

  /**
   * Placeholder IDF table: `terms` is not consulted and every term maps to
   * the default weight `1F`.
   */
  def getIdfs(terms: Array[String]): Map[String, Float] =
    Map.empty[String, Float].withDefaultValue(1F)

  /**
   * Multiplies each term's count by its IDF weight.
   *
   * `idfMap` is applied to every key of `countMap`, so it must be defined for
   * all of them (for instance through a default value).
   */
  def tfidf(idfMap: Map[String, Float], countMap: Map[String, Int]): Map[String, Float] =
    countMap.map { case (term, count) => (term, count * idfMap(term)) }
}
/**
 * Demo entry point: wires a small TF-IDF pipeline as a steps graph, runs it
 * sequentially on a sample sentence and prints the per-term counts.
 */
object Steps {
  def main(args: Array[String]): Unit = {
    import StepsGraph._
    import TFIDF._

    // Graph input, then one step per stage of the pipeline.
    val param = graphParam[String]
    val splitted = step("split", param)(_.split(" "))
    val countMap = step("countMap", splitted)(getCounts(_))
    val idfMap = step("idfMap", splitted)(getIdfs(_))
    val tfidfMap = step("tfidfs", idfMap, countMap)(tfidf(_, _))

    val allSteps: Seq[Step[_, _]] = Seq(splitted, countMap, idfMap, tfidfMap)
    val compiled = new SequentialSteps(new StepsGraph(param, allSteps))
    val outcome = compiled("hi hello how are you how bla bla ok")
    println(outcome(countMap))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment