PySpark faster toPandas using mapPartitions
import pandas as pd


def _map_to_pandas(rdds):
    """ Needs to be defined at module level due to pickling issues """
    return [pd.DataFrame(list(rdds))]


def toPandas(df, n_partitions=None):
    """
    Returns the contents of `df` as a local `pandas.DataFrame` in a speedy fashion. The DataFrame is
    repartitioned if `n_partitions` is passed.

    :param df: pyspark.sql.DataFrame
    :param n_partitions: int or None
    :return: pandas.DataFrame
    """
    if n_partitions is not None:
        df = df.repartition(n_partitions)
    # Convert each partition to a pandas DataFrame on the executors in parallel,
    # then collect the per-partition frames and concatenate them on the driver.
    df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
    df_pand = pd.concat(df_pand)
    # Rows collected from the RDD lose their column labels, so restore them.
    df_pand.columns = df.columns
    return df_pand
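
A minimal usage sketch, assuming an active SparkSession named `spark`; the DataFrame and column names here are illustrative, not part of the gist:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("toPandas-demo").getOrCreate()

# Illustrative DataFrame; any pyspark.sql.DataFrame works.
sdf = spark.createDataFrame([(i, i * 2.0) for i in range(1000)], ["id", "value"])

# Pull the data to the driver, repartitioning to 8 partitions first
# so the per-partition pandas conversion runs with more parallelism.
pdf = toPandas(sdf, n_partitions=8)
print(pdf.head())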
You can increase spark.driver.maxResultSize in your Spark config. It seems your DataFrame is larger than the 4 GB limit. Set it to 15g or something similar and try running it again.
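
A sketch of one way to apply that setting when building the session; the 15g value follows the suggestion above and the app name is illustrative:

from pyspark.sql import SparkSession

# spark.driver.maxResultSize caps the total size of serialized results
# collected to the driver. Raising it lets larger collects through, but
# the driver process still needs enough memory to hold the result.
spark = (
    SparkSession.builder
    .appName("large-collect")
    .config("spark.driver.maxResultSize", "15g")
    .getOrCreate()
)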