Skip to content

Instantly share code, notes, and snippets.

@saswata-dutta
Last active March 31, 2019 09:51
Show Gist options
  • Save saswata-dutta/6da028f7be6281233bbd6f8d3f6d0f70 to your computer and use it in GitHub Desktop.
Save saswata-dutta/6da028f7be6281233bbd6f8d3f6d0f70 to your computer and use it in GitHub Desktop.
Demo of schema evolution in Parquet — merging schemas with the `mergeSchema` read option, then writing/reading the merged DataFrame as Avro with an explicit `avroSchema`.
scala> val df = spark.read.option("mergeSchema", "true").parquet("/Users/saswatdutta/playground/df/df3/*"); df.printSchema
root
|-- id: long (nullable = true)
|-- created_at: long (nullable = true)
df: org.apache.spark.sql.DataFrame = [id: bigint, created_at: bigint]
scala> val df = spark.read.option("mergeSchema", "true").parquet("/Users/saswatdutta/playground/df/df1/*"); df.printSchema
root
|-- id: long (nullable = true)
|-- created_at: long (nullable = true)
|-- last_updated_at: long (nullable = true)
df: org.apache.spark.sql.DataFrame = [id: bigint, created_at: bigint ... 1 more field]
scala> val df = spark.read.option("mergeSchema", "true").parquet("/Users/saswatdutta/playground/df/df0/*"); df.printSchema
root
|-- id: long (nullable = true)
|-- created_at: long (nullable = true)
|-- last_updated_at: long (nullable = true)
|-- location_id: long (nullable = true)
df: org.apache.spark.sql.DataFrame = [id: bigint, created_at: bigint ... 2 more fields]
scala> val df = spark.read.option("mergeSchema", "true").parquet("/Users/saswatdutta/playground/df/*/*"); df.printSchema
root
|-- id: long (nullable = true)
|-- created_at: long (nullable = true)
|-- last_updated_at: long (nullable = true)
|-- location_id: long (nullable = true)
df: org.apache.spark.sql.DataFrame = [id: bigint, created_at: bigint ... 2 more fields]
scala> val avsc_str = scala.io.Source.fromFile("/Users/saswatdutta/playground/sample1.avsc").getLines.mkString.replaceAll("\\s+", "")
avsc_str: String = {"fields":[{"default":0,"name":"id","type":["long","null"]},{"default":0,"name":"created_at","type":["long","null"]},{"default":0,"name":"last_updated_at","type":["long","null"]},{"default":0,"name":"location_id","type":["long","null"]}],"type":"record","name":"tet"}
scala> df.write.format("avro").option("avroSchema", avsc_str).save("file:///Users/saswatdutta/playground/out/df_merged_avro")
scala> val avsc_str = scala.io.Source.fromFile("/Users/saswatdutta/playground/sample.avsc").getLines.mkString.replaceAll("\\s+", "")
avsc_str: String = {"fields":[{"default":0,"name":"id","type":"long"},{"default":0,"name":"created_at","type":"long"},{"default":0,"name":"last_updated_at","type":"long"},{"default":0,"name":"location_id","type":"long"}],"type":"record","name":"tet"}
scala> val df_avro_merged = spark.read.format("avro").option("avroSchema", avsc_str).load("file:///Users/saswatdutta/playground/out/df_merged_avro")
df_avro_merged: org.apache.spark.sql.DataFrame = [id: bigint, created_at: bigint ... 2 more fields]
scala> df_avro_merged.printSchema
root
|-- id: long (nullable = true)
|-- created_at: long (nullable = true)
|-- last_updated_at: long (nullable = true)
|-- location_id: long (nullable = true)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment