Skip to content

Instantly share code, notes, and snippets.

@johnynek
Created November 1, 2012 20:35
Show Gist options
  • Save johnynek/3996317 to your computer and use it in GitHub Desktop.
Save johnynek/3996317 to your computer and use it in GitHub Desktop.
Second followers in scalding
package com.twitter.ads.batch.experimental.oscar

import com.twitter.scalding._
import com.twitter.pluck.job._
import com.twitter.pluck.source._
import com.twitter.pluck.source.matrix._
import com.twitter.pluck.mathematics._
import com.twitter.scalding.mathematics.Monoid
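/*
* Use HyperLogLog to propagate user sets two steps up the follow graph, so that
* each node ends up with an approximate count of the users its tweets can reach
* in two follow steps (retweet distance 2, i.e. the followers of its followers).
* Propagation has no identity term, so this counts users on paths of exactly two
* steps; direct followers are included only if they also follow another follower.
*/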
/**
 * Estimates, per user, how many users are reached through `steps` hops of the
 * follow graph, carrying a fixed-size HyperLogLog sketch at every node instead
 * of an exact set of user ids.
 *
 * Job arguments:
 *  - `--output`: TSV path receiving (uid, estimated reach) for the top users (required).
 *  - `--steps`: number of propagation hops (optional, default 2).
 *  - `--top`: number of users with the largest estimate to keep (optional, default 500).
 *
 * NOTE(review): `propagate` has no identity term, so after `steps` hops a node
 * holds the union over follow paths of exactly `steps` edges; users reachable
 * in fewer hops count only if they also lie on such a path.
 */
class RetweetReach(args : Args) extends TwitterDateJob(args) {
  // 2^10 registers: HLL standard error is about 1.04 / sqrt(2^10), i.e. roughly 3%.
  // The original author estimated about 4K bytes per record at this size.
  implicit val hll : HyperLogLogMonoid = new HyperLogLogMonoid(10)
  // Presumably provides the Long => bytes view that `hll(uid)` below needs.
  import HyperLogLog.long2Bytes

  /*
   * Maps a row vector v to v' with v'_j = \sum_{i : G_ij == true} v_i, where
   * G_ij is true when i follows j. Under the HLL monoid the "sum" is a union.
   */
  val propagator = propagate[Long, HLLInstance](CurrentFollowerByFollowed().binarizeAs[Boolean]) _

  // Seed one sketch per uid, keeping only users with at least one active
  // follower; returning None makes mapR drop the entry.
  val activeFollowers = mapR(ActiveFollowerCount().transpose) { (uid, cnt) =>
    if (cnt > 0) Some((uid, hll(uid))) else None
  }

  // Number of propagation hops; the default of 2 preserves the original job.
  val steps : Int = args.getOrElse("steps", "2").toInt
  require(steps >= 0, "steps must be non-negative, got " + steps)

  // Function.chain applies the functions left to right (same order as
  // `andThen`) and is the identity for an empty sequence, so steps == 0 does
  // not fail the way `reduce` on an empty Vector would.
  val prop = Function.chain(Vector.fill(steps)(propagator))
  val propagated = prop(activeFollowers)

  // How many of the largest estimates to write; the default of 500 preserves the original job.
  val topK : Int = args.getOrElse("top", "500").toInt
  require(topK > 0, "top must be positive, got " + topK)

  // Estimate the reach of each uid from its sketch, keep the topK largest, and write them.
  mapR(propagated) { (uid, inst) => Some((uid, hll.estimateSize(inst))) }
    .topElems(topK)
    .write(Tsv(args("output")))

  /**
   * General matrix/vector propagation: returns v' with
   * v'_j = sum of vec_i over all i where indicator(i, j) is true, using the
   * Monoid on T as the sum. This is a matrix product whose left operand is
   * 0/1, so a Monoid (rather than a full Ring) on T is enough.
   *
   * TODO: this belongs in Matrix.scala.
   */
  def propagate[Idx, T : Monoid](indicator : Matrix[Idx, Idx, Boolean])(vec : RowVector[Idx, T]) :
    RowVector[Idx, T] = {
    val tzero = implicitly[Monoid[T]].zero
    // Turn the vector into a column and line entry i up with row i of the matrix.
    indicator.zip(vec.transpose)
      // Keep vec_i where the indicator is set; otherwise contribute the monoid zero.
      .mapValues { case (isEdge, value) => if (isEdge) value else tzero }
      // Collapse the rows into a row vector indexed by j.
      .sumRowVectors
  }

  /**
   * Applies `fn` to every (index, value) entry of a row vector. Entries for
   * which `fn` returns None are dropped.
   *
   * The result reuses the input's field names and size hint (`sizeH`); the
   * hint may overestimate because `fn` can drop entries.
   *
   * TODO: this should live in Matrix.scala; wait for the merge with open source.
   */
  def mapR[Col, V, C, W](col : RowVector[Col, V])(fn : (Col, V) => Option[(C, W)]) : RowVector[C, W] = {
    // Same input and output fields, so the mapped tuple replaces the original.
    val newPipe = col.pipe.flatMap(col.fields -> col.fields) { entry : (Col, V) =>
      fn(entry._1, entry._2)
    }
    // A RowVector's fields are (column index, value); keep both names.
    val fieldNames = Dsl.asList(col.fields)
    val colS = Symbol(fieldNames(0).toString)
    val valS = Symbol(fieldNames(1).toString)
    new RowVector[C, W](colS, valS, newPipe, col.sizeH)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment