Skip to content

Instantly share code, notes, and snippets.

@MLWhiz
Created January 20, 2019 18:06
Show Gist options
  • Save MLWhiz/4a2982f5939622669052a3a7c84001bd to your computer and use it in GitHub Desktop.
Save MLWhiz/4a2982f5939622669052a3a7c84001bd to your computer and use it in GitHub Desktop.
def create_edges(line):
a = [int(x) for x in line.split(" ")]
edges_list=[]
for i in range(0, len(a)-1):
for j in range(i+1 ,len(a)):
edges_list.append((a[i],a[j]))
edges_list.append((a[j],a[i]))
return edges_list
# adj_list.txt is a txt file containing adjacency list of the graph.
adjacency_list = sc.textFile("adj_list.txt")
edges_rdd = adjacency_list.flatMap(lambda line : create_edges(line)).distinct()
def largeStarInit(record):
a, b = record
yield (a,b)
yield (b,a)
def largeStar(record):
a, b = record
t_list = list(b)
t_list.append(a)
list_min = min(t_list)
for x in b:
if a < x:
yield (x,list_min)
def smallStarInit(record):
a, b = record
if b<=a:
yield (a,b)
else:
yield (b,a)
def smallStar(record):
a, b = record
t_list = list(b)
t_list.append(a)
list_min = min(t_list)
for x in t_list:
if x!=list_min:
yield (x,list_min)
#Handle case for single nodes
def single_vertex(line):
a = [int(x) for x in line.split(" ")]
edges_list=[]
if len(a)==1:
edges_list.append((a[0],a[0]))
return edges_list
iteration_num =0
while 1==1:
if iteration_num==0:
print "iter", iteration_num
large_star_rdd = edges_rdd.groupByKey().flatMap(lambda x : largeStar(x))
small_star_rdd = large_star_rdd.flatMap(lambda x : smallStarInit(x)).groupByKey().flatMap(lambda x : smallStar(x)).distinct()
iteration_num += 1
else:
print "iter", iteration_num
large_star_rdd = small_star_rdd.flatMap(lambda x: largeStarInit(x)).groupByKey().flatMap(lambda x : largeStar(x)).distinct()
small_star_rdd = large_star_rdd.flatMap(lambda x : smallStarInit(x)).groupByKey().flatMap(lambda x : smallStar(x)).distinct()
iteration_num += 1
#check Convergence
changes = (large_star_rdd.subtract(small_star_rdd).union(small_star_rdd.subtract(large_star_rdd))).collect()
if len(changes) == 0 :
break
single_vertex_rdd = adjacency_list.flatMap(lambda line : single_vertex(line)).distinct()
answer = single_vertex_rdd.collect() + large_star_rdd.collect()
print answer[:10]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment