Skip to content

Instantly share code, notes, and snippets.

Created December 20, 2016 21:19
Show Gist options
  • Save thiagomoretto/e8fc68ef7c2b819155719041450e1d1e to your computer and use it in GitHub Desktop.
Save thiagomoretto/e8fc68ef7c2b819155719041450e1d1e to your computer and use it in GitHub Desktop.
Optics clustering algorithm
from math import sqrt
from geopy.distance import great_circle
from collections import defaultdict
def km(a, b):
return great_circle(a, b).km
def euc2d(a, b):
return sqrt((a[0] - b[0])**2 + (a[1] - b[1])** 2)
class Point(object):
def __init__(self, x, y):
self.x = x
self.y = y
self.coords = (x, y)
def __getitem__(self, index):
return self.coords[index]
def __repr__(self):
return "{},{}".format(self[0], self[1])
class Neighborhood(object):
def __init__(self, points, eps, distance):
self.points = points
self.eps = eps
self.graph = defaultdict(list)
self.nearest = {}
def _build(self, distance):
for pi in self.points:
min_dist = float('inf')
for pj in self.points:
if pi == pj:
dist = distance(pi, pj)
if dist <= self.eps:
if dist < min_dist:
min_dist = dist
self.nearest[pi] = min_dist
def of(self, p):
return self.graph[p]
def optics(points, eps, min_pts, distance=km):
import itertools
from heapq import heappop, heappush
counter = itertools.count()
REMOVED = '<removed>' # Removed point placeholder.
def add_to_seed(pq, point, entry_finder, rd):
if point in entry_finder:
remove_point(point, entry_finder)
count = next(counter)
entry = [rd, count, point]
entry_finder[point] = entry
heappush(pq, entry)
def remove_point(point, entry_finder):
entry = entry_finder.pop(point)
entry[-1] = REMOVED
def pop_point(pq, entry_finder):
while pq:
rd, count, point = heappop(pq)
if point is not REMOVED:
del entry_finder[point]
return point
return None
def update(neighbors, p, p_core_dist, seeds, entry_finder,
eps, min_pts):
for o in neighbors:
if o.processed:
new_reach_distance =\
max(p_core_dist, distance(p, o))
if o.reachability_distance is None: # Not in `seeds`.
o.reachability_distance = new_reach_distance
add_to_seed(seeds, o, entry_finder,
elif new_reach_distance < o.reachability_distance:
o.reachability_distance = new_reach_distance
add_to_seed(seeds, o, entry_finder,
def core_distance(p, neighbors, min_pts):
if p.core_distance is not None:
return p.core_distance
elif len(neighbors) >= min_pts - 1:
sorted_distance = sorted([
distance(n, p) for n in neighbors
p.core_distance = sorted_distance[min_pts - 2]
return p.core_distance
for p in points:
p.processed = False
p.core_distance = None
p.reachability_distance = None
neighborhood =\
Neighborhood(points, eps, distance)
unvisited = points[:]
ordered = list()
while unvisited:
p = unvisited.pop()
if p.processed:
p.processed = True
neighbors = neighborhood.of(p)
p_core_dist = core_distance(p, neighbors, min_pts)
if p_core_dist is not None:
seeds = [] # Priority queue.
entry_finder = {}
update(neighbors, p, p_core_dist,
seeds, entry_finder, eps, min_pts)
while len(seeds) > 0:
n = pop_point(seeds, entry_finder)
if n is None:
n.processed = True
n_neighbors = neighborhood.of(n)
n_core_dist = core_distance(n, n_neighbors, min_pts)
if n_core_dist is not None:
update(n_neighbors, n, n_core_dist,
seeds, entry_finder, eps, min_pts)
return ordered
def extract_clusters(ordered, threshold, min_pts):
clusters = []
separators = []
for i, p in enumerate(ordered):
rd = p.reachability_distance \
if p.reachability_distance else float('inf')
if rd > threshold:
for i in range(len(separators) - 1):
start, end = separators[i], separators[i + 1]
if end - start >= min_pts:
return clusters
# Test it
points = [
Point(37.769006, -122.429299), # cluster #1
Point(37.769044, -122.429130), # cluster #1
Point(37.768775, -122.429092), # cluster #1
Point(37.776299, -122.424249), # cluster #2
Point(37.776265, -122.424657), # cluster #2
eps, min_pts = 0.1, 2
ordered = optics(points, eps=eps, min_pts=min_pts)
clusters = extract_clusters(ordered, threshold=eps, min_pts=min_pts)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment