import csv
import random
import string

import pandas as pd
from tqdm import tqdm

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
random.seed(1999)

# Build a large character pool: random.sample() draws without replacement,
# so the pool must be at least as long as the longest string requested (500).
letters = string.ascii_lowercase
letters_upper = string.ascii_uppercase
for _i in range(0, 10):
    letters += letters
for _i in range(0, 10):
    letters += letters_upper


def random_string(string_length=10):
    """Generate a random string of fixed length."""
    return ''.join(random.sample(letters, string_length))

print("Products between {} and {}".format(1, 75000000)) | |
product_ids = [x for x in range(1, 75000000)] | |
dates = ['2020-07-01', '2020-07-02', '2020-07-03', '2020-07-04', '2020-07-05', '2020-07-06', '2020-07-07', '2020-07-08', | |
'2020-07-09', '2020-07-10'] | |
seller_ids = [x for x in range(1, 10)] | |
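
# Note: holding ~75M product IDs in a Python list costs a few GB of RAM; the
# list exists only so random.choice() can draw uniform product_ids further down.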

# Generate products: product 0 is the "hot" product that will dominate sales.
products = [[0, "product_0", 22]]
for p in tqdm(product_ids):
    products.append([p, "product_{}".format(p), random.randint(1, 150)])

# Save dataframe, then free the memory.
df = pd.DataFrame(products)
df.columns = ["product_id", "product_name", "price"]
df.to_csv("products.csv", index=False)
del df
del products

# Generate sellers: seller 0 gets a fixed daily target, the others random ones.
sellers = [[0, "seller_0", 2500000]]
for s in tqdm(seller_ids):
    sellers.append([s, "seller_{}".format(s), random.randint(12000, 2000000)])

# Save dataframe
df = pd.DataFrame(sellers)
df.columns = ["seller_id", "seller_name", "daily_target"]
df.to_csv("sellers.csv", index=False)

# Generate sales
total_rows = 500000
prod_zero = int(total_rows * 0.95)
prod_others = total_rows - prod_zero + 1
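
# Each of the 40 chunks below holds prod_zero + prod_others = 500,001 orders,
# roughly 20M rows in total, ~95% of which reference product 0 / seller 0:
# a deliberately skewed key distribution for the Spark exercise.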
df_array = [["order_id", "product_id", "seller_id", "date", "num_pieces_sold", "bill_raw_text"]] | |
with open('sales.csv', 'w', newline='') as f: | |
csvwriter = csv.writer(f) | |
csvwriter.writerows(df_array) | |

order_id = 0
for _chunk in tqdm(range(0, 40)):
    # Orders for the skewed product/seller pair.
    df_array = []
    for _ in range(0, prod_zero):
        order_id += 1
        df_array.append([order_id, 0, 0, random.choice(dates),
                         random.randint(1, 100), random_string(500)])
    with open('sales.csv', 'a', newline='') as f:
        csvwriter = csv.writer(f)
        csvwriter.writerows(df_array)

    # Orders spread across the remaining products and sellers.
    df_array = []
    for _ in range(0, prod_others):
        order_id += 1
        df_array.append(
            [order_id, random.choice(product_ids), random.choice(seller_ids),
             random.choice(dates), random.randint(1, 100), random_string(500)])
    with open('sales.csv', 'a', newline='') as f:
        csvwriter = csv.writer(f)
        csvwriter.writerows(df_array)
print("Done")
spark = SparkSession.builder \
    .master("local") \
    .config("spark.sql.autoBroadcastJoinThreshold", -1) \
    .appName("Exercise1") \
    .getOrCreate()

# Convert each generated CSV to Parquet.
products = spark.read.csv(
    "products.csv", header=True, mode="DROPMALFORMED"
)
products.show()
products.write.parquet("products_parquet", mode="overwrite")

sales = spark.read.csv(
    "sales.csv", header=True, mode="DROPMALFORMED"
)
sales.show()
# Repartition by product_id before writing: since ~95% of rows share
# product_id 0, one of the 200 partitions comes out far larger than the rest.
sales.repartition(200, col("product_id")).write.parquet("sales_parquet", mode="overwrite")

sellers = spark.read.csv(
    "sellers.csv", header=True, mode="DROPMALFORMED"
)
sellers.show()
sellers.write.parquet("sellers_parquet", mode="overwrite")
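
# Optional sanity check, a minimal sketch: read the Parquet output back and
# confirm that product 0 dominates the sales table.
top_products = (
    spark.read.parquet("sales_parquet")
    .groupBy("product_id")
    .count()
    .orderBy(col("count").desc())
)
top_products.show(5)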