// Second followers in Scalding
import com.twitter.scalding._
import com.twitter.pluck.job._
import com.twitter.pluck.source._
import com.twitter.pluck.source.matrix._
import com.twitter.pluck.mathematics._
import com.twitter.scalding.mathematics.Monoid
/**
 * Use HyperLogLog to propagate two steps up the graph, keeping at each node
 * the number of people that can be reached in 2 or fewer steps (retweet distance).
 */
class RetweetReach(args : Args) extends TwitterDateJob(args) {
// Monoid used below to create (`hll(uid)`) and to size (`hll.estimateSize`) the per-user
// HLL instances. It is implicit, presumably so that the `T:Monoid` context bound of
// `propagate` resolves to it for T = HLLInstance -- TODO confirm.
implicit val hll = new HyperLogLogMonoid(10) //gives about 3% error, 4K bytes at each record
// Presumably the implicit conversion that lets `hll(uid)` accept a Long uid -- TODO confirm.
import HyperLogLog.long2Bytes
/**
 * This function takes a row vector v_i
 * and returns v_j = \sum_{i : G_{ij} = true} v_i,
 * where G_{ij} = true when i follows j.
 */
// One propagation step: `propagate` with its indicator matrix fixed to the binarized
// CurrentFollowerByFollowed matrix. The trailing `_` leaves the second parameter list
// open, so this is a function from row vector to row vector.
val propagator = propagate[Long,HLLInstance](CurrentFollowerByFollowed().binarizeAs[Boolean]) _
// Create the HLL instance for each uid; only uids that have active followers are kept.
val activeFollowers = mapR(ActiveFollowerCount().transpose) { (uid,cnt) =>
// Only consider users with at least one active follower
if(cnt > 0)
Some((uid, hll(uid)))
// NOTE(review): no `else` branch and no closing brace for this lambda are visible here.
// `mapR` expects the function to return an Option, so an `else None` followed by `}`
// is presumably missing -- confirm against the complete file.
// Number of hops to push the per-user sketches through the graph.
val steps = 2
// Chain `steps` copies of the one-hop propagator into a single function, applied left to right.
val prop = Vector.fill(steps)(propagator).reduceLeft { (soFar, nextHop) => soFar andThen nextHop }
// Run the composed propagation on the initial per-user vector.
val propagated = prop(activeFollowers)
// Now actually estimate the size for each uid: every HLL instance in the propagated
// vector is replaced by `hll.estimateSize` of that instance (always wrapped in Some,
// so no entry is filtered out by this step).
mapR(propagated) { (uid, inst) => Some((uid, hll.estimateSize(inst))) }
// Take the 500 biggest:
// NOTE(review): the vector built above is not bound to a name or written anywhere in the
// visible code, and no code follows the "Take the 500 biggest" comment -- confirm against
// the complete job.
// This is a general matrix propagation, should be in Matrix.scala
/**
 * Propagates the values of a row vector along a boolean indicator matrix.
 *
 * @param indicator square boolean matrix indexed by Idx on both axes
 * @param vec       row vector of T values; T must have a Monoid in implicit scope
 * @return          a row vector with the same index and value types as `vec`
 */
def propagate[Idx,T:Monoid](indicator : Matrix[Idx,Idx,Boolean])(vec : RowVector[Idx,T]) :
RowVector[Idx, T] = {
// Zero element of T's Monoid; substituted below wherever the indicator flag is false.
val tzero = implicitly[Monoid[T]].zero
// Make the column a row, and line it up with each row of the matrix:
// This is a matrix product where the value on the left is 0/1
// so we are inducing enough of a ring to do the multiplication
// NOTE(review): the expression that `.mapValues` is called on is not visible here, and
// neither is the closing brace of this method -- confirm against the complete file.
// boolT is a (flag, value) pair: keep the value when the flag is true, otherwise use tzero.
.mapValues { boolT => if(boolT._1) boolT._2 else tzero }
// TODO: this is janky and should clearly be in Matrix.scala, wait for merge with open source
/**
 * Maps the (index, value) entries of a row vector through `fn`, producing a new row vector.
 *
 * @param col the input row vector
 * @param fn  called with each entry's index and value; returns an Option of the new
 *            (index, value) pair. Presumably a None result drops the entry -- TODO confirm
 *            how `flatMap` on the pipe treats an Option.
 * @return    a RowVector[C,W] built on the transformed pipe
 */
def mapR[Col,V,C,W](col : RowVector[Col,V])(fn : (Col,V) => Option[(C,W)]) : RowVector[C,W] = {
// Read the vector's fields and write the results back under the same field names.
val newPipe = col.pipe.flatMap(col.fields -> col.fields) { colV : (Col,V) =>
fn(colV._1, colV._2)
// NOTE(review): the closing brace of the lambda above is not visible here -- confirm.
// Reuse the first and second field names of the input vector as the new row/value symbols.
val rowS = Symbol(Dsl.asList(col.fields)(0).toString)
val valS = Symbol(Dsl.asList(col.fields)(1).toString)
// `col.sizeH` is passed through unchanged (presumably a size hint; note it is not
// adjusted here even if `fn` drops entries -- TODO confirm that is intended).
new RowVector[C,W](rowS, valS, newPipe, col.sizeH)
// NOTE(review): the closing brace of this method is not visible here -- confirm.
