Last active: December 8, 2017 02:17
-
-
Save WillianFuks/574a0c26fedd1c8300e1bc925f429cc1 to your computer and use it in GitHub Desktop.
Naive Cosines in Map-Reduce
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import operator | |
import math | |
import time | |
from base import JobsBase | |
from pyspark.sql import SparkSession | |
from pyspark.sql import types as stypes | |
class NaiveJob(JobsBase):
    """Naive all-pairs cosine similarity between SKUs, computed with RDDs.

    Each kept user's list of ``(sku, score)`` entries is expanded into every
    SKU pair. Each pair receives ``score_a * score_b / (norm_a * norm_b)``,
    and the contributions are summed per pair. ``norm_x`` is the Euclidean
    norm of SKU ``x``'s scores over the users kept by the filter in
    ``build_naive``. The result is the cosine similarity of the two SKUs'
    score vectors.
    """

    # A user is kept only when the SKU list produced by ``aggregate_skus``
    # has between ``min_skus_per_user`` and ``max_skus_per_user`` entries
    # (both inclusive). Fewer than two SKUs yields no pairs. The upper cap
    # bounds the per-user pair expansion, which grows quadratically.
    # Subclasses may override these values.
    min_skus_per_user = 2
    max_skus_per_user = 100

    def run(self, sc, args):
        """Run the job: ``transform_data`` first, then ``build_naive``.

        Args:
            sc: the SparkContext the job runs on.
            args: parsed job arguments, forwarded unchanged to both steps.
        """
        self.transform_data(sc, args)
        self.build_naive(sc, args)

    def build_naive(self, sc, args):
        """Compute pairwise SKU cosine similarities and save them.

        For every ``day`` from ``args.days_init`` down to ``args.days_end``
        (inclusive), one JSON dataset is read from
        ``args.inter_uri.format(self.get_formatted_date(day))`` using
        ``self.load_users_schema()``. The rows are combined with
        ``reduceByKey(operator.add)``, expanded through
        ``self.aggregate_skus``, and filtered by SKU-list length. The summed
        pair similarities are passed to ``self.save_neighbor_matrix``
        together with ``args.neighbor_uri``.

        Args:
            sc: the SparkContext the job runs on.
            args: needs ``days_init``, ``days_end``, ``inter_uri`` (a format
                string with one placeholder) and ``neighbor_uri``.
        """
        spark = SparkSession(sc)
        # NOTE(review): assumes load_users_schema() is pure; hoisted so it
        # runs once rather than once per day.
        schema = self.load_users_schema()
        data = sc.emptyRDD()
        # Walk backwards from days_init to days_end, inclusive of both ends.
        for day in range(args.days_init, args.days_end - 1, -1):
            inter_uri = args.inter_uri.format(self.get_formatted_date(day))
            data = data.union(spark.read.json(inter_uri, schema=schema).rdd)

        # Bind to locals so the filter closure does not capture ``self``.
        min_skus = self.min_skus_per_user
        max_skus = self.max_skus_per_user
        data = (data.reduceByKey(operator.add)
                .flatMap(lambda x: self.aggregate_skus(x))
                .filter(lambda x: min_skus <= len(x[1]) <= max_skus))

        # ``data`` feeds two actions: collecting the norms, then saving the
        # similarities. Cache it so the input is not re-read and re-reduced.
        data.cache()
        try:
            norms = self._broadcast_norms(sc, data)
            # A staticmethod looked up on the instance is a plain function,
            # so the closure below does not serialize ``self``.
            process = self.process_intersections
            similarities = (data
                            .flatMap(lambda x: process(x, norms))
                            .reduceByKey(operator.add))
            self.save_neighbor_matrix(args.neighbor_uri, similarities)
        finally:
            data.unpersist()

    @staticmethod
    def process_intersections(row, norms):
        """Yield one user's contribution to every SKU pair's cosine similarity.

        Args:
            row: a pair whose second element is a sequence of entries;
                ``entry[0]`` is the SKU and ``entry[1]`` its score.
                ``row[0]`` is presumably the user id (not read here).
            norms: a Broadcast whose ``.value`` maps SKU -> Euclidean norm.

        Yields:
            ``((sku_a, sku_b), score_a * score_b / (norm_a * norm_b))`` for
            each unordered pair of entries. The key is ordered so that
            ``sku_a <= sku_b``. Pairs whose norm product is 0 yield 0.0
            instead of raising ZeroDivisionError.
        """
        entries = row[1]
        norm_of = norms.value  # hoisted: fetched once per row, not per pair
        for i, left in enumerate(entries):
            for right in entries[i + 1:]:
                sku_a, sku_b = left[0], right[0]
                denominator = norm_of[sku_a] * norm_of[sku_b]
                # A zero norm means every score of that SKU is 0, so its
                # contribution is 0 too. Avoid crashing the job on 0/0.
                if denominator:
                    contribution = left[1] * right[1] / denominator
                else:
                    contribution = 0.0
                # Cosine similarity is symmetric. Canonical key order makes
                # (a, b) and (b, a) from different users reduce together
                # instead of producing two partial sums.
                if sku_b < sku_a:
                    sku_a, sku_b = sku_b, sku_a
                yield ((sku_a, sku_b), contribution)

    def _broadcast_norms(self, sc, data):
        """Compute each SKU's Euclidean score norm and broadcast the mapping.

        Args:
            sc: the SparkContext used to create the broadcast.
            data: RDD of rows shaped as in ``process_intersections``.

        Returns:
            A Broadcast wrapping a ``{sku: sqrt(sum of score ** 2)}`` dict.
            The norm covers only the rows present in ``data``.
        """
        norms = dict(data.flatMap(self._process_scores)
                     .reduceByKey(operator.add)
                     .mapValues(math.sqrt)
                     .collect())
        return sc.broadcast(norms)

    @staticmethod
    def _process_scores(row):
        """Yield ``(sku, score ** 2)`` for every entry in ``row[1]``."""
        for inner_row in row[1]:
            yield (inner_row[0], inner_row[1] ** 2)
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.