Skip to content

Instantly share code, notes, and snippets.

@WillianFuks
Last active December 8, 2017 02:17
Show Gist options
  • Save WillianFuks/574a0c26fedd1c8300e1bc925f429cc1 to your computer and use it in GitHub Desktop.
Save WillianFuks/574a0c26fedd1c8300e1bc925f429cc1 to your computer and use it in GitHub Desktop.
Naive Cosines in Map-Reduce
import operator
import math
import time
from base import JobsBase
from pyspark.sql import SparkSession
from pyspark.sql import types as stypes
class NaiveJob(JobsBase):
    """Map-reduce job computing naive (all-pairs) cosine similarity
    between SKUs from per-day user interaction data.

    Pipeline: load per-day JSON interactions, aggregate scores per user,
    broadcast each SKU's L2 norm, then emit normalized score products for
    every SKU pair co-occurring in a user's history and sum them.
    """

    def run(self, sc, args):
        """Entry point: transform raw data, then build the similarity matrix."""
        self.transform_data(sc, args)
        self.build_naive(sc, args)

    def build_naive(self, sc, args):
        """Build and persist the SKU neighborhood (similarity) matrix.

        Reads intermediate per-day files from ``args.inter_uri`` for the
        window ``args.days_init`` down to ``args.days_end`` (inclusive),
        and writes the result to ``args.neighbor_uri``.
        """
        spark = SparkSession(sc)
        rdd = sc.emptyRDD()
        # Walk the day window backwards, unioning one day's data per step.
        for day_offset in range(args.days_init, args.days_end - 1, -1):
            day_str = self.get_formatted_date(day_offset)
            day_uri = args.inter_uri.format(day_str)
            daily = spark.read.json(day_uri, schema=self.load_users_schema())
            rdd = rdd.union(daily.rdd)
        # Merge each user's score lists, aggregate per SKU, and drop users
        # with a single SKU (no pairs) or more than 100 SKUs (too generic).
        rdd = (rdd.reduceByKey(operator.add)
                  .flatMap(self.aggregate_skus)
                  .filter(lambda pair: 1 < len(pair[1]) <= 100))
        norms = self._broadcast_norms(sc, rdd)
        similarities = (rdd.flatMap(
                            lambda pair: self.process_intersections(pair, norms))
                           .reduceByKey(operator.add))
        self.save_neighbor_matrix(args.neighbor_uri, similarities)

    @staticmethod
    def process_intersections(row, norms):
        """Yield ``((sku_i, sku_j), score_i * score_j / (|sku_i| * |sku_j|))``
        for every ordered pair of distinct SKUs in one user's row.

        ``row`` is ``(user, [(sku, score), ...])``; ``norms`` is a broadcast
        dict mapping each SKU to its L2 norm.
        """
        skus = row[1]
        count = len(skus)
        for i in range(count):
            sku_i, score_i = skus[i][0], skus[i][1]
            norm_i = norms.value[sku_i]
            for j in range(i + 1, count):
                sku_j, score_j = skus[j][0], skus[j][1]
                yield ((sku_i, sku_j),
                       score_i * score_j / (norm_i * norms.value[sku_j]))

    def _broadcast_norms(self, sc, data):
        """Compute each SKU's L2 norm and broadcast the mapping to workers.

        Returns a broadcast variable wrapping ``{sku: sqrt(sum(score^2))}``.
        """
        collected = (data.flatMap(self._process_scores)
                         .reduceByKey(operator.add)
                         .mapValues(math.sqrt)
                         .collect())
        return sc.broadcast(dict(collected))

    @staticmethod
    def _process_scores(row):
        """Yield ``(sku, score ** 2)`` for each SKU entry in a user's row."""
        for entry in row[1]:
            sku, score = entry[0], entry[1]
            yield (sku, score ** 2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment