Skip to content

Instantly share code, notes, and snippets.

Show Gist options
Save maneeshdisodia/64f5d4902fece196fd42106fc05c6d9d to your computer and use it in GitHub Desktop.
PySpark: concatenate DataFrames that have different columns
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import lit
import logging
# Build (or reuse, via getOrCreate) the Spark session the example below uses.
spark = (
    SparkSession.builder
    .appName("ConcatDataFrames")
    .getOrCreate()
)

# Root logging at INFO; concat_dataframes and the example log through this
# named logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ConcatLogger")
def _quote_column(name: str) -> str:
    """Return *name* backtick-quoted so Spark resolves it literally.

    Unquoted, ``df["a.b"]`` is parsed as field ``b`` of a struct column ``a``;
    embedded backticks are escaped by doubling them.
    """
    return "`" + name.replace("`", "``") + "`"


def _align_to(df: DataFrame, columns: list, other_types: dict) -> DataFrame:
    """Project *df* onto *columns*, adding absent ones as typed null columns.

    Args:
        df (DataFrame): DataFrame to project.
        columns (list): Column names, in the output order.
        other_types (dict): Column name -> Spark DataType for every name in
            *columns* that *df* lacks (taken from the other DataFrame).

    Returns:
        DataFrame: *df* with exactly *columns*, in that order.
    """
    present = set(df.columns)
    # A single select (rather than one withColumn per missing column) keeps
    # the logical plan shallow no matter how many columns are missing.
    return df.select(
        [
            df[_quote_column(name)]
            if name in present
            # Cast the null to the type the column has on the other side so the
            # union does not depend on NullType coercion.
            else lit(None).cast(other_types[name]).alias(name)
            for name in columns
        ]
    )


def concat_dataframes(df1: DataFrame, df2: DataFrame) -> DataFrame:
    """
    Concatenates two DataFrames with different columns by aligning their schemas.

    A column present in only one DataFrame is added to the other as nulls cast
    to the type it has where it exists. The result's columns are in sorted
    name order.

    Args:
        df1 (DataFrame): First DataFrame.
        df2 (DataFrame): Second DataFrame.

    Returns:
        DataFrame: Concatenated DataFrame with aligned columns.

    Raises:
        Exception: Any error raised while building the plan (for example,
            incompatible types for a shared column) is logged and re-raised.
    """
    try:
        # Sorted union of names: the same alphabetical column order the result
        # has always had.
        # NOTE(review): names are compared case-sensitively here, while Spark
        # column resolution may be case-insensitive (spark.sql.caseSensitive);
        # 'ID' vs 'id' would then clash — confirm whether that must be handled.
        all_columns = sorted(set(df1.columns) | set(df2.columns))
        df1_types = {field.name: field.dataType for field in df1.schema.fields}
        df2_types = {field.name: field.dataType for field in df2.schema.fields}

        aligned1 = _align_to(df1, all_columns, df2_types)
        aligned2 = _align_to(df2, all_columns, df1_types)

        # unionByName matches columns by name, not position.
        return aligned1.unionByName(aligned2)
    except Exception as e:
        logger.error("Error in concat_dataframes: %s", e)
        raise
# Example usage: df1 lacks the "extra" column, so its rows come out with
# null in "extra" after concatenation.
data1 = [(1, "A"), (2, "B"), (3, "C")]
columns1 = ["id", "name"]
df1 = spark.createDataFrame(data1, columns1)

data2 = [(4, "D", "X"), (5, "E", "Y"), (6, "F", "Z")]
columns2 = ["id", "name", "extra"]
df2 = spark.createDataFrame(data2, columns2)

try:
    df_combined = concat_dataframes(df1, df2)
    # Show the combined DataFrame
    df_combined.show()
except Exception as e:
    # Top-level boundary: the error is not re-raised, so log it with the full
    # traceback (logger.exception) instead of only the message.
    logger.exception("Failed to concatenate DataFrames: %s", e)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment