/**
DBSCAN(D, eps, MinPts)
   C = 0
   for each unvisited point P in dataset D
      mark P as visited
      NeighborPts = regionQuery(P, eps)
      if sizeof(NeighborPts) < MinPts
         mark P as NOISE
      else
         C = next cluster
         expandCluster(P, NeighborPts, C, eps, MinPts)

expandCluster(P, NeighborPts, C, eps, MinPts)
   add P to cluster C
   for each point P' in NeighborPts
      if P' is not visited
         mark P' as visited
         NeighborPts' = regionQuery(P', eps)
         if sizeof(NeighborPts') >= MinPts
            NeighborPts = NeighborPts joined with NeighborPts'
      if P' is not yet member of any cluster
         add P' to cluster C

regionQuery(P, eps)
   return all points within P's eps-neighborhood (including P)
**/
// Meant to be pasted into the Spark shell, where `sc` is predefined.
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.log4j.Logger
import org.apache.log4j.Level
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
// Global clustering state; ConcurrentHashMap doubles as a fast Long -> Boolean set.
var c = 0                                            // id of the most recent cluster
var visited = new ConcurrentHashMap[Long, Boolean]   // points already examined
var noise = new ConcurrentHashMap[Long, Boolean]     // points flagged as noise
var clustered = new ConcurrentHashMap[Long, Boolean] // points assigned to some cluster
var clusters = new HashMap[Int, HashSet[Long]]       // cluster id -> member tag ids
// Precomputed pairwise Jaccard similarities, keyed by "smallerTag\tlargerTag"
// so each unordered pair has a single canonical key.
val simFile = sc.textFile("/home/mps/coover100_jaccardsimilarity")
val jaccardSim = simFile.map(line => {
  val fields = line.split("\t")
  val tag1 = fields(0).toLong
  val tag2 = fields(1).toLong
  val sim = fields(2).toFloat
  if (tag1 < tag2) (tag1 + "\t" + tag2, sim) else (tag2 + "\t" + tag1, sim)
}).collect().toMap
// The dataset D: one tag id per line (first tab-separated field of tag_count.txt).
val tagFile = sc.textFile("/home/mps/tag_count.txt")
val D = tagFile.map(line => line.split("\t")(0).toLong).collect()
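// Illustrative sanity check (not in the original gist): confirm both inputs
// loaded before clustering. The paths above are assumed to exist locally.
println("tags loaded: " + D.size + ", similarity pairs: " + jaccardSim.size)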
// All tags whose Jaccard similarity to P exceeds eps. Unlike the pseudocode,
// P itself is not returned (jaccardSim holds no self-similarity pairs);
// expandCluster adds P to its cluster explicitly instead.
def regionQuery(D: Array[Long], P: Long, eps: Float): HashSet[Long] = {
  val nbs = new HashSet[Long]
  nbs ++= D.filter { m =>
    val key = if (P < m) P + "\t" + m else m + "\t" + P
    jaccardSim.getOrElse(key, 0.0f) > eps
  }
  nbs
}
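// Quick illustrative spot check (assumes D is non-empty); the eps value
// mirrors the final DBSCAN call at the bottom of this file.
println("neighborhood of " + D(0) + ": " + regionQuery(D, D(0), 0.03f).size + " tags")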
// Grows cluster C outward from seed P, following the pseudocode above.
// nbl is an indexed work queue: neighbors of newly found core points are
// appended at the end and processed in turn.
def expandCluster(D: Array[Long], P: Long, NeighborPts: HashSet[Long], C: Int, eps: Float, MinPts: Int) = {
  if (!clusters.contains(C)) {
    clusters(C) = new HashSet[Long]
  }
  clusters(C) += P
  var j = 0
  val nbl = ArrayBuffer(NeighborPts.toSeq: _*)
  while (j < nbl.size) {
    val m = nbl(j)
    if (!visited.containsKey(m)) {
      visited.put(m, true)
      val NeighborPts2 = regionQuery(D, m, eps)
      if (NeighborPts2.size >= MinPts) {
        // m is itself a core point: enqueue its neighbors we have not seen yet
        NeighborPts2.filterNot(NeighborPts.contains).foreach(n => nbl += n)
        NeighborPts ++= NeighborPts2
      }
    }
    if (!clustered.containsKey(m)) {
      clustered.put(m, true)
      clusters(C) += m
    }
    j += 1
  }
}
// Main loop: each point is visited once; dense points seed new clusters,
// sparse points are marked as noise (they may later join a cluster as
// border points inside expandCluster).
def DBSCAN(D: Array[Long], eps: Float, MinPts: Int) = {
  D.foreach(m => {
    // Re-check visited here: expandCluster may have visited m already.
    if (!visited.containsKey(m)) {
      visited.put(m, true)
      val NeighborPts = regionQuery(D, m, eps)
      if (NeighborPts.size < MinPts) {
        noise.put(m, true)
      } else {
        c += 1
        println("new cluster " + c + " seeded by " + m)
        expandCluster(D, m, NeighborPts, c, eps, MinPts)
      }
    }
  })
}
DBSCAN(D, 0.03f, 40)
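// Illustrative result summary (not in the original gist). The noise count is
// an upper bound: a point first flagged as noise can later be absorbed into
// a cluster as a border point, and is not removed from `noise` when that happens.
println("clusters: " + clusters.size + ", noise points (upper bound): " + noise.size)
clusters.toSeq.sortBy(_._1).foreach { case (id, members) =>
  println("cluster " + id + ": " + members.size + " tags")
}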