scala> import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.QuickstartUtils._

scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._

scala> import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SaveMode._

scala> import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceReadOptions._

scala> import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceWriteOptions._

scala> import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieWriteConfig._

scala>

scala> val tableName = "hudi_trips_cow"
tableName: String = hudi_trips_cow

scala> val basePath = "file:///tmp/hudi_trips_cow"
basePath: String = file:///tmp/hudi_trips_cow

scala> val dataGen = new DataGenerator
dataGen: org.apache.hudi.QuickstartUtils.DataGenerator = org.apache.hudi.QuickstartUtils$DataGenerator@11084cb5

scala> val inserts = convertToStringList(dataGen.generateInserts(10))
inserts: java.util.List[String] = [{"ts": 1628939270499, "uuid": "ce2137b9-5e30-4ac4-a038-57326af86477", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.4726905879569653, "begin_lon": 0.46157858450465483, "end_lat": 0.754803407008858, "end_lon": 0.9671159942018241, "fare": 34.158284716382845, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 1628760621140, "uuid": "2421c4c1-7198-415b-9221-5473383b4a73", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.6100070562136587, "begin_lon": 0.8779402295427752, "end_lat": 0.3407870505929602, "end_lon": 0.5030798142293655, "fare": 43.4923811219014, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 1629343257295, "uuid": "9c2328b4-d35e-4464-be64-fbaa4ac1623d", "rider": "rider-213", "driver"...

scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
warning: there was one deprecation warning (since 2.12.0)
warning: there was one deprecation warning (since 2.2.0)
warning: there were two deprecation warnings in total; for details, enable `:setting -deprecation' or `:replay -deprecation'
df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields]

scala> import org.apache.spark.sql.types.DataTypes._
import org.apache.spark.sql.types.DataTypes._

scala> import org.apache.spark.sql.functions
import org.apache.spark.sql.functions

scala> val df1 = df.withColumn("_hoodie_is_deleted", lit(false).cast(BooleanType))
df1: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 9 more fields]

scala> df1.printSchema
root
 |-- begin_lat: double (nullable = true)
 |-- begin_lon: double (nullable = true)
 |-- driver: string (nullable = true)
 |-- end_lat: double (nullable = true)
 |-- end_lon: double (nullable = true)
 |-- fare: double (nullable = true)
 |-- partitionpath: string (nullable = true)
 |-- rider: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- uuid: string (nullable = true)
 |-- _hoodie_is_deleted: boolean (nullable = false)

scala> df1.write.format("hudi").
     |   options(getQuickstartWriteConfigs).
     |   option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     |   option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     |   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
     |   option(TABLE_NAME.key(), tableName).
     |   mode(Overwrite).
     |   save(basePath)
21/08/19 01:23:36 WARN HoodieSparkSqlWriter$: hoodie table at file:/tmp/hudi_trips_cow already exists. Deleting existing data & overwriting with new data.
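// Aside (not from the original session): getQuickstartWriteConfigs is understood to only supply small
// shuffle-parallelism settings suitable for this tiny demo dataset. A roughly equivalent explicit form
// of the write above (a sketch; the parallelism values are an assumption) would be:
df1.write.format("hudi").
  option("hoodie.insert.shuffle.parallelism", "2").
  option("hoodie.upsert.shuffle.parallelism", "2").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME.key(), tableName).
  mode(Overwrite).
  save(basePath)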
// read
scala> val tripsSnapshotDF = spark.
     |   read.
     |   format("hudi").
     |   load(basePath + "/*/*/*/*")
tripsSnapshotDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 14 more fields]

scala> // load(basePath) use "/partitionKey=partitionValue" folder structure for Spark auto partition discovery

scala> tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

scala> spark.sql("select uuid, partitionpath, _hoodie_is_deleted from hudi_trips_snapshot").show()
+--------------------+--------------------+------------------+
|                uuid|       partitionpath|_hoodie_is_deleted|
+--------------------+--------------------+------------------+
|9c2328b4-d35e-446...|americas/united_s...|             false|
|2622c910-2de1-4da...|americas/united_s...|             false|
|3fbb2fc0-97d0-43f...|americas/united_s...|             false|
|00fe5835-a676-4e6...|americas/united_s...|             false|
|0f63529f-f360-4ac...|americas/united_s...|             false|
|ce2137b9-5e30-4ac...|americas/brazil/s...|             false|
|2421c4c1-7198-415...|americas/brazil/s...|             false|
|eb84bae5-24d5-4e7...|americas/brazil/s...|             false|
|fdd0fd6a-7276-49f...|  asia/india/chennai|             false|
|5d754998-50f9-41f...|  asia/india/chennai|             false|
+--------------------+--------------------+------------------+
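// Sanity check (not part of the original session): before issuing any deletes, the snapshot should
// contain all 10 inserted records, each carrying _hoodie_is_deleted = false.
spark.sql("select count(*) from hudi_trips_snapshot where _hoodie_is_deleted = false").show()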
// Issue deletes for first 2 records
scala> val df2 = df.limit(2)
df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [begin_lat: double, begin_lon: double ... 8 more fields]

scala> df2.printSchema
root
 |-- begin_lat: double (nullable = true)
 |-- begin_lon: double (nullable = true)
 |-- driver: string (nullable = true)
 |-- end_lat: double (nullable = true)
 |-- end_lon: double (nullable = true)
 |-- fare: double (nullable = true)
 |-- partitionpath: string (nullable = true)
 |-- rider: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- uuid: string (nullable = true)

scala> val df3 = df2.withColumn("_hoodie_is_deleted", lit(true).cast(BooleanType))
df3: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 9 more fields]

scala> df3.show()
+------------------+-------------------+----------+------------------+------------------+------------------+--------------------+---------+-------------+--------------------+------------------+
|         begin_lat|          begin_lon|    driver|           end_lat|           end_lon|              fare|       partitionpath|    rider|           ts|                uuid|_hoodie_is_deleted|
+------------------+-------------------+----------+------------------+------------------+------------------+--------------------+---------+-------------+--------------------+------------------+
|0.4726905879569653|0.46157858450465483|driver-213| 0.754803407008858|0.9671159942018241|34.158284716382845|americas/brazil/s...|rider-213|1628939270499|ce2137b9-5e30-4ac...|              true|
|0.6100070562136587| 0.8779402295427752|driver-213|0.3407870505929602|0.5030798142293655|  43.4923811219014|americas/brazil/s...|rider-213|1628760621140|2421c4c1-7198-415...|              true|
+------------------+-------------------+----------+------------------+------------------+------------------+--------------------+---------+-------------+--------------------+------------------+
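// Equivalent way to build the delete batch (a sketch, not from the original session): instead of
// df.limit(2), pick the records to delete by their record keys and flip the flag. The two uuids below
// are the full keys of the records shown above.
val toDelete = Seq("ce2137b9-5e30-4ac4-a038-57326af86477", "2421c4c1-7198-415b-9221-5473383b4a73")
val df3Alt = df1.filter(col("uuid").isin(toDelete: _*)).
  withColumn("_hoodie_is_deleted", lit(true).cast(BooleanType))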
| scala> df3.write.format("hudi"). | |
| | options(getQuickstartWriteConfigs). | |
| | option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | |
| | option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | |
| | option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | |
| | option(TABLE_NAME.key(), tableName). | |
| | mode(Append). | |
| | save(basePath) | |
| // read | |
| scala> val tripsSnapshotDF = spark. | |
| | read. | |
| | format("hudi"). | |
| | load(basePath + "/*/*/*/*") | |
| tripsSnapshotDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 14 more fields] | |
| scala> //load(basePath) use "/partitionKey=partitionValue" folder structure for Spark auto partition discovery | |
| scala> tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") | |
| scala> | |
| scala> spark.sql("select uuid, partitionpath, _hoodie_is_deleted from hudi_trips_snapshot").show() | |
| +--------------------+--------------------+------------------+ | |
| | uuid| partitionpath|_hoodie_is_deleted| | |
| +--------------------+--------------------+------------------+ | |
| |9c2328b4-d35e-446...|americas/united_s...| false| | |
| |2622c910-2de1-4da...|americas/united_s...| false| | |
| |3fbb2fc0-97d0-43f...|americas/united_s...| false| | |
| |00fe5835-a676-4e6...|americas/united_s...| false| | |
| |0f63529f-f360-4ac...|americas/united_s...| false| | |
| |eb84bae5-24d5-4e7...|americas/brazil/s...| false| | |
| |fdd0fd6a-7276-49f...| asia/india/chennai| false| | |
| |5d754998-50f9-41f...| asia/india/chennai| false| | |
| +--------------------+--------------------+------------------+ | |
| // only 8 rows returned |
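// Verification (not from the original session): querying the two soft-deleted keys directly should
// return an empty result set.
spark.sql("select uuid from hudi_trips_snapshot where uuid in ('ce2137b9-5e30-4ac4-a038-57326af86477', '2421c4c1-7198-415b-9221-5473383b4a73')").show()

// Alternative (a sketch, assuming this Hudi build exposes the "delete" write operation through
// DataSourceWriteOptions.OPERATION_OPT_KEY): the same two records could be hard-deleted with an explicit
// delete write instead of the _hoodie_is_deleted soft-delete flag.
df3.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION_OPT_KEY, "delete").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME.key(), tableName).
  mode(Append).
  save(basePath)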