@nsivabalan
Created August 19, 2021 05:31
scala> import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.QuickstartUtils._
scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._
scala> import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SaveMode._
scala> import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceReadOptions._
scala> import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceWriteOptions._
scala> import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieWriteConfig._
scala>
scala> val tableName = "hudi_trips_cow"
tableName: String = hudi_trips_cow
scala> val basePath = "file:///tmp/hudi_trips_cow"
basePath: String = file:///tmp/hudi_trips_cow
scala> val dataGen = new DataGenerator
dataGen: org.apache.hudi.QuickstartUtils.DataGenerator = org.apache.hudi.QuickstartUtils$DataGenerator@11084cb5
scala> val inserts = convertToStringList(dataGen.generateInserts(10))
inserts: java.util.List[String] = [{"ts": 1628939270499, "uuid": "ce2137b9-5e30-4ac4-a038-57326af86477", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.4726905879569653, "begin_lon": 0.46157858450465483, "end_lat": 0.754803407008858, "end_lon": 0.9671159942018241, "fare": 34.158284716382845, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 1628760621140, "uuid": "2421c4c1-7198-415b-9221-5473383b4a73", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.6100070562136587, "begin_lon": 0.8779402295427752, "end_lat": 0.3407870505929602, "end_lon": 0.5030798142293655, "fare": 43.4923811219014, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 1629343257295, "uuid": "9c2328b4-d35e-4464-be64-fbaa4ac1623d", "rider": "rider-213", "driver"...
scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
warning: there was one deprecation warning (since 2.12.0)
warning: there was one deprecation warning (since 2.2.0)
warning: there were two deprecation warnings in total; for details, enable `:setting -deprecation' or `:replay -deprecation'
df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields]
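// Note: the two deprecation warnings above come from scala.collection.JavaConversions
// (implicit conversions, deprecated since Scala 2.12) and from spark.read.json(RDD[String])
// (deprecated since Spark 2.2). A warning-free sketch of the same step, assuming the
// spark-shell default `import spark.implicits._` is in scope:
// import scala.collection.JavaConverters._
// val df = spark.read.json(spark.createDataset(inserts.asScala))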
scala> import org.apache.spark.sql.types.DataTypes._
import org.apache.spark.sql.types.DataTypes._
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> val df1 = df.withColumn("_hoodie_is_deleted", lit(false).cast(BooleanType))
df1: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 9 more fields]
scala> df1.printSchema
root
 |-- begin_lat: double (nullable = true)
 |-- begin_lon: double (nullable = true)
 |-- driver: string (nullable = true)
 |-- end_lat: double (nullable = true)
 |-- end_lon: double (nullable = true)
 |-- fare: double (nullable = true)
 |-- partitionpath: string (nullable = true)
 |-- rider: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- uuid: string (nullable = true)
 |-- _hoodie_is_deleted: boolean (nullable = false)
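// Hudi's soft-delete convention: a BooleanType column named exactly `_hoodie_is_deleted`.
// false keeps the record; true removes it on the next upsert. A tiny helper to tag a
// DataFrame either way (a hypothetical convenience, not part of the Hudi API):
// def tagDeleted(df: org.apache.spark.sql.DataFrame, deleted: Boolean) =
//   df.withColumn("_hoodie_is_deleted", lit(deleted).cast(BooleanType))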
scala> df1.write.format("hudi").
| options(getQuickstartWriteConfigs).
| option(PRECOMBINE_FIELD_OPT_KEY, "ts").
| option(RECORDKEY_FIELD_OPT_KEY, "uuid").
| option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
| option(TABLE_NAME.key(), tableName).
| mode(Overwrite).
| save(basePath)
21/08/19 01:23:36 WARN HoodieSparkSqlWriter$: hoodie table at file:/tmp/hudi_trips_cow already exists. Deleting existing data & overwriting with new data.
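// For reference, the *_OPT_KEY constants above resolve to plain string configs, and
// getQuickstartWriteConfigs mainly sets small shuffle parallelism values for the demo.
// An equivalent sketch using the raw keys, assuming Hudi 0.8/0.9-era config names:
// df1.write.format("hudi").
//   option("hoodie.datasource.write.precombine.field", "ts").
//   option("hoodie.datasource.write.recordkey.field", "uuid").
//   option("hoodie.datasource.write.partitionpath.field", "partitionpath").
//   option("hoodie.table.name", tableName).
//   option("hoodie.insert.shuffle.parallelism", "2").
//   option("hoodie.upsert.shuffle.parallelism", "2").
//   mode(Overwrite).
//   save(basePath)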
// read
scala> val tripsSnapshotDF = spark.
| read.
| format("hudi").
| load(basePath + "/*/*/*/*")
tripsSnapshotDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 14 more fields]
scala> // load(basePath) works instead if the table uses a "/partitionKey=partitionValue" folder structure (Spark auto partition discovery)
scala> tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
scala> spark.sql("select uuid, partitionpath, _hoodie_is_deleted from hudi_trips_snapshot").show()
+--------------------+--------------------+------------------+
| uuid| partitionpath|_hoodie_is_deleted|
+--------------------+--------------------+------------------+
|9c2328b4-d35e-446...|americas/united_s...| false|
|2622c910-2de1-4da...|americas/united_s...| false|
|3fbb2fc0-97d0-43f...|americas/united_s...| false|
|00fe5835-a676-4e6...|americas/united_s...| false|
|0f63529f-f360-4ac...|americas/united_s...| false|
|ce2137b9-5e30-4ac...|americas/brazil/s...| false|
|2421c4c1-7198-415...|americas/brazil/s...| false|
|eb84bae5-24d5-4e7...|americas/brazil/s...| false|
|fdd0fd6a-7276-49f...| asia/india/chennai| false|
|5d754998-50f9-41f...| asia/india/chennai| false|
+--------------------+--------------------+------------------+
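// Optional sanity check (sketch): all 10 inserts should be live at this point.
// spark.sql("select count(*) from hudi_trips_snapshot where _hoodie_is_deleted = false").show()
// expected count: 10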
// Issue soft deletes for the first 2 records
scala> val df2 = df.limit(2)
df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [begin_lat: double, begin_lon: double ... 8 more fields]
scala> df2.printSchema
root
 |-- begin_lat: double (nullable = true)
 |-- begin_lon: double (nullable = true)
 |-- driver: string (nullable = true)
 |-- end_lat: double (nullable = true)
 |-- end_lon: double (nullable = true)
 |-- fare: double (nullable = true)
 |-- partitionpath: string (nullable = true)
 |-- rider: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- uuid: string (nullable = true)
scala> val df3 = df2.withColumn("_hoodie_is_deleted", lit(true).cast(BooleanType))
df3: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 9 more fields]
scala> df3.show()
+------------------+-------------------+----------+------------------+------------------+------------------+--------------------+---------+-------------+--------------------+------------------+
| begin_lat| begin_lon| driver| end_lat| end_lon| fare| partitionpath| rider| ts| uuid|_hoodie_is_deleted|
+------------------+-------------------+----------+------------------+------------------+------------------+--------------------+---------+-------------+--------------------+------------------+
|0.4726905879569653|0.46157858450465483|driver-213| 0.754803407008858|0.9671159942018241|34.158284716382845|americas/brazil/s...|rider-213|1628939270499|ce2137b9-5e30-4ac...| true|
|0.6100070562136587| 0.8779402295427752|driver-213|0.3407870505929602|0.5030798142293655| 43.4923811219014|americas/brazil/s...|rider-213|1628760621140|2421c4c1-7198-415...| true|
+------------------+-------------------+----------+------------------+------------------+------------------+--------------------+---------+-------------+--------------------+------------------+
scala> df3.write.format("hudi").
| options(getQuickstartWriteConfigs).
| option(PRECOMBINE_FIELD_OPT_KEY, "ts").
| option(RECORDKEY_FIELD_OPT_KEY, "uuid").
| option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
| option(TABLE_NAME.key(), tableName).
| mode(Append).
| save(basePath)
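// Because these records carry _hoodie_is_deleted = true, this Append write acts as a
// soft delete: the default merge payload drops flagged records. A hard-delete
// alternative sketch, assuming Hudi's "delete" write operation (no flag column needed):
// df2.write.format("hudi").
//   options(getQuickstartWriteConfigs).
//   option(PRECOMBINE_FIELD_OPT_KEY, "ts").
//   option(RECORDKEY_FIELD_OPT_KEY, "uuid").
//   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
//   option(TABLE_NAME.key(), tableName).
//   option(OPERATION_OPT_KEY, "delete").
//   mode(Append).
//   save(basePath)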
// read
scala> val tripsSnapshotDF = spark.
| read.
| format("hudi").
| load(basePath + "/*/*/*/*")
tripsSnapshotDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 14 more fields]
scala> // load(basePath) works instead if the table uses a "/partitionKey=partitionValue" folder structure (Spark auto partition discovery)
scala> tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
scala>
scala> spark.sql("select uuid, partitionpath, _hoodie_is_deleted from hudi_trips_snapshot").show()
+--------------------+--------------------+------------------+
| uuid| partitionpath|_hoodie_is_deleted|
+--------------------+--------------------+------------------+
|9c2328b4-d35e-446...|americas/united_s...| false|
|2622c910-2de1-4da...|americas/united_s...| false|
|3fbb2fc0-97d0-43f...|americas/united_s...| false|
|00fe5835-a676-4e6...|americas/united_s...| false|
|0f63529f-f360-4ac...|americas/united_s...| false|
|eb84bae5-24d5-4e7...|americas/brazil/s...| false|
|fdd0fd6a-7276-49f...| asia/india/chennai| false|
|5d754998-50f9-41f...| asia/india/chennai| false|
+--------------------+--------------------+------------------+
// only 8 rows returned: the 2 records written with _hoodie_is_deleted = true were removed from the snapshot
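// To confirm exactly which keys were removed, compare against the tagged uuids (sketch):
// df3.select("uuid").show()  // these 2 uuids should no longer appear in hudi_trips_snapshot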