Skip to content

Instantly share code, notes, and snippets.

@treper
Last active December 20, 2015 09:59
Show Gist options
  • Save treper/6112578 to your computer and use it in GitHub Desktop.
Save treper/6112578 to your computer and use it in GitHub Desktop.
Calculate a word co-occurrence matrix with Spark. Note: Scala's built-in JSON parser (scala.util.parsing.json.JSON) is not thread-safe, so use json4s instead.
import scala.util.parsing.json._
import org.json4s._
import org.json4s.native.JsonMethods._
import scala.collection.mutable.ArrayBuffer
/**
 * Turns one input line into co-occurrence keys, one per pair of entries in its item list.
 *
 * The line is expected to look like `<key>\t<json>`, where `<json>` is an object whose
 * "list" field is an array of numeric `[item, idf, ...]` rows (extra row elements are
 * ignored; decimal numbers are truncated toward zero to Int). For every pair of rows the
 * key is `"<item1>-<item2>"`, with the row having the smaller idf first (ties broken by the
 * smaller item id), so both orders of a pair produce the same key. Two identical rows
 * produce no key.
 *
 * Parsing uses json4s because `scala.util.parsing.json.JSON` is not thread-safe, and Spark
 * may call this function from several task threads inside one executor JVM.
 *
 * @param line one raw input line
 * @return the pair keys for this line; empty if the line has no second tab-separated
 *         field, the JSON is invalid, "list" is missing or not an array, or any row is
 *         not at least two numbers
 */
def parseLine(line: String): ArrayBuffer[String] = {
  // Numeric JSON value -> Int (decimals truncated toward zero); None for anything else.
  def toInt(v: JValue): Option[Int] = v match {
    case JInt(n)    => Some(n.toInt)
    case JDouble(d) => Some(d.toInt)
    case _          => None
  }

  // All (item, idf) rows of the line, or None if any part of it is malformed.
  def parseItems: Option[List[(Int, Int)]] = {
    val columns = line.split("\t")
    val rows = for {
      jsonStr   <- if (columns.length > 1) Some(columns(1)) else None
      json      <- parseOpt(jsonStr)
      rowValues <- json match {
                     case JObject(fields) => fields.collectFirst { case ("list", JArray(rs)) => rs }
                     case _               => None
                   }
    } yield rowValues
    rows.flatMap { rs =>
      val entries = rs.map {
        case JArray(a :: b :: _) => for (item <- toInt(a); idf <- toInt(b)) yield (item, idf)
        case _                   => None
      }
      // A single bad row invalidates the whole line rather than silently shrinking it.
      if (entries.forall(_.isDefined)) Some(entries.flatten) else None
    }
  }

  // Canonical key for an unordered pair: lower (idf, item) first; None for identical rows.
  def pairKey(one: (Int, Int), two: (Int, Int)): Option[String] =
    if (one == two) None
    else {
      val oneFirst = one._2 < two._2 || (one._2 == two._2 && one._1 < two._1)
      val (first, second) = if (oneFirst) (one, two) else (two, one)
      Some(first._1.toString + "-" + second._1.toString)
    }

  val items = parseItems.getOrElse(Nil).toIndexedSeq
  val result = ArrayBuffer[String]()
  // j starts at i + 1 so every unordered pair of rows is visited exactly once.
  for (i <- items.indices; j <- i + 1 until items.length; key <- pairKey(items(i), items(j)))
    result += key
  result
}
// Input: one record per line, "<key>\t<json>" (the format parseLine reads).
val file = sc.textFile("/home/mps/md5_labels.txt")

// Quick sanity run on the first 10000 lines, collected to the driver and re-distributed.
// It writes to its own directory: saveAsTextFile fails if the output directory already
// exists, so it must not share a path (or a val name) with the full run below.
val sample = file.take(10000)
val sampleComatrix = sc.parallelize(sample).flatMap(parseLine).map(pair => (pair, 1)).reduceByKey(_ + _)
sampleComatrix.saveAsTextFile("/home/mps/comatrix_sample")

// Full run: count how often each co-occurrence key is emitted across all input lines.
val comatrix = file.flatMap(parseLine).map(pair => (pair, 1)).reduceByKey(_ + _)
comatrix.saveAsTextFile("/home/mps/comatrix")

// Disabled alternative (caches the emitted pairs before counting):
//System.setProperty("spark.cores.max", "24")
//val pairs=file.flatMap(parseLine).cache()
//val comatrix=pairs.map(item=>(item,1)).reduceByKey((a,b)=>(a+b))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment