Skip to content

Instantly share code, notes, and snippets.

@nashibao
Created September 8, 2015 08:42
Show Gist options
  • Save nashibao/31b726608d67dc8e7138 to your computer and use it in GitHub Desktop.
Save nashibao/31b726608d67dc8e7138 to your computer and use it in GitHub Desktop.
from os.path import realpath
import sys
import numpy as np
from numpy.random import rand
from numpy import matrix
from numpy import multiply
from pyspark import SparkContext
# Ridge (L2) regularization strength applied in the ALS row solve; it is
# scaled by the number of rows of the fixed factor before use.
LAMBDA = 1e-2

# Seed NumPy's global RNG so the random matrices drawn in mf() are
# reproducible from run to run.
np.random.seed(42)
def rmse(R, ms, H, M, U):
    """Return the root-mean-square error of the approximation ``ms * H.T``.

    R    -- target matrix; expected to be a ``numpy.matrix`` so that ``*``
            below is a matrix product.
    ms   -- left factor, M x k.
    H    -- right factor, U x k.
    M, U -- number of rows and columns of R. The squared error is averaged
            over all ``M * U`` entries.

    Returns sqrt(sum((R - ms * H.T) ** 2) / (M * U)).
    """
    diff = R - ms * H.T
    # The denominator must be parenthesized: ``/ M * U`` would evaluate as
    # ``(sum / M) * U``. float() avoids integer floor division under
    # Python 2 when the inputs are integer matrices.
    return np.sqrt(np.sum(np.power(diff, 2)) / float(M * U))
def update_als(x, W, H, V, lam=None):
    """Solve the ridge-regularized least-squares problem for one factor row.

    Returns the k x 1 column ``w`` that solves the normal equations

        (H.T * H + lam * m * I) w = H.T * V[x, :].T

    i.e. the minimizer of ``||V[x, :] - w.T * H.T||^2 + lam * m * ||w||^2``,
    where ``m, k = H.shape``.

    x   -- row index into V.
    W   -- current factor the row belongs to. It is not read by this
           closed-form solve; the parameter keeps the signature identical
           to the other row-update function so callers can swap them.
    H   -- the fixed factor, m x k; expected to be a ``numpy.matrix`` so
           that ``*`` is a matrix product.
    V   -- the target matrix whose row ``x`` is fitted.
    lam -- regularization strength; defaults to the module-level LAMBDA.
    """
    if lam is None:
        lam = LAMBDA
    m, k = H.shape
    v = V[x, :]
    # Add the penalty to the diagonal out of place. This promotes integer
    # inputs to float instead of silently truncating the penalty, as an
    # in-place item assignment into an int matrix would.
    lhs = H.T * H + lam * m * np.eye(k)
    rhs = H.T * v.T
    return np.linalg.solve(lhs, rhs)
def update_gaussian_mu(x, W, H, V):
    """Apply one multiplicative update to row ``x`` of ``W``.

    With ``w = W[x, :]`` and ``v = V[x, :]``, the new row is computed
    element-wise as

        w * (v * H) / (w * (H.T * H) + 10 ** -9)

    where the tiny additive constant guards against a zero denominator.
    The row is returned transposed, as a column.

    W, H and V are expected to be ``numpy.matrix`` objects so that ``*``
    denotes matrix multiplication.
    """
    row_w = W[x, :]
    row_v = V[x, :]
    gram = H.T * H
    numerator = row_v * H
    denominator = row_w * gram + 10 ** -9
    return multiply(row_w, numerator / denominator).T
def mf(sc, n=10000, m=10000, k=10, ITERATIONS=10, partitions=10,
       als_iterations=3):
    """Factorize a random rank-k n x m matrix on Spark; return per-pass RMSE.

    The target is V = A * B.T with A (n x k) and B (m x k) drawn uniformly
    at random. Starting from random factors W (n x k) and H (m x k), each
    pass re-solves every row of W against H, then every row of H against
    the new W, with the rows distributed over ``partitions`` RDD
    partitions.

    sc             -- the SparkContext used to broadcast data and run jobs.
    n, m, k        -- rows of V, columns of V, and factorization rank.
    ITERATIONS     -- number of W/H passes.
    partitions     -- number of partitions of each per-row RDD.
    als_iterations -- the first ``als_iterations`` passes use update_als;
                      the remaining passes use update_gaussian_mu.

    Returns a list with one rmse() value per pass.
    """
    errors = []
    # A single parenthesized argument prints the same under Python 2's
    # print statement and Python 3's print() function.
    print("Running mf with n=%d, m=%d, k=%d, iters=%d, partitions=%d\n" %
          (n, m, k, ITERATIONS, partitions))

    # NOTE: V is a dense n x m float64 matrix that is broadcast to every
    # executor (about 800 MB at the default n = m = 10000).
    V = matrix(rand(n, k)) * matrix(rand(m, k).T)
    W = matrix(rand(n, k))
    H = matrix(rand(m, k))

    Vb = sc.broadcast(V)
    Wb = sc.broadcast(W)
    Hb = sc.broadcast(H)
    try:
        for i in range(ITERATIONS):
            update = update_als if i < als_iterations else update_gaussian_mu

            # Each update returns a k x 1 column, so the collected rows
            # stack to shape (count, k, 1); drop the trailing axis.
            rows = sc.parallelize(range(n), partitions) \
                .map(lambda x: update(x, Wb.value, Hb.value, Vb.value)) \
                .collect()
            W = matrix(np.array(rows)[:, :, 0])
            stale = Wb
            Wb = sc.broadcast(W)
            # The previous W is never read again; free its executor copies
            # instead of accumulating one broadcast per pass.
            stale.unpersist()

            # Rows of H are fitted against the columns of V (rows of V.T)
            # using the freshly updated W.
            rows = sc.parallelize(range(m), partitions) \
                .map(lambda x: update(x, Hb.value, Wb.value, Vb.value.T)) \
                .collect()
            H = matrix(np.array(rows)[:, :, 0])
            stale = Hb
            Hb = sc.broadcast(H)
            stale.unpersist()

            errors.append(rmse(V, W, H, n, m))
    finally:
        # Release the executor-side copies of the remaining broadcasts,
        # whether the loop finished or raised.
        for bc in (Vb, Wb, Hb):
            bc.unpersist()
    return errors
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment