Created April 29, 2012 19:36
-
-
Save alexy/2552884 to your computer and use it in GitHub Desktop.
joinLeft gives different results from joinRight, both produce more output than expected, and `++` causes a NullPointerException (NPE).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Set-difference job over ranker-pairs snapshots.
 *
 * For every ranker, emits the pairs that appear in the new snapshot but not in
 * the old one, and writes them as text to the `--added` location.
 *
 * Trailing arguments: `<newRankerPairsFile> [<oldRankerPairsFile>]`. When the
 * old file is omitted it defaults to `FileDay(newRankerPairsFile).previous`.
 */
object PairsNews {
  /** DList of (ranker id, that ranker's pairs). */
  type RankerPairs = DList[(Long, Iterable[(Long, Long)])]

  def main(a: Array[String]): Unit = withHadoopArgs(a) { case args =>
    case object o extends ScallopConf(args) {
      // NOTE(review): --date and --base are parsed (and shown in o.summary) but
      // are not used anywhere else in this job.
      val date = opt[String]("date", descr = "generate pairs for new regs after that date")
      val allRankerPairsBase = opt[String]("base", descr = "directory where all ranker-pairs are stored, per day",
        default = Some("fatpipe/generated"))
      val globOld = opt[Boolean]("globOld", descr = "consider old as base for dates and read from all of them")
      val addedRankerPairsFile = opt[String]("added", descr = "complement of the old state to get the new state", required = true)
      val newlyRankerPairsFile = opt[String]("newly", descr = "the pairs for the really newly registered rankers")
      val newRankerPairsFile = trailArg[String](required = true)
      // val totalRankerPairsFile = trailArg[String](required = true)
      val oldRankerPairsFileArg = trailArg[String](required = false)
      verify

      /** The explicit old-pairs path if given, otherwise the previous day's file of the new path. */
      def oldRankerPairsFile() =
        oldRankerPairsFileArg.get.getOrElse(FileDay(newRankerPairsFile()).previous)

      // NOTE(review): the "newly" line is printed whenever --newly is set, but
      // rpairsNewly is currently not persisted (its toTextFile is commented out below).
      override def toString =
        "reading old pairs from " + oldRankerPairsFile() +
          "\n new pairs from " + newRankerPairsFile() +
          // "\n writing overall combined pairs to " + totalRankerPairsFile() +
          addedRankerPairsFile.get.map("\n writing added ranker-pairs to " + _).getOrElse("") +
          newlyRankerPairsFile.get.map("\n writing newly registered ranker-pairs to " + _).getOrElse("")
    }

    println("HelloKitty! Set-difference for Ranker-Pairs " + o)
    println(" options set: " + o.summary)

    // With --globOld the old path is a base directory. Every entry under it is
    // read, and the pairs of a ranker found in several entries are unioned into one
    // set, so each ranker appears only once on the old side of the join.
    val oldRankerPairs =
      if (o.globOld()) {
        Pipeline.readRankerPairs(o.oldRankerPairsFile() + "/*")
          .map { case (r, psIt) => (r, psIt.toSet) }
          .groupByKey
          // NOTE(review): the gist title reports that `++` causes an NPE. It is
          // unclear whether that is this Set union or the commented-out DList `++`
          // further down. Confirm before relying on this path.
          .combine((_: Set[Edge]) ++ (_: Set[Edge]))
          .map { case (r, ps) => (r, ps.toIterable) }
      } else Pipeline.readRankerPairs(o.oldRankerPairsFile())

    val newRankerPairs = Pipeline.readRankerPairs(o.newRankerPairsFile())

    // Per ranker, keep only the pairs that are not already in the old snapshot.
    // joinRight yields (r, (Option[old], new)) for every row of newRankerPairs:
    //  - a ranker missing from the old side contributes all of its pairs;
    //  - a ranker with nothing added is dropped.
    // NOTE(review): if a ranker occurs on several records of either input, the join
    // emits one row per combination and each difference is taken against only part
    // of the old pairs. That would inflate the output (cf. "larger than expected" in
    // the gist title). Only the --globOld path makes old keys unique, and
    // newRankerPairs is never grouped. Verify that keys are unique per input.
    val rpairsAdded: RankerPairs = joinRight(oldRankerPairs, newRankerPairs).flatMap {
      case (r, (Some(oldPairs), newPairs)) =>
        val added = newPairs.toSet -- oldPairs
        if (added.isEmpty) None else Some((r, added.toIterable))
      case (r, (_, newPairs)) => Some((r, newPairs))
    }

    // Same computation via joinLeft, kept for comparison. joinLeft rows are shaped
    // (r, (new, Option[old])). The earlier version of this comment matched a 3-tuple
    // `(r, newPairs, Some(oldPairs))`, which cannot match those rows.
    // val rpairsAdded: RankerPairs = joinLeft(newRankerPairs, oldRankerPairs).flatMap {
    //   case (r, (newPairs, Some(oldPairs))) =>
    //     val added = newPairs.toSet -- oldPairs
    //     if (added.isEmpty) None else Some((r, added.toIterable))
    //   case (r, (newPairs, _)) => Some((r, newPairs))
    // }

    // Rankers present in the new snapshot but absent from the old one.
    // NOTE(review): computed but never persisted (see the commented-out output below).
    val rpairsNewly = joinLeft(newRankerPairs, oldRankerPairs) collect { case (r, (curr, None)) => (r, curr) }

    // TODO: we need both new rankers and the old ones!
    // oldRankerPairs,newRankerPairs should give the same result as
    // oldRankerPairs,rpairsSince
    // val rpairsUnion = joinLeft(oldRankerPairs, rpairsSince) map {
    //   case (r, (oldPairs, Some(newPairs))) =>
    //     val union = oldPairs.toSet ++ newPairs
    //     (r, union.toIterable)
    //   case (r, (oldPairs, _)) => (r, oldPairs)
    // }
    //
    // val rpairsTotal = rpairsUnion ++ rpairsNewly

    persist(
      toTextFile(rpairsAdded map Pipeline.rankerPairsStr, o.addedRankerPairsFile())
      // toTextFile(rpairsNewly map Pipeline.rankerPairsStr, o.newlyRankerPairsFile())
      // toTextFile(rpairsTotal map Pipeline.rankerPairsStr, o.totalRankerPairsFile())
    )
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.