Run the Hudi docker setup, then copy the Hadoop/Hive config files into Spark's conf directory:
root@adhoc-1:/opt# cp hadoop-2.8.4/etc/hadoop/hive-site.xml spark/conf/
root@adhoc-1:/opt# cp hadoop-2.8.4/etc/hadoop/core-site.xml spark/conf/
root@adhoc-1:/opt# cp hadoop-2.8.4/etc/hadoop/hdfs-site.xml spark/conf/
# $SPARK_INSTALL, $HUDI_SPARK_BUNDLE and $HADOOP_CONF_DIR are assumed to be
# preset in the demo's adhoc-1 container
$SPARK_INSTALL/bin/spark-shell \
  --jars $HUDI_SPARK_BUNDLE \
  --master local[2] \
  --driver-class-path $HADOOP_CONF_DIR \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --conf spark.sql.catalogImplementation=hive \
  --deploy-mode client \
  --driver-memory 1G \
  --executor-memory 3G \
  --num-executors 1 \
  --packages org.apache.spark:spark-avro_2.11:2.4.4
// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val tableName = "hudi_trips_cow"
val basePath = "/tmp/hudi_trips_cow"
val schema = StructType(Array(
  StructField("rowId", StringType, true),
  StructField("partitionId", StringType, true),
  StructField("preComb", LongType, true),
  StructField("name", StringType, true),
  StructField("versionId", StringType, true),
  StructField("toBeDeletedStr", StringType, true),
  StructField("intToLong", IntegerType, true),
  StructField("longToInt", LongType, true)
))
val data0 = Seq(
  Row("row_1", "2021/01/01", 0L, "bob", "v_0", "toBeDel0", 0, 1000000L),
  Row("row_2", "2021/01/01", 0L, "john", "v_0", "toBeDel0", 0, 1000000L),
  Row("row_3", "2021/01/01", 0L, "tom", "v_0", "toBeDel0", 0, 1000000L))
// JavaConversions (imported above) lets the Seq[Row] pass to createDataFrame
var dfFromData0 = spark.createDataFrame(data0, schema)
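A quick optional sanity check of the input frame before writing:
// optional: verify the schema and rows of batch 0
dfFromData0.printSchema()
dfFromData0.show(false)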
// write batch 0 as an insert; hive sync registers testdb.testtable
dfFromData0.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
  option(RECORDKEY_FIELD_OPT_KEY, "rowId").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
  option("hoodie.datasource.write.hive_style_partitioning", "true").
  option(TABLE_NAME, tableName).
  option("hoodie.index.type", "SIMPLE").
  option(OPERATION_OPT_KEY, "insert").
  option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://hiveserver:10000/").
  option("hoodie.datasource.hive_sync.database", "testdb").
  option("hoodie.datasource.hive_sync.table", "testtable").
  option("hoodie.datasource.hive_sync.partition_fields", "partitionId").
  option("hoodie.datasource.hive_sync.enable", "true").
  option("hoodie.datasource.write.drop.partition.columns", "true").
  mode(Overwrite).
  save(basePath)
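Optionally, read the table back through the Hudi datasource as a sanity check. A minimal sketch; older Hudi releases may need a path glob such as basePath + "/*/*" instead of the bare basePath:
// optional: read back via the Hudi datasource to confirm the write
// (partitionId is dropped from the data files because
//  hoodie.datasource.write.drop.partition.columns=true)
val readDF = spark.read.format("hudi").load(basePath)
readDF.select("rowId", "preComb", "name", "versionId").show(false)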
// verify the hive sync from beeline (inside the adhoc-2 container)
docker exec -it adhoc-2 /bin/bash
beeline -u jdbc:hive2://hiveserver:10000 \
--hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat \
--hiveconf hive.stats.autogather=false
use testdb;
show tables;
show partitions testtable;
+-------------------------+--+
|        partition        |
+-------------------------+--+
| partitionid=2021-01-01  |
+-------------------------+--+
select rowId, partitionId, preComb, name from testtable limit 4;
+--------+--------------+----------+-------+--+
| rowid  | partitionid  | precomb  | name  |
+--------+--------------+----------+-------+--+
| row_1  | 2021-01-01   | 0        | bob   |
| row_2  | 2021-01-01   | 0        | john  |
| row_3  | 2021-01-01   | 0        | tom   |
+--------+--------------+----------+-------+--+
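Since spark-shell was started with spark.sql.catalogImplementation=hive, the synced table should also be queryable from the same shell session (assuming the metastore is reachable from adhoc-1):
// back in spark-shell: query the Hive-synced table through the metastore
spark.sql("select rowid, partitionid, precomb, name from testdb.testtable").show(false)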