Created
November 20, 2024 20:26
-
-
Save maneeshdisodia/64f5d4902fece196fd42106fc05c6d9d to your computer and use it in GitHub Desktop.
PySpark: concatenate two DataFrames with different (mismatched) columns
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import lit
import logging

# Spark session shared by the helper function and the example below.
builder = SparkSession.builder.appName("ConcatDataFrames")
spark = builder.getOrCreate()

# Basic root-logging configuration plus a named logger for this script.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ConcatLogger")
def concat_dataframes(df1: DataFrame, df2: DataFrame) -> DataFrame:
    """
    Concatenate two DataFrames row-wise, aligning their schemas first.

    Columns present in only one DataFrame are added to the other as null
    columns cast to the dtype taken from the DataFrame that defines them.
    (A bare ``lit(None)`` would create a NullType column, which can break
    later unions, joins, and writes.) Both sides are then reordered to the
    same alphabetical column order before the union.

    Args:
        df1 (DataFrame): First DataFrame.
        df2 (DataFrame): Second DataFrame.

    Returns:
        DataFrame: Row-wise concatenation carrying the union of both
        schemas, columns in alphabetical order.

    Raises:
        Exception: Re-raised after logging if schema alignment or the
        union fails.
    """
    try:
        # Columns each side is missing; the *other* DataFrame's schema
        # supplies the proper data type for the null filler.
        missing_columns_df1 = set(df2.columns) - set(df1.columns)
        missing_columns_df2 = set(df1.columns) - set(df2.columns)

        # Add the missing columns to df1 as typed nulls (not NullType).
        for col_name in missing_columns_df1:
            df1 = df1.withColumn(
                col_name, lit(None).cast(df2.schema[col_name].dataType)
            )
        # Add the missing columns to df2 as typed nulls.
        for col_name in missing_columns_df2:
            df2 = df2.withColumn(
                col_name, lit(None).cast(df1.schema[col_name].dataType)
            )

        # Deterministic (alphabetical) column order on both sides.
        df1 = df1.select(sorted(df1.columns))
        df2 = df2.select(sorted(df2.columns))

        # unionByName matches columns by name; schemas now align exactly.
        df_combined = df1.unionByName(df2)
        return df_combined
    except Exception as e:
        logger.error(f"Error in concat_dataframes: {e}")
        raise
# Example usage: two sample DataFrames whose schemas differ by one column.
df1 = spark.createDataFrame(
    [(1, "A"), (2, "B"), (3, "C")],
    ["id", "name"],
)
df2 = spark.createDataFrame(
    [(4, "D", "X"), (5, "E", "Y"), (6, "F", "Z")],
    ["id", "name", "extra"],
)

try:
    # Align the schemas, stack the rows, and display the result.
    df_combined = concat_dataframes(df1, df2)
    df_combined.show()
except Exception as e:
    logger.error(f"Failed to concatenate DataFrames: {e}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment