Convert a Spark RDD to Pandas DataFrames inside the Spark executors and build a Spark DataFrame from the resulting RDD. This is fully distributed, i.e. there is no need to collect the RDD to the driver.
""" | |
Spark DataFrame is distributed but it lacks many features compared to Pandas. | |
If you want to use Pandas, you can't just convert Spark DF to Pandas because that means collecting it to driver. | |
It can be slow & not work at all when data size is big. | |
So only way to use Pandas is to create mini dataframes inside executors. | |
This gist shows how to create DataFrame from RDD inside Spark executors & build Spark DataFrame from final output. | |
""" | |
# Convert function to use in mapPartitions | |
def rdd_to_pandas(rdd_): | |
# convert rows to dict | |
rows = (row_.asDict() for row_ in rdd_) | |
# create pandas dataframe | |
pdf = pd.DataFrame(rows) | |
# Rows/Pandas DF can be empty depending on partition logic. | |
# Make sure to check it here, otherwise it will throw untrackable error | |
if len(pdf) > 0: | |
# | |
# Do something with pandas DataFrame | |
# | |
pass | |
return pdf.to_dict(orient='records') | |
# Create Spark DataFrame from resulting RDD | |
rdf = spark.createDataFrame(df.rdd.mapPartitions(rdd_to_pandas)) |
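
For context, here is a minimal end-to-end sketch of how the pieces above fit together. The SparkSession setup, the app name, and the sample data are assumptions added for illustration; they are not part of the original gist, which presumes an existing `spark` session and input `df`.

# Minimal usage sketch (assumed setup, not part of the original gist)
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("rdd-to-pandas-demo").getOrCreate()

# Hypothetical sample data standing in for the `df` used above
df = spark.createDataFrame(
    [("a", 1), ("b", 2), ("c", 3), ("d", 4)],
    ["key", "value"],
)

# Each executor builds its own small Pandas DataFrame per partition,
# so nothing is collected to the driver.
rdf = spark.createDataFrame(df.rdd.mapPartitions(rdd_to_pandas))
rdf.show()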