Created January 20, 2019 18:06
-
-
Save MLWhiz/4a2982f5939622669052a3a7c84001bd to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def create_edges(line):
    """Turn one line of integer node ids into a list of directed edges.

    Every unordered pair of ids on the line is emitted in both directions,
    i.e. ``(u, v)`` followed by ``(v, u)``, in left-to-right pair order.
    A line with zero or one id yields an empty list.

    Ids are split on any run of whitespace, so trailing spaces, tabs,
    doubled separators and blank lines do not raise ``ValueError``.

    NOTE: a line with k ids produces k*(k-1) directed edges (a full clique).
    For connected components a star from the first id would be enough; the
    clique is kept so the returned edge list is unchanged.
    """
    from itertools import combinations

    nodes = [int(token) for token in line.split()]
    edges_list = []
    for u, v in combinations(nodes, 2):
        edges_list.append((u, v))
        edges_list.append((v, u))
    return edges_list
# Input: adj_list.txt holds the graph as an adjacency list, with node ids
# written as space-separated integers on each line.
# NOTE(review): `sc` is not defined in this file; it is assumed to be an
# existing SparkContext (e.g. the one the pyspark shell provides) — confirm.
adjacency_list = sc.textFile("adj_list.txt")

# Expand each line into directed edges and drop repeats across lines.
edges_rdd = adjacency_list.flatMap(create_edges).distinct()
def largeStarInit(record):
    """Split an edge ``(u, v)`` into its two directed halves.

    Yields ``(u, v)`` and then ``(v, u)``, so that grouping by key afterwards
    gives every node its full neighbour list.
    """
    u, v = record
    for directed_edge in ((u, v), (v, u)):
        yield directed_edge
def largeStar(record):
    """Large-star step for one node and its neighbours.

    ``record`` is ``(node, neighbours)``. Let ``root`` be the smallest id
    among ``neighbours`` together with ``node`` itself. For every neighbour
    strictly larger than ``node``, yield ``(neighbour, root)``. Neighbours
    that are less than or equal to ``node`` emit nothing here.

    ``neighbours`` is materialised into a list once. It is then read twice
    (to find the minimum and to emit edges), which previously silently
    produced no output when it was a one-shot iterator.
    """
    node, neighbours = record
    neighbours = list(neighbours)
    root = min(neighbours + [node])
    for neighbour in neighbours:
        if neighbour > node:
            yield (neighbour, root)
def smallStarInit(record):
    """Orient an edge so that its larger endpoint comes first.

    Yields exactly one pair ``(larger, smaller)``. A self-loop ``(u, u)``
    is yielded unchanged.
    """
    u, v = record
    yield (v, u) if u < v else (u, v)
def smallStar(record):
    """Small-star step for one node and its grouped values.

    ``record`` is ``(node, lower)``. In this file ``lower`` comes from
    ``smallStarInit`` followed by ``groupByKey``. Every member of
    ``lower`` plus ``node`` is linked to the smallest of them, ``root``:
    each member other than ``root`` yields ``(member, root)``, in the order
    the members appear with ``node`` last. Self-loops are never emitted.
    """
    node, lower = record
    members = list(lower) + [node]
    root = min(members)
    for member in members:
        if member == root:
            continue
        yield (member, root)
# Handle nodes that appear alone on a line: a one-id line produces no edges,
# so such nodes would otherwise be missing from the final answer.
def single_vertex(line):
    """Return ``[(n, n)]`` if the line holds exactly one node id ``n``.

    Any other line (no ids, or two or more) yields an empty list. Ids are
    split on any whitespace, so trailing spaces or a blank line do not raise
    ``ValueError``.
    """
    nodes = [int(token) for token in line.split()]
    if len(nodes) == 1:
        return [(nodes[0], nodes[0])]
    return []
# Alternate large-star and small-star rounds until a round leaves the edge
# set unchanged (the large-star and small-star outputs are identical).
iteration_num = 0
large_star_rdd = None
small_star_rdd = None
while True:
    print("iter %d" % iteration_num)
    if iteration_num == 0:
        # edges_rdd already contains both directions of every edge, so it can
        # be grouped directly without largeStarInit.
        neighbours_rdd = edges_rdd.groupByKey()
    else:
        neighbours_rdd = small_star_rdd.flatMap(largeStarInit).groupByKey()

    previous_rdds = (large_star_rdd, small_star_rdd)
    # distinct() is applied in every round, including round 0. Without it,
    # a graph that converges in the first round leaves duplicate edges in
    # the answer. Both results are cached because each is read several
    # times below, and small_star_rdd is read again by the next round.
    large_star_rdd = neighbours_rdd.flatMap(largeStar).distinct().cache()
    small_star_rdd = (large_star_rdd
                      .flatMap(smallStarInit)
                      .groupByKey()
                      .flatMap(smallStar)
                      .distinct()
                      .cache())
    iteration_num += 1

    # Check convergence. The symmetric difference of the two edge sets must
    # be empty. isEmpty() avoids collecting the whole difference to the
    # driver.
    converged = (large_star_rdd.subtract(small_star_rdd)
                 .union(small_star_rdd.subtract(large_star_rdd))
                 .isEmpty())

    # The previous round's RDDs are no longer needed once this round's
    # results have been computed by the check above.
    for stale_rdd in previous_rdds:
        if stale_rdd is not None:
            stale_rdd.unpersist()

    if converged:
        break

single_vertex_rdd = adjacency_list.flatMap(single_vertex).distinct()
answer = single_vertex_rdd.collect() + large_star_rdd.collect()
print(answer[:10])
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.