@TalKachman
Forked from jiffyclub/hdf_to_parquet.py
Created September 24, 2022 16:05
Do the same thing in Spark and Pandas: convert DataFrames from an HDFStore to Parquet, then time an identical merge/filter/groupby pipeline in each framework.
"""
Convert Pandas DataFrames in an HDFStore to Parquet files for better
compatibility with Spark.
Run from the command line with:
spark-submit --driver-memory 4g --master 'local[*]' hdf5_to_parquet.py
"""
import pandas as pd

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

# Load the tables from the HDF5 store; reset_index so the index becomes
# a regular column and survives the Parquet conversion.
store = pd.HDFStore('/Users/jiffyclub/synth/spark-demo/mtc_asim.h5')
persons = store['persons'].reset_index()
households = store['households'].reset_index()
store.close()
spark_conf = (
    SparkConf()
    .setAppName('SparkRunDemo')
    # .setMaster('local[*]')
    # .set('spark.driver.memory', '8g')
    .set('spark.executor.memory', '8g')
    .set('spark.python.worker.memory', '8g')
    .set('spark.storage.memoryFraction', 0.2)
    .set('spark.logConf', True))
print(spark_conf.toDebugString())

sc = SparkContext(conf=spark_conf)
sql = SQLContext(sc)

# Convert the Pandas DataFrames to Spark DataFrames and write them out
# as Parquet files.
hh_spark = sql.createDataFrame(households)
p_spark = sql.createDataFrame(persons)

hh_spark.write.parquet('households.parquet')
p_spark.write.parquet('persons.parquet')
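
A quick sanity check one could append to the end of the conversion script (not part of the original gist): with pyarrow installed, Pandas can read the Parquet files straight back, so the round trip can be verified before running the Spark job. The pyarrow dependency is an assumption.

# Hypothetical sanity check: read the freshly written Parquet files back
# with Pandas (requires the pyarrow package) and compare row counts
# against the in-memory DataFrames written above.
hh_check = pd.read_parquet('households.parquet')
p_check = pd.read_parquet('persons.parquet')
assert len(hh_check) == len(households), 'household row count mismatch'
assert len(p_check) == len(persons), 'person row count mismatch'
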
"""
Do the same merge/filter/groupby pipeline in plain Pandas for comparison.
"""
import time

import pandas as pd

# Here the indexes are kept as-is so the merge below can join persons'
# household_id column against the households index.
store = pd.HDFStore('/Users/jiffyclub/synth/spark-demo/mtc_asim.h5')
persons = store['persons']
households = store['households']
store.close()
t1 = time.time()
persons = persons.merge(households, left_on='household_id', right_index=True)
t2 = time.time()
print('time to merge: {}'.format(t2 - t1))

persons = persons.query('age >= 18 and income >= 10000')
assert len(persons) > 0, 'no people left after query'
t3 = time.time()
print('time to filter: {}'.format(t3 - t2))

income_by_sex = persons.groupby('sex').income.mean()
t4 = time.time()
print('time to groupby agg: {}'.format(t4 - t3))

print('total time: {}'.format(t4 - t1))
print(income_by_sex)
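
Since the first script already wrote Parquet files, the Pandas run could read those instead of the HDF5 store. A minimal sketch, assuming pyarrow is installed; because the indexes were reset during conversion, the merge becomes column-on-column, and the household key column name 'HHID' is taken from the Spark join below.

# Sketch: the same pipeline reading the Parquet files written earlier.
# Assumes pyarrow is installed and that the households key column is
# named 'HHID' (as in the Spark join in the next script).
import pandas as pd

persons = pd.read_parquet('persons.parquet')
households = pd.read_parquet('households.parquet')
merged = persons.merge(households, left_on='household_id', right_on='HHID')
adults = merged.query('age >= 18 and income >= 10000')
print(adults.groupby('sex').income.mean())
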
"""
Run from the command line with:
time spark-submit --driver-memory 4g --master 'local[*]' spark_run.py 2> spark.log
"""
import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
spark_conf = (
    SparkConf()
    .setAppName('SparkRunDemo')
    # .setMaster('local[*]')
    # .set('spark.driver.memory', '8g')
    .set('spark.executor.memory', '8g')
    .set('spark.python.worker.memory', '8g')
    .set('spark.storage.memoryFraction', 0.2)
    .set('spark.logConf', True))
print(spark_conf.toDebugString())

sc = SparkContext(conf=spark_conf)
sql = SQLContext(sc)

# Read the Parquet files written by the conversion script.
hh_spark = sql.read.parquet('households.parquet')
p_spark = sql.read.parquet('persons.parquet')

t1 = time.time()
merged = hh_spark.join(p_spark, hh_spark.HHID == p_spark.household_id)
t2 = time.time()
print('time to merge: {}'.format(t2 - t1))
# filtered = merged.filter((merged.age <= 18) & (merged.income >= 100000))
filtered = merged.filter('age >= 18 and income >= 10000')
t3 = time.time()
print('time to filter: {}'.format(t3 - t2))

income_by_sex = filtered.groupby('sex').agg({'income': 'mean'})
t4 = time.time()
print('time to groupby agg: {}'.format(t4 - t3))

# Spark is lazy, so the work only actually happens at collect().
print(income_by_sex.collect())
t5 = time.time()
print('time to collect: {}'.format(t5 - t4))
print('total time: {}'.format(t5 - t1))
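
These scripts target the Spark 1.x API; on Spark 2+ the SQLContext entry point is superseded by SparkSession. A minimal sketch of the same read/join/filter/aggregate pipeline under that assumption, with the app name and executor memory carried over from the configs above:

# Sketch of the same pipeline on Spark 2+, where SparkSession replaces
# SQLContext as the entry point.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
         .appName('SparkRunDemo')
         .config('spark.executor.memory', '8g')
         .getOrCreate())

hh = spark.read.parquet('households.parquet')
p = spark.read.parquet('persons.parquet')

income_by_sex = (hh.join(p, hh.HHID == p.household_id)
                 .filter('age >= 18 and income >= 10000')
                 .groupBy('sex')
                 .agg(F.mean('income').alias('income')))
print(income_by_sex.collect())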