SimHash duplicate detection, part 2: a Spark job that finds near-duplicate code snippets by grouping 64-bit SimHash values byte-wise to generate candidate pairs and keeping only those whose Hamming distance is at most 7.
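Each input line is expected to hold a snippet id and its 64-bit SimHash separated by the non-printable 0x01 byte (written below as \u0001). For illustration only, with made-up ids and hash values:

snippet_42\u00011234605616436508552
snippet_99\u00011234605616436508553

These two hashes differ only in their lowest bit, so the pair is well within the Hamming-distance threshold of 7 and the job would emit both snippet_42,snippet_99 and snippet_99,snippet_42 in the output.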
// scalastyle:off println
import duplicate.SimHash
import org.apache.commons.lang3.StringUtils
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import scopt.OptionParser

import scala.collection.mutable.ArrayBuffer
import scala.util.parsing.json._
object JoinDuplicate {

  // two snippets count as duplicates if their 64-bit SimHashes differ in at most this many bits
  val HAMMING_DISTANCE = 7

  def main(args: Array[String]) {
    val defaultParams = Params()

    val parser = new OptionParser[Params]("Duplicates") {
      head("Duplicates: find duplicates in code snippets.")
      opt[String]("input")
        .text("input file path")
        .action((x, c) => c.copy(input = x))
      opt[String]("output")
        .text("output file path")
        .action((x, c) => c.copy(output = x))
    }

    parser.parse(args, defaultParams).map { params =>
      run(params)
    }.getOrElse {
      parser.showUsageAsError
      sys.exit(1)
    }
  }
  // Parse a hash string into a Long; malformed values become 0L and are filtered out later.
  def parseLong(s: String): java.lang.Long = {
    try {
      java.lang.Long.parseLong(s.trim)
    } catch {
      case e: Exception => 0L
    }
  }
  private def run(params: Params) {
    val conf = new SparkConf()
      .setAppName(s"Duplicates with $params")
      .set("spark.driver.maxResultSize", "32g")
    val sc = new SparkContext(conf)

    Logger.getRootLogger.setLevel(Level.WARN)

    val preprocessStart = System.nanoTime()
    val textRDD = sc.textFile(params.input)

    // each line: <id>0x01<64-bit simhash>; drop malformed lines and zero hashes
    val id_hash = textRDD.
      map { text => StringUtils.split(text, 0x01.toChar) }.
      filter { array => array.length == 2 }.
      filter { array => !array.head.trim.isEmpty && !array.last.isEmpty }.
      map { array => (array.head.trim, parseLong(array.last)) }.
      filter { case (id, hash) => hash != 0L }

    // sanity check: print a few parsed records on the driver
    id_hash.top(50).foreach(println(_))

    val linesCount = id_hash.count()
    println()
    println("Corpus summary:")
    println(s"\t lines in corpus: $linesCount")
    println()
    // Candidate generation: split each 64-bit hash into eight 8-bit blocks and group ids
    // that share a block. Two hashes within Hamming distance 7 cannot differ in all eight
    // blocks, so every true duplicate pair lands in at least one of these groups.
    val groups = new ArrayBuffer[RDD[(String, scala.Iterable[String])]]()
    for (index <- 0 to 7) {
      val g = id_hash.
        map { case (id, hash) => (SimHash.extractSub(hash, 8, index * 8), id) }.
        groupByKey.
        map { case (_, list) => list }.
        map { list =>
          // pair every id in the group with the group's full member list
          val d = new ArrayBuffer[(String, Iterable[String])]()
          for (f <- list) {
            d.append((f, list))
          }
          d
        }.
        flatMap(y => y)
      groups += g
    }

    // index the eight candidate RDDs as 0..7
    val zgroups = groups.
      zipWithIndex.
      map { case (rdd, index) => (index, rdd) }.
      toMap
    // Suspects: inner-join two (id -> candidate ids) RDDs on id and concatenate the
    // candidates from both sides.
    def mergeCandidates(a: RDD[(String, Iterable[String])],
                        b: RDD[(String, Iterable[String])]): RDD[(String, Iterable[String])] =
      (a join b).map { case (id, (left, right)) => (id, left ++ right) }

    // Fold the eight byte-wise groupings into a single candidate list per id.
    val suspects1 = mergeCandidates(zgroups(0), zgroups(1))
    val suspects2 = mergeCandidates(zgroups(2), zgroups(3))
    val suspects3 = mergeCandidates(zgroups(4), zgroups(5))
    val suspects4 = mergeCandidates(zgroups(6), zgroups(7))
    val suspects5 = mergeCandidates(suspects1, suspects2)
    val suspects6 = mergeCandidates(suspects3, suspects4)
    val suspects = mergeCandidates(suspects5, suspects6)
    suspects.persist(StorageLevel.MEMORY_AND_DISK)

    // id -> hash lookup, collected to the driver and captured by the closure below
    // (a broadcast variable would avoid re-shipping it with every task)
    val hash_map = id_hash.collect.toMap

    // Verification: keep only candidates whose exact Hamming distance is within the threshold.
    val results = suspects.
      map { case (id, ids) =>
        val list = new ArrayBuffer[String]
        val uniques = ids.toList.distinct
        val hash = hash_map(id)
        for (m <- uniques) {
          if (!id.equalsIgnoreCase(m) &&
              SimHash.hammingDistance(hash.longValue, hash_map(m).longValue) <= HAMMING_DISTANCE) {
            list += m
          }
        }
        (id, list)
      }.
      filter { case (_, list) => list.nonEmpty }

    // one line per id: the id followed by its duplicate ids, comma-separated
    val duplicates = results.
      map { case (id, list) => id + "," + list.mkString(",") }
    duplicates.coalesce(1, shuffle = true).saveAsTextFile(params.output)

    println()
    println("Duplicates finished!")
    println()
    val preprocessElapsed = (System.nanoTime() - preprocessStart) / 1e9
    println(s"\t Elapsed time is: $preprocessElapsed seconds")

    sc.stop()
  }
  private case class Params(
      input: String = "",
      output: String = "") extends AbstractParams[Params]
}
// scalastyle:on println
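Note: the duplicate.SimHash helper imported above is not included in this gist, so its exact API is unknown. The sketch below is only an assumption about what the two methods used here might look like, reading extractSub(hash, bits, offset) as "the bits-wide block of hash starting at bit offset" and hammingDistance as the number of differing bits between two 64-bit values. AbstractParams is likewise not shown; in Spark's example code it is a small reflection-based toString helper for the Params case class.

// Hypothetical stand-in for the unshown duplicate.SimHash helper; the names match the
// calls in JoinDuplicate, but the signatures and bit layout are assumptions.
object SimHashSketch {

  // Number of bit positions in which two 64-bit hashes differ.
  def hammingDistance(a: Long, b: Long): Int = java.lang.Long.bitCount(a ^ b)

  // The `bits`-wide block of `hash` starting at bit `offset` (0 = least significant bit).
  // With bits = 8 and offset = 0, 8, ..., 56 this yields the eight bytes that
  // JoinDuplicate groups on.
  def extractSub(hash: Long, bits: Int, offset: Int): Long =
    (hash >>> offset) & ((1L << bits) - 1)
}

The XOR-plus-bitCount form of hammingDistance is the standard way to compare 64-bit SimHashes; and even if the real extractSub takes its blocks from the high end instead, the candidate generation is unaffected, since all eight byte positions are covered either way.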