Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save aristotle0x01/de820afcabf43e68139ce79ca099691f to your computer and use it in GitHub Desktop.
Save aristotle0x01/de820afcabf43e68139ce79ca099691f to your computer and use it in GitHub Desktop.
simhash duplicates detection 2
// scalastyle:off println
import duplicate.SimHash
import org.apache.commons.lang3.StringUtils
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import scopt.OptionParser
import scala.collection.mutable.ArrayBuffer
import scala.util.parsing.json._
/**
 * Spark job that reports near-duplicate records based on precomputed 64-bit hashes.
 *
 * Input: a text file with one record per line, made of exactly two fields separated by the
 * 0x01 character: an id and a decimal `Long` hash. Lines that do not have exactly two
 * fields, have a blank id, or whose hash does not parse (or parses to 0) are ignored.
 *
 * Output: a single text part file with one line per id that has at least one near duplicate,
 * formatted as `id,dup1,dup2,...`.
 */
object JoinDuplicate {

  /** Maximum Hamming distance (inclusive) at which two hashes are reported as duplicates. */
  val HAMMING_DISTANCE: Int = 7

  // The hash is cut into BandCount slices of BandBits bits each; two ids become candidates
  // when at least one slice is identical.
  // NOTE(review): this assumes SimHash.extractSub(hash, width, offset) returns the `width`
  // bits starting at bit `offset`. Under that assumption, two 64-bit hashes that differ in at
  // most 7 bits must agree on at least one of the 8 slices, so HAMMING_DISTANCE has to stay
  // below BandCount for the candidate generation to be exhaustive.
  private val BandBits = 8
  private val BandCount = 8

  /**
   * Command-line entry point.
   *
   * Parses `--input` and `--output` (both mandatory) and runs the job. On a parse failure the
   * usage text is printed to stderr and the JVM exits with status 1.
   *
   * @param args raw command-line arguments
   */
  def main(args: Array[String]): Unit = {
    val parser = new OptionParser[Params]("Duplicates") {
      head("Duplicates: find duplicates in code snippets.")
      opt[String]("input")
        .required()
        .text("input file path")
        .action((x, c) => c.copy(input = x))
      opt[String]("output")
        .required()
        .text("output file path")
        .action((x, c) => c.copy(output = x))
    }
    parser.parse(args, Params()) match {
      case Some(params) => run(params)
      case None =>
        parser.showUsageAsError
        sys.exit(1)
    }
  }

  /**
   * Parses a decimal long, ignoring surrounding whitespace.
   *
   * @param s text to parse; may be null
   * @return the parsed value, or 0 when `s` is null or not a valid decimal long
   */
  def parseLong(s: String): java.lang.Long =
    try {
      java.lang.Long.parseLong(s.trim)
    } catch {
      // NumberFormatException: not a decimal long. NullPointerException: s was null.
      case _: NumberFormatException | _: NullPointerException => 0L
    }

  /**
   * Joins two per-id candidate lists, keeping ids present on both sides and concatenating
   * their candidates (left first). `mapValues` keeps the join's partitioner so that chained
   * merges do not reshuffle the already-joined side.
   */
  private def mergeCandidates(
      left: RDD[(String, Iterable[String])],
      right: RDD[(String, Iterable[String])]): RDD[(String, Iterable[String])] =
    left.join(right).mapValues { case (fromLeft, fromRight) => fromLeft ++ fromRight }

  /**
   * Runs the duplicate detection job described on [[JoinDuplicate]].
   *
   * The SparkContext is always stopped, even when the job fails.
   *
   * @param params input and output paths
   */
  private def run(params: Params): Unit = {
    val conf = new SparkConf()
      .setAppName(s"Duplicates with $params")
      .set("spark.driver.maxResultSize", "32g")
    val sc = new SparkContext(conf)
    try {
      Logger.getRootLogger.setLevel(Level.WARN)
      val preprocessStart = System.nanoTime()

      // (id, hash) pairs. Persisted because it feeds the preview, the count, every band
      // grouping and the id -> hash lookup table below.
      val idHash: RDD[(String, java.lang.Long)] = sc.textFile(params.input)
        .map(line => StringUtils.split(line, 0x01.toChar))
        .filter(fields => fields.length == 2 && fields(0).trim.nonEmpty && fields(1).nonEmpty)
        .map(fields => (fields(0).trim, parseLong(fields(1))))
        // 0 is parseLong's "unparseable" marker, so such records are dropped.
        .filter { case (_, hash) => hash.longValue != 0L }
        .persist(StorageLevel.MEMORY_AND_DISK)

      // Debug preview of the parsed records.
      idHash.top(50).foreach(println)
      val linesCount = idHash.count()
      println()
      println(s"Corpus summary:")
      println(s"\t lines of corpus is: $linesCount")
      println()

      // For every band: each id paired with all ids sharing that band's value (itself included).
      val bandCandidates: Seq[RDD[(String, Iterable[String])]] =
        (0 until BandCount).map { band =>
          idHash
            .map { case (id, hash) => (SimHash.extractSub(hash, BandBits, band * BandBits), id) }
            .groupByKey()
            .values
            .flatMap(ids => ids.iterator.map(id => (id, ids)))
        }

      // Per id, the concatenation of its candidates over all bands, in band order.
      val suspects = bandCandidates.reduce(mergeCandidates)

      // id -> hash lookup, shipped once per executor instead of once per task.
      val hashById = sc.broadcast(idHash.collect().toMap)

      // Duplicate ids concatenated by comma in a single line, led by the id they belong to.
      val duplicates = suspects
        .map { case (id, candidates) =>
          val hashes = hashById.value
          // Every id and candidate originates from idHash, so both lookups are defined.
          val hash = hashes(id).longValue
          val matches = candidates.toList.distinct.filter { other =>
            !id.equalsIgnoreCase(other) &&
              SimHash.hammingDistance(hash, hashes(other).longValue) <= HAMMING_DISTANCE
          }
          (id, matches)
        }
        .filter { case (_, matches) => matches.nonEmpty }
        .map { case (id, matches) => id + "," + matches.mkString(",") }

      duplicates.coalesce(1, shuffle = true).saveAsTextFile(params.output)

      println()
      println("Duplicates finished!")
      println()
      val preprocessElapsed = (System.nanoTime() - preprocessStart) / 1e9
      println(s"\t Elapsed time is: $preprocessElapsed")
    } finally {
      sc.stop()
    }
  }

  /**
   * Command-line parameters.
   *
   * @param input  path of the text file to read
   * @param output path the result is written to
   */
  private case class Params(
      input: String = "",
      output: String = "") extends AbstractParams[Params]
}
// scalastyle:on println
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment