spark int64 int96 timestamp testing with parquet
# for running in the spark 3.5.4 pyspark shell
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, to_timestamp
spark = SparkSession.builder.appName("DummyTimestamps").master("local[*]").getOrCreate()
data = [
    ("2025-05-19T12:00:00Z", "2025-05-19T14:00:00+02:00"),
    ("2025-05-19T15:00:00Z", "2025-05-19T11:00:00-04:00"),
    ("2025-05-19T18:00:00Z", "2025-05-19T19:00:00+01:00"),
]
df = spark.createDataFrame(data, ["utc_timestamp", "zoned_timestamp"])
df = (
    df.withColumn("zoned_tz_info", regexp_extract("zoned_timestamp", r"([+-]\d{2}:\d{2})", 1))
    .withColumn("utc_timestamp", to_timestamp("utc_timestamp"))
    .withColumn("zoned_display_utc", to_timestamp(regexp_extract("zoned_timestamp", r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})", 1)))
    .withColumn("zoned_timestamp", to_timestamp("zoned_timestamp"))
)
df.show(truncate=False)
df.printSchema()
spark.conf.set("spark.sql.session.timeZone", "UTC")
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS") # or INT96, TIMESTAMP_MILLIS
df.write.mode("overwrite").parquet("/cwd/dummy_timestamps")
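# OPTIONAL sketch (not in the original gist): inspect the physical parquet type that
# was just written; assumes pyarrow is installed and the part-file glob matches
import glob
import pyarrow.parquet as pq
part_file = glob.glob("/cwd/dummy_timestamps/part-*.parquet")[0]
print(pq.ParquetFile(part_file).schema)
# with TIMESTAMP_MICROS expect int64 columns annotated TIMESTAMP(MICROS);
# with INT96 expect the legacy int96 physical type instead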
# NOTE after write, in new session
df_read = spark.read.parquet("/cwd/dummy_timestamps")
df_read.show(truncate=False)
df_read.printSchema()
ts = df_read.collect()[0]["zoned_timestamp"]
print(type(ts)) # Should be <class 'datetime.datetime'>
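# OPTIONAL sketch (not in the original gist): the stored value is a fixed UTC instant;
# only the rendering follows spark.sql.session.timeZone
spark.conf.set("spark.sql.session.timeZone", "America/New_York")
spark.read.parquet("/cwd/dummy_timestamps").show(truncate=False)  # same instants, display shifted by the New York offset
spark.conf.set("spark.sql.session.timeZone", "UTC")  # restore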
// for running in spark3.5.4 spark-shell
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val spark = SparkSession.builder().appName("DummyTimestamps").master("local[*]").getOrCreate()
import spark.implicits._
val data = Seq(
  ("2025-05-19T12:00:00Z", "2025-05-19T14:00:00+02:00"),
  ("2025-05-19T15:00:00Z", "2025-05-19T11:00:00-04:00"),
  ("2025-05-19T18:00:00Z", "2025-05-19T19:00:00+01:00")
).toDF("utc_timestamp", "zoned_timestamp")
val df = (
  data.withColumn("zoned_tz_info", regexp_extract($"zoned_timestamp", "([+-]\\d{2}:\\d{2})", 1))
    .withColumn("utc_timestamp", to_timestamp($"utc_timestamp"))
    .withColumn("zoned_display_utc", to_timestamp(regexp_extract($"zoned_timestamp", "(\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2})", 1)))
    .withColumn("zoned_timestamp", to_timestamp($"zoned_timestamp"))
)
df.show(false)
df.printSchema()
spark.conf.set("spark.sql.session.timeZone", "UTC")
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS") // or INT96, TIMESTAMP_MILLIS
df.write.mode("overwrite").parquet("/tmp/dummy_timestamps")
// NOTE after write, in new session
val df = spark.read.parquet("/tmp/dummy_timestamps")
df.show(false)
df.printSchema()
val ts = df.collect()(0).getAs[Any]("zoned_timestamp")
// ts is a java.sql.Timestamp; Spark normalizes instants to UTC internally, and the
// Timestamp itself just wraps epoch millis, so it is timezone-agnostic. see https://stackoverflow.com/a/8571955
# for running in the spark 3.5.4 pyspark shell
# NOTE start as: pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, to_timestamp
spark = (
    SparkSession.builder.appName("DummyTimestampsIceberg")
    .master("local[*]")
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", "/cwd/iceberg_warehouse")
    .getOrCreate()
)
data = [
    ("2025-05-19T12:00:00Z", "2025-05-19T14:00:00+02:00"),
    ("2025-05-19T15:00:00Z", "2025-05-19T11:00:00-04:00"),
    ("2025-05-19T18:00:00Z", "2025-05-19T19:00:00+01:00"),
]
df = spark.createDataFrame(data, ["utc_timestamp", "zoned_timestamp"])
df = (
    df.withColumn("zoned_tz_info", regexp_extract("zoned_timestamp", r"([+-]\d{2}:\d{2})", 1))
    .withColumn("utc_timestamp", to_timestamp("utc_timestamp"))
    .withColumn("zoned_display_utc", to_timestamp(regexp_extract("zoned_timestamp", r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})", 1)))
    .withColumn("zoned_timestamp", to_timestamp("zoned_timestamp"))
)
df.writeTo("local.iceberg_dummy_timestamps").using("iceberg").createOrReplace()
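# OPTIONAL sketch (not in the original gist): peek at the parquet data file Iceberg
# wrote; assumes pyarrow is installed and the hadoop-catalog layout
# <warehouse>/<table>/data/ (an assumption, adjust the glob if needed)
import glob
import pyarrow.parquet as pq
data_file = glob.glob("/cwd/iceberg_warehouse/iceberg_dummy_timestamps/data/*.parquet")[0]
print(pq.ParquetFile(data_file).schema)
# expect int64 microsecond timestamps: Iceberg writes its own parquet and does not
# honor spark.sql.parquet.outputTimestampType, so INT96 never appears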
# NOTE after write, in new session
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder.appName("DummyTimestampsIceberg")
    .master("local[*]")
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", "/cwd/iceberg_warehouse")
    .getOrCreate()
)
df_read = spark.read.table("local.iceberg_dummy_timestamps")
df_read.show(truncate=False)
df_read.printSchema()
df_sql = spark.sql("SELECT * FROM local.iceberg_dummy_timestamps")
df_sql.show(truncate=False)
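# OPTIONAL sketch (not in the original gist): confirm the column types on the Iceberg
# side; Spark's TimestampType maps to Iceberg's timestamptz
spark.sql("DESCRIBE TABLE local.iceberg_dummy_timestamps").show(truncate=False)
# the .files metadata table lists the underlying parquet data files
spark.sql("SELECT file_path FROM local.iceberg_dummy_timestamps.files").show(truncate=False)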
// for running in spark3.5.4 spark-shell
// NOTE start as: spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val spark = (
  SparkSession.builder()
    .appName("DummyTimestampsIceberg")
    .master("local[*]")
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", "/cwd/iceberg_warehouse")
    .getOrCreate()
)
import spark.implicits._
val data = Seq(
  ("2025-05-19T12:00:00Z", "2025-05-19T14:00:00+02:00"),
  ("2025-05-19T15:00:00Z", "2025-05-19T11:00:00-04:00"),
  ("2025-05-19T18:00:00Z", "2025-05-19T19:00:00+01:00")
).toDF("utc_timestamp", "zoned_timestamp")
val df = (
  data.withColumn("zoned_tz_info", regexp_extract($"zoned_timestamp", "([+-]\\d{2}:\\d{2})", 1))
    .withColumn("utc_timestamp", to_timestamp($"utc_timestamp"))
    .withColumn("zoned_display_utc", to_timestamp(regexp_extract($"zoned_timestamp", "(\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2})", 1)))
    .withColumn("zoned_timestamp", to_timestamp($"zoned_timestamp"))
)
df.writeTo("local.iceberg_dummy_timestamps").using("iceberg").createOrReplace()
// NOTE after write, in new session
import org.apache.spark.sql.SparkSession
val spark = (
  SparkSession.builder()
    .appName("DummyTimestampsIceberg")
    .master("local[*]")
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", "/cwd/iceberg_warehouse")
    .getOrCreate()
)
val df_read = spark.read.table("local.iceberg_dummy_timestamps")
df_read.show(false)
df_read.printSchema()