Skip to content

Instantly share code, notes, and snippets.

@j-thepac
Created March 9, 2024 07:35
Show Gist options
  • Save j-thepac/d646175282d995a1f4e456ae914ebe37 to your computer and use it in GitHub Desktop.
TestDelta Logic
# %%
import os
import subprocess
import sys
# %%
# Install a pinned PySpark. Invoke pip via the *current* interpreter
# (sys.executable -m pip) instead of os.system("pip3 ...") so the install
# targets the running environment and a failure raises instead of being
# silently ignored.
subprocess.run(
    [sys.executable, "-m", "pip", "install", "pyspark==3.3.2"],
    check=True,
)
# %%
import findspark

# Point findspark at the local Spark 3.3.2 distribution so `import pyspark`
# resolves against it. NOTE(review): machine-specific path — adjust per host.
findspark.init('/Users/deepakjayaprakash/Downloads/spark-3.3.2-bin-hadoop3/')
# %%
from pyspark.sql import SparkSession
# %%
import pyspark
print("PySpark version:", pyspark.__version__)
# %%
# Build a SparkSession with the Delta Lake extension and catalog enabled.
# delta-core_2.12:2.2.0 is the Delta release line matching Spark 3.3.x.
spark = (
    SparkSession.builder.appName("test")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.2.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)
# %%
# Explicit import instead of `from delta.tables import *`:
# DeltaTable is the only name this script uses from the module.
from delta.tables import DeltaTable
# %%
# Smoke test: write a 5-row Delta table.
# mode("overwrite") added so re-running this cell does not fail with
# "path already exists" — consistent with the other save below.
data = spark.range(0, 5)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table2")
# %%
# Target location for the merge demo.
# NOTE(review): machine-specific path — adjust per host.
path = "/Users/deepakjayaprakash/Desktop/test"
# %%
# Seed the target Delta table with a single row (id=1).
df = spark.createDataFrame([{"id": 1, "name": 'A'}])
# %%
df.coalesce(1).write.format("delta").mode("overwrite").save(path)
# %%
targetDF = DeltaTable.forPath(spark, path)
# %%
# Source data for the merge: id=1 already exists in the target (will be
# updated), id=2 is new (will be inserted).
df = spark.createDataFrame([{"id": 1, "name": 'A'}, {"id": 2, "name": 'B'}])
# %%
# Upsert: rows matched on id are updated in place, unmatched rows inserted.
(
    targetDF.alias("target")
    .merge(df.alias("src"), "target.id==src.id")
    .whenNotMatchedInsertAll()
    .whenMatchedUpdateAll()
    .execute()
)
# %%
# Verify: re-read the target table and show the merged result (ids 1 and 2).
DeltaTable.forPath(spark, path).toDF().show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment