WillianFuks / example_dataproc_twitter_worker_dataproc.py
Created December 9, 2017 14:58
Worker that interacts with Dataproc
@app.route("/dataproc_dimsum", methods=['POST'])
def dataproc_dimsum():
extended_args = request.form.get('extended_args').split(',')
setup = config['jobs']['run_dimsum']
job = gcp_service.dataproc.build_cluster(**setup)
gcp_service.storage.upload_from_filenames(
**config['jobs']['run_dimsum']['pyspark_job'])
job = gcp_service.dataproc.submit_pyspark_job(extended_args,
**config['jobs']['run_dimsum'])
result = gcp_service.dataproc.delete_cluster(**setup)
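
None of the gists show the shape of config, so here is a minimal sketch of what config['jobs']['run_dimsum'] might contain; every key and value below is an assumption made only so the route above reads concretely, not the author's actual settings:

# Hypothetical job configuration; all keys and values are illustrative.
config = {
    'jobs': {
        'run_dimsum': {
            'project_id': 'my-gcp-project',        # assumed
            'region': 'us-central1',               # assumed
            'cluster_name': 'dimsum-cluster',      # assumed
            'pyspark_job': {
                'bucket': 'my-staging-bucket',     # assumed
                'filenames': ['dimsum.py', 'base.py'],  # assumed
            },
        },
    },
}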
WillianFuks / example_dataproc_twitter_pyspark_base.py
Last active December 9, 2017 16:11
Base Module to Support PySpark Jobs
import abc
import datetime
import argparse
import operator
from collections import defaultdict
from pyspark.sql import SparkSession
from pyspark.sql import types as stypes
from pyspark.sql.utils import AnalysisException
from py4j.protocol import Py4JJavaError
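
The gist is cut off after its imports, but the DIMSUM gist below does from base import JobsBase, so the module presumably defines an abstract base class for the PySpark jobs. A minimal sketch of what such a class could look like; the method name run and the session helper are assumptions for illustration, not the author's code:

import abc

from pyspark.sql import SparkSession


class JobsBase(object):
    """Shared scaffolding that every PySpark job inherits from."""
    # Python 2 idiom for abstract classes, matching the era of these gists.
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def run(self, session, args):
        """Entry point each concrete job (e.g. the DIMSUM job) implements."""

    @staticmethod
    def build_spark_session(app_name):
        # Single place to construct (or retrieve) the shared SparkSession.
        return SparkSession.builder.appName(app_name).getOrCreate()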
WillianFuks / example_dataproc_twitter_dimsum.py
Created December 9, 2017 16:48
DIMSUM PySpark Implementation
from __future__ import absolute_import
import operator
import math
import random
import time
from base import JobsBase
from pyspark.sql import SparkSession
from pyspark.sql import types as stypes

import sys
from operator import itemgetter
import argparse
import datetime
import json
import apache_beam as beam
from apache_beam.options.pipeline_options import (PipelineOptions,
SetupOptions)
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
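
For context on what the DIMSUM job computes: DIMSUM estimates all-pairs cosine similarities between the columns of a tall-skinny matrix without materializing every co-occurring pair. For a row with nonzero entries in columns i and j, the pair is sampled with probability min(1, sqrt(gamma)/||c_i||) * min(1, sqrt(gamma)/||c_j||) and its contribution is rescaled by the capped norms, which keeps the estimator unbiased while gamma bounds the shuffle size. A toy single-machine sketch of that sampling step (pure Python, not the author's Spark code; the input layout is a simplifying assumption):

import math
import random
from collections import defaultdict


def dimsum_similarities(rows, norms, gamma):
    # rows: list of {column: value} dicts; norms: column -> L2 norm.
    sims = defaultdict(float)
    sqrt_gamma = math.sqrt(gamma)
    for row in rows:
        cols = sorted(row)
        for a in range(len(cols)):
            for b in range(a + 1, len(cols)):
                i, j = cols[a], cols[b]
                # Sample the pair; popular columns get heavily down-sampled.
                p = (min(1.0, sqrt_gamma / norms[i]) *
                     min(1.0, sqrt_gamma / norms[j]))
                if random.random() < p:
                    # Rescale by the capped norms so the expected value of
                    # each accumulated entry equals the cosine similarity.
                    sims[(i, j)] += (row[i] * row[j]) / (
                        min(sqrt_gamma, norms[i]) * min(sqrt_gamma, norms[j]))
    return sims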
WillianFuks / example_dataproc_twitter_main_recommender.py
Created December 9, 2017 18:10
Main AppEngine For Recommendations
@app.route("/make_recommendation")
def make_reco():
t0 = time.time()
scores = base_utils.process_input_items(request.args)
keys = map(lambda x: ndb.Key(config['recos']['kind'], x),
scores.keys())
entities = [e for e in ndb.get_multi(keys) if e]
if not entities:
result = {'results': [], 'statistics':
{'elapsed_time': time.time() - t0}}
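
The snippet is truncated before the endpoint returns, but the visible branch pins down the response shape when no entities match. A hedged example of a request and that empty response; the query keys and the timing value are purely illustrative, and the exact contract of base_utils.process_input_items isn't shown in the gists:

GET /make_recommendation?item_a=0.5&item_b=1.0

{"results": [], "statistics": {"elapsed_time": 0.002}}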
WillianFuks / example_dataproc_twitter_utils.py
Created December 9, 2017 18:15
Example of how we processed recommendations at first
def process_recommendations(entities, scores, n=10):
    t0 = time.time()
    # Weight each entity's similarity list by the user's score for that
    # entity and merge everything into a single Counter.
    r = sum([Counter({e.items[i]: e.scores[i] * scores[e.key.id()]
             for i in range(len(e.items))}) for e in entities],
            Counter()).items()
    time_build_recos = time.time() - t0
    t0 = time.time()
    heapq.heapify(r)
    return {'result': [{"item": k, "score": v} for k, v in heapq.nlargest(
        n, r, key=lambda x: x[1])]}
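
Summing Counters with the built-in sum allocates a fresh intermediate Counter per entity, so the aggregation above degrades roughly quadratically with the number of entities; that cost is presumably what motivated the Cython rewrite below. A sketch of the same aggregation done in a single pass over one dict, assuming entities expose the parallel items/scores lists used in the snippet:

import heapq
from collections import defaultdict


def process_recommendations_fast(entities, scores, n=10):
    # One shared dict instead of one Counter per entity.
    agg = defaultdict(float)
    for e in entities:
        user_weight = scores[e.key.id()]
        for item, score in zip(e.items, e.scores):
            agg[item] += score * user_weight
    # nlargest accepts any iterable, so no heapify is needed.
    top = heapq.nlargest(n, agg.items(), key=lambda x: x[1])
    return {'result': [{'item': k, 'score': v} for k, v in top]}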
WillianFuks / example_dataproc_twitter_cython.py
Created December 9, 2017 18:31
Cythonized Version of Recos
from cpython cimport dict, list


cpdef cy_aggregate_scores(list similarities, dict user_scores, int n=10):
    # Statically typed locals keep the aggregation loop at C speed.
    cdef int i
    cdef dict res
    cdef int j
    cdef double weight
    res = {}
    for i in range(len(similarities)):
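        # (The gist is truncated at this point.) A hedged guess at the loop
        # body, assuming each entry of `similarities` is a (key, items,
        # scores) tuple mirroring the pure-Python version above; this layout
        # is an assumption, not the author's code.
        key, items, item_scores = similarities[i]
        weight = user_scores[key]
        for j in range(len(items)):
            if items[j] in res:
                res[items[j]] += item_scores[j] * weight
            else:
                res[items[j]] = item_scores[j] * weight
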

import time
import base_utils
from config import config
from flask import Flask, request, jsonify
from connector.datastore import DatastoreService
app = Flask(__name__)
WillianFuks / example_dataproc_twitter_process_recommendations.py
Created December 11, 2017 22:43
First version of process recommendations
def process_recommendations(entities, scores, n=10):
    r = sum([Counter({e.items[i]: e.scores[i] * scores[e.key.id()]
             for i in range(len(e.items))}) for e in entities],
            Counter()).items()
    heapq.heapify(r)
    return {'result': [{"item": k, "score": v} for k, v in heapq.nlargest(
        n, r, key=lambda x: x[1])]}
Jupyter notebook (raw JSON truncated); its opening markdown cell reads: # Analysis on Test Sample Size (Customer Experience)