spark int64 int96 timestamp testing with parquet
# for running with pyspark (Spark 3.5.4)
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, to_timestamp

spark = SparkSession.builder.appName("DummyTimestamps").master("local[*]").getOrCreate()

data = [
    ("2025-05-19T12:00:00Z", "2025-05-19T14:00:00+02:00"),
    ("2025-05-19T15:00:00Z", "2025-05-19T11:00:00-04:00"),
    ("2025-05-19T18:00:00Z", "2025-05-19T19:00:00+01:00"),
]

df = spark.createDataFrame(data, ["utc_timestamp", "zoned_timestamp"])

df = (
    df.withColumn("zoned_tz_info", regexp_extract("zoned_timestamp", r"([+-]\d{2}:\d{2})", 1))
    .withColumn("utc_timestamp", to_timestamp("utc_timestamp"))
    .withColumn("zoned_display_utc", to_timestamp(regexp_extract("zoned_timestamp", r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})", 1)))
    .withColumn("zoned_timestamp", to_timestamp("zoned_timestamp"))
)

df.show(truncate=False)
df.printSchema()

spark.conf.set("spark.sql.session.timeZone", "UTC")
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")  # or INT96, TIMESTAMP_MILLIS

df.write.mode("overwrite").parquet("/cwd/dummy_timestamps")

# NOTE after write, read back in a NEW session
df_read = spark.read.parquet("/cwd/dummy_timestamps")
df_read.show(truncate=False)
df_read.printSchema()

ts = df_read.collect()[0]["zoned_timestamp"]
print(type(ts))  # should be <class 'datetime.datetime'>
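To verify which physical Parquet type the write actually produced (plain INT96 vs INT64 with a TIMESTAMP logical annotation), the file footer can be inspected outside Spark. A minimal sketch, assuming pyarrow is available in the environment (it is not part of the setup above):

# hypothetical check with pyarrow (not part of the original setup):
# print the Parquet-level schema of one part file to see the physical type
import glob
import pyarrow.parquet as pq

part = glob.glob("/cwd/dummy_timestamps/*.parquet")[0]
print(pq.read_metadata(part).schema)
# with TIMESTAMP_MICROS: int64 columns annotated Timestamp(isAdjustedToUTC=true, timeUnit=microseconds)
# with INT96: plain int96 columns, no logical type annotation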
// for running in the spark-shell (Spark 3.5.4)
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("DummyTimestamps").master("local[*]").getOrCreate()
import spark.implicits._

val data = Seq(
  ("2025-05-19T12:00:00Z", "2025-05-19T14:00:00+02:00"),
  ("2025-05-19T15:00:00Z", "2025-05-19T11:00:00-04:00"),
  ("2025-05-19T18:00:00Z", "2025-05-19T19:00:00+01:00")
).toDF("utc_timestamp", "zoned_timestamp")

val df = (
  data.withColumn("zoned_tz_info", regexp_extract($"zoned_timestamp", "([+-]\\d{2}:\\d{2})", 1))
    .withColumn("utc_timestamp", to_timestamp($"utc_timestamp"))
    .withColumn("zoned_display_utc", to_timestamp(regexp_extract($"zoned_timestamp", "(\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2})", 1)))
    .withColumn("zoned_timestamp", to_timestamp($"zoned_timestamp"))
)

df.show(false)
df.printSchema()

spark.conf.set("spark.sql.session.timeZone", "UTC")
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS") // or INT96, TIMESTAMP_MILLIS

df.write.mode("overwrite").parquet("/tmp/dummy_timestamps")

// NOTE after write, read back in a NEW session
val df_read = spark.read.parquet("/tmp/dummy_timestamps")
df_read.show(false)
df_read.printSchema()

val ts = df_read.collect()(0).getAs[Any]("zoned_timestamp")
println(ts.getClass) // java.sql.Timestamp, which is UTC-normalized because Spark converts timestamps to UTC internally; see https://stackoverflow.com/a/8571955
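Because the stored value is a UTC-normalized instant, the wall-clock rendering depends only on spark.sql.session.timeZone. A quick demonstration of this (shown in PySpark for brevity, reading the same /tmp output as the Scala snippet):

# sketch: same stored instant, different display depending on session timezone
spark.conf.set("spark.sql.session.timeZone", "UTC")
spark.read.parquet("/tmp/dummy_timestamps").select("zoned_timestamp").show(truncate=False)

spark.conf.set("spark.sql.session.timeZone", "Europe/Berlin")
spark.read.parquet("/tmp/dummy_timestamps").select("zoned_timestamp").show(truncate=False)
# the displayed hours shift by the Berlin offset; the stored bytes are unchanged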
# for running with pyspark (Spark 3.5.4)
# NOTE start as: pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, to_timestamp

spark = (
    SparkSession.builder.appName("DummyTimestampsIceberg")
    .master("local[*]")
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", "/cwd/iceberg_warehouse")
    .getOrCreate()
)

data = [
    ("2025-05-19T12:00:00Z", "2025-05-19T14:00:00+02:00"),
    ("2025-05-19T15:00:00Z", "2025-05-19T11:00:00-04:00"),
    ("2025-05-19T18:00:00Z", "2025-05-19T19:00:00+01:00"),
]

df = spark.createDataFrame(data, ["utc_timestamp", "zoned_timestamp"])

df = (
    df.withColumn("zoned_tz_info", regexp_extract("zoned_timestamp", r"([+-]\d{2}:\d{2})", 1))
    .withColumn("utc_timestamp", to_timestamp("utc_timestamp"))
    .withColumn("zoned_display_utc", to_timestamp(regexp_extract("zoned_timestamp", r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})", 1)))
    .withColumn("zoned_timestamp", to_timestamp("zoned_timestamp"))
)

df.writeTo("local.iceberg_dummy_timestamps").using("iceberg").createOrReplace()

# NOTE after write, read back in a NEW session
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("DummyTimestampsIceberg")
    .master("local[*]")
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", "/cwd/iceberg_warehouse")
    .getOrCreate()
)

df_read = spark.read.table("local.iceberg_dummy_timestamps")
df_read.show(truncate=False)
df_read.printSchema()

df_sql = spark.sql("SELECT * FROM local.iceberg_dummy_timestamps")
df_sql.show(truncate=False)
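Note that Iceberg fixes the physical representation itself: Spark's TimestampType maps to Iceberg's timestamptz (UTC-adjusted, microsecond precision), regardless of spark.sql.parquet.outputTimestampType. A quick way to check the table schema via SQL:

# Spark's DESCRIBE reports the columns as plain `timestamp`; on the Iceberg
# side they are timestamptz, stored as UTC-adjusted microseconds
spark.sql("DESCRIBE TABLE local.iceberg_dummy_timestamps").show(truncate=False)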
// for running in the spark-shell (Spark 3.5.4)
// NOTE start as: spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = (
  SparkSession.builder()
    .appName("DummyTimestampsIceberg")
    .master("local[*]")
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", "/cwd/iceberg_warehouse")
    .getOrCreate()
)
import spark.implicits._

val data = Seq(
  ("2025-05-19T12:00:00Z", "2025-05-19T14:00:00+02:00"),
  ("2025-05-19T15:00:00Z", "2025-05-19T11:00:00-04:00"),
  ("2025-05-19T18:00:00Z", "2025-05-19T19:00:00+01:00")
).toDF("utc_timestamp", "zoned_timestamp")

val df = (
  data.withColumn("zoned_tz_info", regexp_extract($"zoned_timestamp", "([+-]\\d{2}:\\d{2})", 1))
    .withColumn("utc_timestamp", to_timestamp($"utc_timestamp"))
    .withColumn("zoned_display_utc", to_timestamp(regexp_extract($"zoned_timestamp", "(\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2})", 1)))
    .withColumn("zoned_timestamp", to_timestamp($"zoned_timestamp"))
)

df.writeTo("local.iceberg_dummy_timestamps").using("iceberg").createOrReplace()

// NOTE after write, read back in a NEW session
import org.apache.spark.sql.SparkSession

val spark = (
  SparkSession.builder()
    .appName("DummyTimestampsIceberg")
    .master("local[*]")
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", "/cwd/iceberg_warehouse")
    .getOrCreate()
)

val df_read = spark.read.table("local.iceberg_dummy_timestamps")
df_read.show(false)
df_read.printSchema()
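Iceberg also exposes metadata tables alongside the data, which is handy for checking what was actually written. A short sketch (again in PySpark) against the same table:

# list the underlying data files and the write history of the table
spark.sql("SELECT file_path, record_count FROM local.iceberg_dummy_timestamps.files").show(truncate=False)
spark.sql("SELECT snapshot_id, operation, committed_at FROM local.iceberg_dummy_timestamps.snapshots").show(truncate=False)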