Last active
December 5, 2017 18:57
-
-
Save rmitsch/ebadcde3a713639da4e26db5ca2dfb6d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark import SparkConf | |
from pyspark import SparkContext | |
import numpy.random as rnd | |
import numpy as np | |
import os | |
# os.environ["SPARK_HOME"] = "/usr/local/Cellar/apache-spark/1.5.1/" | |
os.environ["PYSPARK_PYTHON"] = "/home/raphael/Development/datamining/py3env/bin/python3" | |
def iterate(iterable):
    """
    Auxiliary function used to print stuff.

    Flattens a collected RDD structure into a tuple: scalar integers are
    kept as-is, nested iterables (e.g. pyspark ResultIterables produced by
    groupBy) are materialized into plain lists.
    :param iterable: An int, or an iterable whose elements are ints /
        numpy integer scalars or nested iterables.
    :return: Tuple of ints and/or lists.
    """
    # A bare scalar is wrapped into a 1-tuple.
    if isinstance(iterable, (int, np.integer)):
        return (iterable,)
    result = []
    for element in iterable:
        # Bug fix: the original compared type(...).__name__ against
        # "numpy.int64", which never matches (that __name__ is "int64"),
        # so numpy scalars fell through to the iteration branch and raised
        # a TypeError. isinstance against np.integer covers all numpy
        # integer widths as well as plain Python ints.
        if isinstance(element, (int, np.integer)):
            result.append(element)
        else:
            # Materialize nested (possibly lazy) iterables into lists.
            result.append(list(element))
    return tuple(result)
# Tuple unpacking not supported in Python 3: | |
# https://stackoverflow.com/questions/21892989/what-is-the-good-python3-equivalent-for-auto-tuple-unpacking-in-lambda | |
def to_triple(x):
    """
    Expand one index-zipped matrix row into sparse triples.

    rdd_A and rdd_B are built row-wise, so each element arriving here is a
    (row values, row index) pair from zipWithIndex(). Every entry of the
    row becomes one (row index, column index, value) triple; the matrix is
    reassembled from these triples when the RDD is collected.
    :param x: Pair (ndarray of row values, index of row).
    :return: List of (row index, column index, value) triples for this row.
    """
    row_values, row_index = x[0], x[1]
    return [(row_index, col_index, value)
            for col_index, value in enumerate(row_values)]
def mult(rdd_A, rdd_B):
    """
    Multiplies matrices A and B, stored in rdd_A and rdd_B respectively,
    with each other. Does both mapping and reduction.
    :param rdd_A: RDD of (row index, column index, value) triples of A.
    :param rdd_B: RDD of (row index, column index, value) triples of B.
    :return: RDD of ((row index, column index), value) entries of A * B.
    """
    # Basic idea: entry C[i, j] is the dot product of A's i-th row with
    # B's j-th column. Approach as presented in
    # https://www.thinkbiganalytics.com/2015/11/23/scalable-matrix-multiplication-using-spark-2/

    # 1. Group A's triples by row index and B's triples by column index,
    #    so each group holds one complete row of A / column of B.
    rows_of_a = rdd_A.groupBy(lambda triple: triple[0])
    cols_of_b = rdd_B.groupBy(lambda triple: triple[1])

    def dot_of_pair(pair):
        # pair == ((row index, row triples), (column index, column triples)).
        (row_idx, row_triples), (col_idx, col_triples) = pair
        # Strip the triples down to their values and take the dot product.
        row_vec = np.array([triple[2] for triple in row_triples])
        col_vec = np.array([triple[2] for triple in col_triples])
        return ((row_idx, col_idx), np.dot(row_vec, col_vec))

    # 2. Pair every row of A with every column of B; each pairing yields
    #    exactly one entry of the product, keyed by its (row, col) position,
    #    so no further reduction step is required.
    return rows_of_a.cartesian(cols_of_b).map(dot_of_pair)
# Driver script: build two random 0/1 matrices, multiply them via Spark,
# and verify the result against numpy's dense matrix product.
conf = SparkConf().setAppName('master').setMaster('local')
sc = SparkContext(conf=conf)

# Matrix dimensions: A is n x m, B is m x n, so C = A * B is n x n.
n = 5
m = 5
A = rnd.randint(0, 2, size=(n, m))
B = rnd.randint(0, 2, size=(m, n))
print("A = \n", A)
print("B = \n", B)

# Convert A, B to sparse (row index, column index, value) triples.
rdd_A = sc.parallelize(A).zipWithIndex().flatMap(to_triple)
rdd_B = sc.parallelize(B).zipWithIndex().flatMap(to_triple)

TMP = mult(rdd_A, rdd_B)

# Assemble the dense result matrix from the ((row, col), value) entries.
result = np.zeros((n, n), dtype=int)
for ((r, c), v) in TMP.collect():
    result[r, c] = v

print("result = \n", result)
print("A * B = \n", np.dot(A, B))
# Bug fix: the original tested np.sum(result == np.dot(A, B)) == 25, which
# hard-codes the element count for n == 5 and would silently fail for any
# other n. np.array_equal compares shape and all entries for any size.
if np.array_equal(result, np.dot(A, B)):
    print("Results match.")
sc.stop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment