// Prepare the bootstrap base path with 1000 records
val df = spark.read.format("parquet").load("/tmp/bootstrap_src").limit(1000)
val df1 = df.select(col("*"), substring(col("created_at"), 0, 10).as("date_col")).drop("created_at")
df1.write.parquet("/tmp/bootstrap_src_parquet/")
val parquetDf = spark.read.format("parquet").load("/tmp/bootstrap_src_parquet/")
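// Quick sanity check (a sketch, not part of the captured session): confirm the
// source has the expected 1000 rows before bootstrapping.
println(parquetDf.count())  // expect 1000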
scala> parquetDf.printSchema
root
 |-- type: string (nullable = true)
 |-- public: boolean (nullable = true)
 |-- payload: string (nullable = true)
 |-- repo: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- actor: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- org: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- id: string (nullable = true)
 |-- other: string (nullable = true)
 |-- date_col: string (nullable = true)
// /tmp/bootstrap_src_parquet/ has parquet files at the root level; there are no subdirectories.
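// Optional check (a sketch, assuming the paths above): list the directory with the
// Hadoop FileSystem API to confirm the flat, non-partitioned layout.
import org.apache.hadoop.fs.{FileSystem, Path}
val srcFs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
srcFs.listStatus(new Path("/tmp/bootstrap_src_parquet/")).foreach(s => println(s.getPath))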
import org.apache.hudi.{DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.SimpleKeyGenerator
import org.apache.spark.sql.SaveMode
val basePath = "/tmp/bootstrap_test"
val bootstrapDF = spark.emptyDataFrame
bootstrapDF.write.
format("hudi").
option(HoodieWriteConfig.TABLE_NAME, "bootstrap_test").
option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL).
option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id").
option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "type").
option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, "/tmp/bootstrap_src_parquet/").
option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, classOf[SimpleKeyGenerator].getName).
mode(SaveMode.Overwrite).
save(basePath)
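// Optional check (a sketch): HoodieDataSourceHelpers, imported above, can confirm the
// bootstrap commit landed. Metadata bootstrap uses the fixed instant time
// "00000000000001", which also shows up as _hoodie_commit_time in the queries below.
import org.apache.hadoop.fs.FileSystem
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
println(HoodieDataSourceHelpers.latestCommit(fs, basePath))  // expect 00000000000001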
var hudiDf = spark.
read.
format("hudi").
load(basePath + "/*")
hudiDf.createOrReplaceTempView("hudi_tbl")
scala> spark.sql("select count(distinct id) from hudi_tbl").show()
21/09/01 10:50:36 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
+------------------+
|count(DISTINCT id)|
+------------------+
| 1000|
+------------------+
scala> spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, _hoodie_file_name, type, public, id, date_col from hudi_tbl limit 5").show()
+-------------------+------------------+----------------------+--------------------+----------------+------+-----------+----------+
|_hoodie_commit_time|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| type|public| id| date_col|
+-------------------+------------------+----------------------+--------------------+----------------+------+-----------+----------+
| 00000000000001| 12922545683| |792ed9b7-06d8-461...| PushEvent| true|12922545683|2020-07-16|
| 00000000000001| 12890291754| |792ed9b7-06d8-461...| PushEvent| true|12890291754|2020-07-13|
| 00000000000001| 13087770934| |792ed9b7-06d8-461...| PushEvent| true|13087770934|2020-08-02|
| 00000000000001| 13085421025| |792ed9b7-06d8-461...|PullRequestEvent| true|13085421025|2020-08-01|
| 00000000000001| 12946683709| |792ed9b7-06d8-461...| CreateEvent| true|12946683709|2020-07-18|
+-------------------+------------------+----------------------+--------------------+----------------+------+-----------+----------+
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
// Let's take 1500 records, update one column (date_col), and upsert into the Hudi table
// (the upsert write itself is sketched after the reload below).
val df = spark.read.format("parquet").load("/tmp/bootstrap_src").limit(1500)
val df1 = df.select(col("*"), (substring(col("created_at"), 0, 10)).cast("date").as("date_col1"))
val df2 = df1.withColumn("date_col", expr("date_add(date_col1,10)")).drop("created_at").drop("date_col1")
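// Sanity check (a sketch): date_col should now be ten days ahead of created_at's date.
df2.select("id", "date_col").show(3, false)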
scala> hudiDf = spark.
| read.
| format("hudi").
| load(basePath + "/*")
21/09/01 10:51:34 WARN DefaultSource: Loading Base File Only View.
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (ForkEvent)
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (PublicEvent)
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (ReleaseEvent)
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (CreateEvent)
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (CommitCommentEvent)
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (PullRequestEvent)
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (DeleteEvent)
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (IssuesEvent)
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (PushEvent)
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (IssueCommentEvent)
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (PullRequestReviewCommentEvent)
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (GollumEvent)
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (WatchEvent)
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (MemberEvent)
hudiDf: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 12 more fields]
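// The upsert itself was not captured in this session. A minimal sketch of what it
// would look like, reusing the table name and key/partition options from the
// bootstrap write above; the precombine field is an assumption (any ordering
// column such as date_col works):
df2.write.
format("hudi").
option(HoodieWriteConfig.TABLE_NAME, "bootstrap_test").
option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).
option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id").
option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "type").
option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "date_col").
mode(SaveMode.Append).
save(basePath)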