// Gist by @treper (gist id 24edce737e01438c7d36).
// DBSCAN clustering script; it uses the predefined `sc`, so it is presumably meant to be run in the Spark shell.
/**
DBSCAN(D, eps, MinPts)
   C = 0
   for each unvisited point P in dataset D
      mark P as visited
      NeighborPts = regionQuery(P, eps)
      if sizeof(NeighborPts) < MinPts
         mark P as NOISE
      else
         C = next cluster
         expandCluster(P, NeighborPts, C, eps, MinPts)

expandCluster(P, NeighborPts, C, eps, MinPts)
   add P to cluster C
   for each point P' in NeighborPts
      if P' is not visited
         mark P' as visited
         NeighborPts' = regionQuery(P', eps)
         if sizeof(NeighborPts') >= MinPts
            NeighborPts = NeighborPts joined with NeighborPts'
      if P' is not yet member of any cluster
         add P' to cluster C

regionQuery(P, eps)
   return all points within P's eps-neighborhood (including P)
**/
import spark.SparkContext
import spark.SparkContext._
import org.apache.log4j.Logger
import org.apache.log4j.Level
import java.util.concurrent.ConcurrentHashMap
import collection.mutable.HashMap
import collection.mutable.HashSet
// ---- Global clustering state, mutated by DBSCAN and expandCluster ----

// Id of the most recently created cluster; stays 0 until the first cluster is opened.
var c=0
// Points that have already been examined. The map is used as a set: only key presence matters.
var visited =new java.util.concurrent.ConcurrentHashMap[Long,Boolean]
// Points whose neighborhood was smaller than MinPts when they were examined as a seed.
var noise =new java.util.concurrent.ConcurrentHashMap[Long,Boolean]
// Points that have been assigned to a cluster (also used as a set).
var clustered =new java.util.concurrent.ConcurrentHashMap[Long,Boolean]
// Cluster id -> members of that cluster.
var clusters=new HashMap[Int,HashSet[Long]]

// ---- Input data ----

// Pairwise similarities, one tab-separated record per line: tag1, tag2, similarity.
// The result is collected to the driver as a map keyed by "<smaller id>\t<larger id>",
// so a pair can be looked up regardless of the order it appeared in the file.
// The RDD handle gets its own name so that `file` below is not a duplicate definition
// when the script is compiled or pasted as a whole.
val simFile=sc.textFile("/home/mps/coover100_jaccardsimilarity")
val jaccardSim=simFile.map(line =>{
  // Split once and reuse the fields instead of re-splitting the line for every column.
  val fields=line.split("\t")
  val tag1=fields(0).toLong
  val tag2=fields(1).toLong
  val sim=fields(2).toFloat
  if(tag1<tag2) (tag1+"\t"+tag2,sim) else (tag2+"\t"+tag1,sim)
}).collect().toMap

// The data set to cluster: the first tab-separated column of every line, parsed as a Long id.
val file =sc.textFile("/home/mps/tag_count.txt")
val D=file.map(line => line.split("\t")(0).toLong).collect()
/**
 * Collects the points of D whose similarity to P is strictly greater than eps.
 *
 * Similarities are read from the global `jaccardSim` map, whose keys have the form
 * "<smaller id>\t<larger id>"; a pair without an entry is treated as similarity 0.
 * Note that P itself is returned only if the map holds an entry for the pair (P, P).
 *
 * @param D   candidate points
 * @param P   the point whose neighborhood is wanted
 * @param eps similarity threshold (exclusive)
 * @return a fresh mutable set holding the neighbors of P
 */
def regionQuery(D:Array[Long],P:Long,eps:Float):HashSet[Long]={
  val neighbors = new HashSet[Long]
  for(candidate <- D){
    // Build the lookup key with the smaller id first.
    val key = if(P<candidate) P+"\t"+candidate else candidate+"\t"+P
    val similarity = jaccardSim.getOrElse(key,0.0f)
    if(similarity>eps) neighbors+=candidate
  }
  neighbors
}
/**
 * Grows cluster C outward from the seed point P.
 *
 * Every point in NeighborPts is processed in turn. An unvisited point is marked visited and,
 * if its own neighborhood has at least MinPts members, that neighborhood is appended to the
 * work list. Every processed point that is not yet in any cluster is added to cluster C.
 *
 * Side effects: updates the global `visited`, `clustered` and `clusters` structures and
 * enlarges the caller's NeighborPts set in place.
 *
 * @param D           all points of the data set (forwarded to regionQuery)
 * @param P           the seed (core) point of the cluster
 * @param NeighborPts neighborhood of P; it is extended while the cluster grows
 * @param C           id of the cluster being built
 * @param eps         similarity threshold forwarded to regionQuery
 * @param MinPts      minimum neighborhood size for a point to be expanded
 */
def expandCluster(D:Array[Long],P:Long,NeighborPts:HashSet[Long],C:Integer,eps:Float,MinPts:Integer)={
  // Use the cluster id that was passed in rather than reading the global counter.
  val clusterId:Int=C.intValue
  val members=clusters.getOrElseUpdate(clusterId,new HashSet[Long])
  members+=P
  // Record the seed as clustered too, so `clustered` agrees with the contents of `clusters`.
  clustered.put(P,true)

  // Work list of points still to process. An ArrayBuffer gives constant-time indexing and
  // amortised constant-time append; the previous immutable List was linear for both.
  val pending=new scala.collection.mutable.ArrayBuffer[Long]
  pending++=NeighborPts
  var j=0
  while(j<pending.size){
    val m=pending(j)
    if(!visited.containsKey(m)){
      visited.put(m,true)
      val NeighborPts2=regionQuery(D,m,eps)
      if(NeighborPts2.size>=MinPts.intValue){
        // Queue only the points not already known, then merge them into the neighborhood.
        pending++=NeighborPts2.filter(!NeighborPts.contains(_))
        NeighborPts++=NeighborPts2
      }
    }
    if(!clustered.containsKey(m)){
      clustered.put(m,true)
      members+=m
    }
    j+=1
  }
}
/**
 * Runs DBSCAN over the points in D.
 *
 * Each point that has not been visited yet is marked visited and its neighborhood is queried.
 * If the neighborhood has fewer than MinPts members the point is recorded in `noise`;
 * otherwise the global cluster counter `c` is incremented and the new cluster is grown with
 * expandCluster.
 *
 * Results are left in the global `clusters`, `clustered`, `noise` and `visited` structures.
 *
 * @param D      the points to cluster
 * @param eps    similarity threshold used for neighborhood queries
 * @param MinPts minimum neighborhood size for a point to start a cluster
 */
def DBSCAN(D:Array[Long],eps:Float,MinPts:Integer)={
  D.foreach(m => {
    // The visited check must happen at iteration time: expandCluster marks points visited
    // while this loop is running, and those points must not be picked up again as seeds.
    if(!visited.containsKey(m)){
      visited.put(m,true)
      val NeighborPts=regionQuery(D,m,eps)
      if(NeighborPts.size<MinPts){
        noise.put(m,true)
      }
      else{
        c+=1
        println("new cluster "+c+" "+m)
        expandCluster(D,m,NeighborPts,c,eps,MinPts)
      }
    }
  })
}
// Cluster all loaded points: neighbors need similarity above 0.03,
// and a point needs at least 40 neighbors to start or extend a cluster.
DBSCAN(D, eps = 0.03f, MinPts = 40)
// (End of gist.)