Created
November 1, 2012 20:35
-
-
Save johnynek/3996317 to your computer and use it in GitHub Desktop.
Second followers in scalding
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.twitter.ads.batch.experimental.oscar
import com.twitter.scalding._ | |
import com.twitter.pluck.job._ | |
import com.twitter.pluck.source._ | |
import com.twitter.pluck.source.matrix._ | |
import com.twitter.pluck.mathematics._ | |
import com.twitter.scalding.mathematics.Monoid | |
/*
 * Use HyperLogLog to propagate two steps up the graph, keeping at each node
 * the number of people that can be reached in two or fewer steps (retweet distance).
 */
/**
 * Scalding job that estimates, for each uid, how many users can be reached
 * within `steps` hops of the follow graph (retweet distance), using
 * HyperLogLog sketches so that each node only carries a fixed-size summary.
 *
 * The 500 largest estimates are written as TSV to the path given by the
 * `output` argument.
 */
class RetweetReach(args : Args) extends TwitterDateJob(args) {
  // 10 bits: gives about 3% error, 4K bytes at each record
  implicit val hll : HyperLogLogMonoid = new HyperLogLogMonoid(10)
  // Needed so that hll(uid) can be called with a Long uid below.
  import HyperLogLog.long2Bytes

  /*
   * this function takes a row vector v_i
   * and returns v_j = \sum_{G_{ij} == true} v_i
   * G_ij = true when i follows j
   */
  val propagator = propagate[Long,HLLInstance](CurrentFollowerByFollowed().binarizeAs[Boolean]) _

  // Create the hll instance for each uid, just uses those that have active followers
  val activeFollowers = mapR(ActiveFollowerCount().transpose) { (uid, cnt) =>
    // Only consider the votes of users with at least one active follower
    if (cnt > 0) Some((uid, hll(uid))) else None
  }

  // Step twice
  val steps = 2
  // Function.chain applies the propagator `steps` times in sequence (same order
  // as folding with andThen) and, unlike reduce, degrades to the identity
  // function instead of throwing when steps == 0.
  val prop = Function.chain[RowVector[Long,HLLInstance]](Seq.fill(steps)(propagator))
  val propagated = prop(activeFollowers)

  // Now actually estimate the size for each uid
  mapR(propagated) { (uid, inst) => Some((uid, hll.estimateSize(inst))) }
    // Take the 500 biggest:
    .topElems(500)
    .write(Tsv(args("output")))

  /**
   * General matrix propagation (should eventually live in Matrix.scala).
   *
   * Lines `vec` up against the rows of `indicator`, keeps the vector value
   * wherever the indicator entry is true (using the monoid zero elsewhere),
   * and sums the resulting rows into a single row vector.
   *
   * @param indicator boolean matrix selecting which entries contribute
   * @param vec       row vector of monoid values to propagate
   * @return the row vector obtained by summing the selected values
   */
  def propagate[Idx,T:Monoid](indicator : Matrix[Idx,Idx,Boolean])(vec : RowVector[Idx,T]) :
    RowVector[Idx, T] = {
    val tzero = implicitly[Monoid[T]].zero
    // Make the column a row, and line it up with the each row of the matrix:
    indicator.zip(vec.transpose)
      // This is a matrix product where the value on the left is 0/1
      // so we are inducing enough of a ring to do the multiplication
      .mapValues { boolT => if (boolT._1) boolT._2 else tzero }
      .sumRowVectors
  }

  // TODO: this is janky and should clearly be in Matrix.scala, wait for merge with open source
  /**
   * Maps (and optionally filters) the entries of a row vector.
   *
   * `fn` is applied to each (column, value) pair through the pipe's flatMap;
   * a `Some` result becomes the new entry, and a `None` result yields nothing
   * for that entry. The returned vector reuses the field names and the size
   * hint of `col` (the hint is not adjusted for dropped entries).
   *
   * @param col the row vector to transform
   * @param fn  function from (column, value) to an optional new (column, value)
   * @return a row vector built from the transformed pipe
   */
  def mapR[Col,V,C,W](col : RowVector[Col,V])(fn : (Col,V) => Option[(C,W)]) : RowVector[C,W] = {
    val newPipe = col.pipe.flatMap(col.fields -> col.fields) { colV : (Col,V) =>
      fn(colV._1, colV._2)
    }
    // Resolve the field list once; the first two fields are the column and value names.
    val fieldNames = Dsl.asList(col.fields)
    val rowS = Symbol(fieldNames(0).toString)
    val valS = Symbol(fieldNames(1).toString)
    new RowVector[C,W](rowS, valS, newPipe, col.sizeH)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment