// Prepare the bootstrap base path with 1000 records
import org.apache.spark.sql.functions._

val df = spark.read.format("parquet").load("/tmp/bootstrap_src").limit(1000)
val df1 = df.select(col("*"), substring(col("created_at"), 0, 10).as("date_col")).drop("created_at")
df1.write.parquet("/tmp/bootstrap_src_parquet/")

val parquetDf = spark.read.format("parquet").load("/tmp/bootstrap_src_parquet/")
scala> parquetDf.printSchema
root
 |-- type: string (nullable = true)
 |-- public: boolean (nullable = true)
 |-- payload: string (nullable = true)
 |-- repo: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- actor: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- org: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- id: string (nullable = true)
 |-- other: string (nullable = true)
 |-- date_col: string (nullable = true)
// /tmp/bootstrap_src_parquet/ has parquet files at the root level; there are no subdirectories.
import org.apache.hudi.{DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.SimpleKeyGenerator
import org.apache.spark.sql.SaveMode

val basePath = "/tmp/bootstrap_test"
val bootstrapDF = spark.emptyDataFrame
bootstrapDF.write.
  format("hudi").
  option(HoodieWriteConfig.TABLE_NAME, "bootstrap_test").
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL).
  option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id").
  option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "type").
  option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, "/tmp/bootstrap_src_parquet/").
  option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, classOf[SimpleKeyGenerator].getName).
  mode(SaveMode.Overwrite).
  save(basePath)
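// Note: the write above runs Hudi's default metadata-only bootstrap, which indexes the
// existing parquet files in place instead of rewriting them. If a full-record bootstrap is
// desired, a mode selector can be supplied; the exact config constant varies across Hudi
// versions, but the underlying key should be "hoodie.bootstrap.mode.selector", e.g.
//   option("hoodie.bootstrap.mode.selector",
//          "org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector")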
var hudiDf = spark.
  read.
  format("hudi").
  load(basePath + "/*")
hudiDf.createOrReplaceTempView("hudi_tbl")
scala> spark.sql("select count(distinct id) from hudi_tbl").show() | |
21/09/01 10:50:36 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException | |
+------------------+ | |
|count(DISTINCT id)| | |
+------------------+ | |
| 1000| | |
+------------------+ | |
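// All 1000 records written to the bootstrap source path are visible through the Hudi table.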
scala> spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, _hoodie_file_name, type, public, id, date_col from hudi_tbl limit 5").show() | |
+-------------------+------------------+----------------------+--------------------+----------------+------+-----------+----------+ | |
|_hoodie_commit_time|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| type|public| id| date_col| | |
+-------------------+------------------+----------------------+--------------------+----------------+------+-----------+----------+ | |
| 00000000000001| 12922545683| |792ed9b7-06d8-461...| PushEvent| true|12922545683|2020-07-16| | |
| 00000000000001| 12890291754| |792ed9b7-06d8-461...| PushEvent| true|12890291754|2020-07-13| | |
| 00000000000001| 13087770934| |792ed9b7-06d8-461...| PushEvent| true|13087770934|2020-08-02| | |
| 00000000000001| 13085421025| |792ed9b7-06d8-461...|PullRequestEvent| true|13085421025|2020-08-01| | |
| 00000000000001| 12946683709| |792ed9b7-06d8-461...| CreateEvent| true|12946683709|2020-07-18| | |
+-------------------+------------------+----------------------+--------------------+----------------+------+-----------+----------+ | |
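// Note: _hoodie_commit_time 00000000000001 is the instant Hudi reserves for metadata-only
// bootstrapped records. Also observe that _hoodie_partition_path comes back empty here even
// though "type" was configured as the partition path field.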
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

// Let's take 1500 records, update one column (date_col), and upsert them to the Hudi table
val df = spark.read.format("parquet").load("/tmp/bootstrap_src").limit(1500)
val df1 = df.select(col("*"), substring(col("created_at"), 0, 10).cast("date").as("date_col1"))
val df2 = df1.withColumn("date_col", expr("date_add(date_col1, 10)")).drop("created_at").drop("date_col1")
scala> hudiDf = spark.
     |   read.
     |   format("hudi").
     |   load(basePath + "/*")
21/09/01 10:51:34 WARN DefaultSource: Loading Base File Only View.
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (ForkEvent)
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (PublicEvent)
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (ReleaseEvent)
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (CreateEvent)
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (CommitCommentEvent)
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (PullRequestEvent)
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (DeleteEvent)
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (IssuesEvent)
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (PushEvent)
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (IssueCommentEvent)
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (PullRequestReviewCommentEvent)
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (GollumEvent)
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (WatchEvent)
21/09/01 10:51:34 WARN HFileBootstrapIndex: No value found for partition key (MemberEvent)
hudiDf: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 12 more fields]
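// The update batch in df2 can now be applied with a regular upsert; a minimal sketch,
// assuming "date_col" is acceptable as the precombine field for this dataset:
df2.write.
  format("hudi").
  option(HoodieWriteConfig.TABLE_NAME, "bootstrap_test").
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).
  option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id").
  option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "type").
  option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "date_col").
  mode(SaveMode.Append).
  save(basePath)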