Will Koehrsen (WillKoehrsen)
from keras.layers import Input, Embedding, Dot, Reshape, Dense
from keras.models import Model

def book_embedding_model(embedding_size = 50, classification = False):
    """Model to embed books and wikilinks using the Keras functional API.
    Trained to discern if a link is present on a book's page."""
    # Both inputs are 1-dimensional
    book = Input(name = 'book', shape = [1])
    link = Input(name = 'link', shape = [1])
import featuretools as ft
import featuretools.variable_types as vtypes

# Create entityset
es = ft.EntitySet(id = 'customers')

# Create entity from members
es.entity_from_dataframe(entity_id='members', dataframe=members,
                         index = 'msno',
                         time_index = 'registration_init_time')
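The preview cuts the entity_from_dataframe call short; any explicit column types would be passed with the vtypes imported above. A hedged sketch of how deep feature synthesis might then be run on this entityset, using the same pre-1.0 featuretools API as the snippet (the target entity name and parameters are assumptions, not from the original gist):

# Sketch: run deep feature synthesis on the entityset (parameters are assumptions)
feature_matrix, feature_defs = ft.dfs(entityset=es,
                                      target_entity='members',
                                      max_depth=2,
                                      verbose=1)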
# Both inputs are 1-dimensional
book = Input(name = 'book', shape = [1])
link = Input(name = 'link', shape = [1])

# Embedding the book (shape will be (None, 1, 50))
book_embedding = Embedding(name = 'book_embedding',
                           input_dim = len(book_index),
                           output_dim = embedding_size)(book)

# Embedding the link (shape will be (None, 1, 50))
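# (Sketch continuing from here: the link embedding, dot product, and compile settings
#  below are assumptions based on the imported layers, not part of the original gist)
link_embedding = Embedding(name = 'link_embedding',
                           input_dim = len(link_index),
                           output_dim = embedding_size)(link)

# Dot product across the embedding dimension gives one similarity score per pair
merged = Dot(name = 'dot_product', normalize = True, axes = 2)([book_embedding, link_embedding])
merged = Reshape(target_shape = [1])(merged)

# With classification = True, a sigmoid Dense layer would map the score to a probability
model = Model(inputs = [book, link], outputs = merged)
model.compile(optimizer = 'Adam', loss = 'mse')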
import featuretools as ft
import pandas as pd

def partition_to_feature_matrix(partition_num, feature_defs):
    """Calculate a feature matrix for one partition and save"""
    # Read in data from partition directory
    members = pd.read_csv(f's3://{partition_num}/members.csv')
    # ... Read in other dataframes

# Run feature engineering in parallel
r = sc.parallelize(partitions, numSlices=N_PARTITIONS).\
    map(lambda x: partition_to_feature_matrix(x, feature_defs)).collect()
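The body of partition_to_feature_matrix is cut off in the preview; a hedged sketch of what the remainder might look like, reusing the shared feature definitions so every partition produces identical columns (the entityset construction and the save destination are assumptions):

def partition_to_feature_matrix(partition_num, feature_defs):
    """Build an entityset for one partition, compute its feature matrix, and save it."""
    members = pd.read_csv(f's3://{partition_num}/members.csv')
    # ... read the other dataframes for this partition ...
    es = ft.EntitySet(id = 'customers')
    es.entity_from_dataframe(entity_id='members', dataframe=members,
                             index = 'msno',
                             time_index = 'registration_init_time')
    # Apply the pre-computed feature definitions to this partition's entityset
    feature_matrix = ft.calculate_feature_matrix(feature_defs, entityset=es)
    # Save the partition's feature matrix (destination path is an assumption)
    feature_matrix.to_csv(f's3://{partition_num}/feature_matrix.csv')
    return partition_num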
import pyspark

conf = pyspark.SparkConf()

# Set the number of executors, memory per executor, and cores per executor
conf.set('spark.executor.instances', 3)
conf.set('spark.executor.memory', '12g')
conf.set('spark.executor.cores', 4)

# Create a spark context
sc = pyspark.SparkContext(conf=conf)
# Partition a large file that cannot fit into memory
for chunk in pd.read_csv(user_logs_file_path, chunksize = chunksize):
    partition_by_hashing(chunk, name = 'logs', progress = None)
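partition_by_hashing itself is not shown in these previews; a hedged sketch of what it might do, combining the hashing helper and the append-by-partition pattern from the snippets below (the 'msno' id column and per-partition file layout come from those snippets; the rest is an assumption):

def partition_by_hashing(df, name, progress=None):
    """Append each row of df to the csv of the partition chosen by hashing its customer id."""
    # Assign a partition number to every row (id_to_hash and file_dir as in the snippets below)
    df['partition'] = df['msno'].apply(id_to_hash)
    for partition, grouped in df.groupby('partition'):
        with open(file_dir + f'p{partition}/{name}.csv', 'a') as f:
            grouped.drop(columns = ['partition']).to_csv(f, header = False, index = False)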
import pandas as pd

# Convert customer id to partition number
members['partition'] = members['msno'].apply(id_to_hash)

# Iterate through grouped partitions
for partition, grouped in members.groupby('partition'):
    # Open file for appending
    with open(file_dir + f'p{partition}/members.csv', 'a') as f:
        # Append this partition's rows (the to_csv options here are an assumption)
        grouped.to_csv(f, header = False, index = False)
import hashlib

N_PARTITIONS = 1000

def id_to_hash(customer_id):
    """Map a customer id string to a partition number using its md5 hash."""
    customer_hash = int(hashlib.md5(customer_id.encode('utf-8')).hexdigest(), 16)
    partition = customer_hash % N_PARTITIONS
    return partition
import subprocess
import xml.sax
from timeit import default_timer as timer

# Object for handling xml
handler = WikiXmlHandler()

# Parsing object
parser = xml.sax.make_parser()
parser.setContentHandler(handler)

start = timer()

# Parse the entire file, streaming the compressed dump through bzcat
for line in subprocess.Popen(['bzcat'],
                             stdin = open(data_path),
                             stdout = subprocess.PIPE).stdout:
    parser.feed(line)
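WikiXmlHandler is not defined in the preview; a minimal sketch of a SAX content handler with that role, which would be defined before the snippet above runs (the exact tags collected and the pages attribute name are assumptions):

import xml.sax

class WikiXmlHandler(xml.sax.handler.ContentHandler):
    """Collect (title, text) pairs from the <page> elements of a Wikipedia XML dump."""
    def __init__(self):
        super().__init__()
        self._buffer = []
        self._current_tag = None
        self._values = {}
        self.pages = []

    def characters(self, content):
        # Accumulate character data while inside a tag of interest
        if self._current_tag:
            self._buffer.append(content)

    def startElement(self, name, attrs):
        if name in ('title', 'text'):
            self._current_tag = name
            self._buffer = []

    def endElement(self, name):
        if name == self._current_tag:
            self._values[name] = ''.join(self._buffer)
            self._current_tag = None
        if name == 'page':
            self.pages.append((self._values.get('title'), self._values.get('text')))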