root@adhoc-1:/opt# $SPARK_INSTALL/bin/pyspark --master local[2] --driver-class-path $HADOOP_CONF_DIR --conf spark.sql.hive.convertMetastoreParquet=false --deploy-mode client --driver-memory 1G --executor-memory 3G --num-executors 1 --packages org.apache.spark:spark-avro_2.11:2.4.4,org.apache.hudi:hudi-spark-bundle_2.11:0.10.0
Python 3.5.3 (default, Sep 27 2018, 17:25:39)
[GCC 6.3.0 20170516] on linux
Type "help", "copyright", "credits" or "license" for more information.
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-avro_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2691e4f6-45cb-4e2a-b0e7-70adc6e80e3f;1.0
confs: [default]
found org.apache.spark#spark-avro_2.11;2.4.4 in central
found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 369ms :: artifacts dl 6ms
:: modules in use:
org.apache.spark#spark-avro_2.11;2.4.4 from central in [default]
org.spark-project.spark#unused;1.0.0 from central in [default]
---------------------------------------------------------------------
|                  |            modules            ||   artifacts   |
|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-2691e4f6-45cb-4e2a-b0e7-70adc6e80e3f
confs: [default]
0 artifacts copied, 2 already retrieved (0kB/7ms)
21/08/27 12:41:14 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/
Using Python version 3.5.3 (default, Sep 27 2018 17:25:39)
SparkSession available as 'spark'.
>>> def gen_data(start, stop):
... return [
... {
... "uuid": str(i),
... "partitionpath": "partition",
... "array_1": [], # array does not need to be populated to reproduce issue
... "array_2": [], # two arrays need to be defined in the schema to reproduce
... "ts": str(i),
... }
... for i in range(start, stop)
... ]
...
>>> from pyspark.sql.types import StructType, StructField, StringType, ArrayType
>>> schema = StructType(
... [
... StructField("uuid", StringType(), True),
... StructField("partitionpath", StringType(), True),
... StructField("array_1", ArrayType(
... StructType(
... [
... StructField("field_1", StringType(), True),
... StructField("field_2", StringType(), True),
... ]
... )
... ), True),
... StructField("array_2", ArrayType(
... StructType(
... [
... StructField("field_3", StringType(), True),
... StructField("field_4", StringType(), True),
... ]
... )
... ), True),
... StructField("ts", StringType(), True)
... ]
... )
>>> destination = "/user/hive/warehouse/hudi_mor"
>>> hudi_write_options = {
... "hoodie.table.name": "hudi_mor",
... "hoodie.datasource.write.operation": "upsert",
... "hoodie.datasource.write.table.name": "hudi_mor",
... "hoodie.datasource.write.table.type": "MERGE_ON_READ",
... "hoodie.datasource.hive_sync.enable": True,
... "hoodie.datasource.hive_sync.jdbcurl": "jdbc:hive2://hiveserver:10000",
... "hoodie.datasource.hive_sync.database": "default",
... "hoodie.datasource.hive_sync.table": "hudi_mor",
... "hoodie.datasource.hive_sync.partition_fields": "partitionpath",
... "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor"
... }
>>> df = spark.read.json(spark.sparkContext.parallelize(gen_data(0, 100)), schema)
>>> df.write.format("hudi").options(**hudi_write_options).mode("overwrite").save(destination)
>>> df = spark.read.json(spark.sparkContext.parallelize(gen_data(50, 150)), schema)
>>> df.write.format("hudi").options(**hudi_write_options).mode("append").save(destination)
>>> spark.sql("select * from default.hudi_mor_rt").show()
+-------------------+--------------------+------------------+----------------------+--------------------+----+-------------+-------+-------+---+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name|uuid|partitionpath|array_1|array_2| ts|
+-------------------+--------------------+------------------+----------------------+--------------------+----+-------------+-------+-------+---+
| 20210827125659|20210827125659_0_201| 82| partition|a7a9adf8-c504-434...| 82| partition| []| []| 82|
| 20210827125659|20210827125659_0_202| 83| partition|a7a9adf8-c504-434...| 83| partition| []| []| 83|
| 20210827125659|20210827125659_0_203| 50| partition|a7a9adf8-c504-434...| 50| partition| []| []| 50|
| 20210827125659|20210827125659_0_204| 84| partition|a7a9adf8-c504-434...| 84| partition| []| []| 84|
| 20210827125659|20210827125659_0_205| 51| partition|a7a9adf8-c504-434...| 51| partition| []| []| 51|
| 20210827125659|20210827125659_0_206| 85| partition|a7a9adf8-c504-434...| 85| partition| []| []| 85|
| 20210827125659|20210827125659_0_207| 52| partition|a7a9adf8-c504-434...| 52| partition| []| []| 52|
| 20210827125533|20210827125533_0_108| 0| partition|a7a9adf8-c504-434...| 0| partition| []| []| 0|
| 20210827125659|20210827125659_0_208| 86| partition|a7a9adf8-c504-434...| 86| partition| []| []| 86|
| 20210827125659|20210827125659_0_209| 53| partition|a7a9adf8-c504-434...| 53| partition| []| []| 53|
| 20210827125533|20210827125533_0_111| 1| partition|a7a9adf8-c504-434...| 1| partition| []| []| 1|
| 20210827125659|20210827125659_0_210| 87| partition|a7a9adf8-c504-434...| 87| partition| []| []| 87|
| 20210827125533|20210827125533_0_113| 20| partition|a7a9adf8-c504-434...| 20| partition| []| []| 20|
| 20210827125659|20210827125659_0_211| 54| partition|a7a9adf8-c504-434...| 54| partition| []| []| 54|
| 20210827125533|20210827125533_0_115| 2| partition|a7a9adf8-c504-434...| 2| partition| []| []| 2|
| 20210827125659|20210827125659_0_212| 88| partition|a7a9adf8-c504-434...| 88| partition| []| []| 88|
| 20210827125533|20210827125533_0_117| 21| partition|a7a9adf8-c504-434...| 21| partition| []| []| 21|
| 20210827125659|20210827125659_0_213| 55| partition|a7a9adf8-c504-434...| 55| partition| []| []| 55|
| 20210827125533|20210827125533_0_119| 3| partition|a7a9adf8-c504-434...| 3| partition| []| []| 3|
| 20210827125659|20210827125659_0_214| 89| partition|a7a9adf8-c504-434...| 89| partition| []| []| 89|
+-------------------+--------------------+------------------+----------------------+--------------------+----+-------------+-------+-------+---+
only showing top 20 rows
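
The session above runs against hudi-spark-bundle_2.11:0.10.0 and the snapshot query on default.hudi_mor_rt returns rows. The same steps are repeated below against hudi-spark-bundle_2.11:0.8.0, where the query fails. For convenience, here are the same steps collected into a single script; this is only a sketch of the REPL session, and the explicit SparkSession construction with enableHiveSupport() is an assumption standing in for what the pyspark shell provides as "spark".

# repro_hudi_mor_arrays.py -- sketch; submit the same way the shells in this gist were launched, e.g.
#   $SPARK_INSTALL/bin/spark-submit --master local[2] --driver-class-path $HADOOP_CONF_DIR \
#     --conf spark.sql.hive.convertMetastoreParquet=false \
#     --packages org.apache.spark:spark-avro_2.11:2.4.4,org.apache.hudi:hudi-spark-bundle_2.11:0.8.0 \
#     repro_hudi_mor_arrays.py
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ArrayType


def gen_data(start, stop):
    # The arrays do not need to be populated, but two array-of-struct fields
    # must be present in the schema to reproduce the issue.
    return [
        {"uuid": str(i), "partitionpath": "partition", "array_1": [], "array_2": [], "ts": str(i)}
        for i in range(start, stop)
    ]


schema = StructType([
    StructField("uuid", StringType(), True),
    StructField("partitionpath", StringType(), True),
    StructField("array_1", ArrayType(StructType([
        StructField("field_1", StringType(), True),
        StructField("field_2", StringType(), True),
    ])), True),
    StructField("array_2", ArrayType(StructType([
        StructField("field_3", StringType(), True),
        StructField("field_4", StringType(), True),
    ])), True),
    StructField("ts", StringType(), True),
])

hudi_write_options = {
    "hoodie.table.name": "hudi_mor",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.table.name": "hudi_mor",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.hive_sync.enable": True,
    "hoodie.datasource.hive_sync.jdbcurl": "jdbc:hive2://hiveserver:10000",
    "hoodie.datasource.hive_sync.database": "default",
    "hoodie.datasource.hive_sync.table": "hudi_mor",
    "hoodie.datasource.hive_sync.partition_fields": "partitionpath",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
}

destination = "/user/hive/warehouse/hudi_mor"

# enableHiveSupport() mirrors the Hive-enabled session the pyspark shell creates implicitly.
spark = SparkSession.builder.appName("hudi-mor-array-repro").enableHiveSupport().getOrCreate()

# First commit: records 0-99.
df = spark.read.json(spark.sparkContext.parallelize(gen_data(0, 100)), schema)
df.write.format("hudi").options(**hudi_write_options).mode("overwrite").save(destination)

# Second commit: records 50-149; 50-99 are updates, which for MERGE_ON_READ go to
# log files that the _rt (realtime) view has to merge on read.
df = spark.read.json(spark.sparkContext.parallelize(gen_data(50, 150)), schema)
df.write.format("hudi").options(**hudi_write_options).mode("append").save(destination)

# Succeeds in the 0.10.0 session above; fails with "Can't redefine: array" in the 0.8.0 session below.
spark.sql("select * from default.hudi_mor_rt").show()
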
root@adhoc-1:/opt# $SPARK_INSTALL/bin/pyspark --master local[2] --driver-class-path $HADOOP_CONF_DIR --conf spark.sql.hive.convertMetastoreParquet=false --deploy-mode client --driver-memory 1G --executor-memory 3G --num-executors 1 --packages org.apache.spark:spark-avro_2.11:2.4.4,org.apache.hudi:hudi-spark-bundle_2.11:0.8.0
Python 3.5.3 (default, Sep 27 2018, 17:25:39)
[GCC 6.3.0 20170516] on linux
Type "help", "copyright", "credits" or "license" for more information.
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-avro_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2691e4f6-45cb-4e2a-b0e7-70adc6e80e3f;1.0
confs: [default]
found org.apache.spark#spark-avro_2.11;2.4.4 in central
found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 369ms :: artifacts dl 6ms
:: modules in use:
org.apache.spark#spark-avro_2.11;2.4.4 from central in [default]
org.spark-project.spark#unused;1.0.0 from central in [default]
---------------------------------------------------------------------
|                  |            modules            ||   artifacts   |
|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-2691e4f6-45cb-4e2a-b0e7-70adc6e80e3f
confs: [default]
0 artifacts copied, 2 already retrieved (0kB/7ms)
21/08/27 12:41:14 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/
Using Python version 3.5.3 (default, Sep 27 2018 17:25:39)
SparkSession available as 'spark'.
>>> def gen_data(start, stop):
... return [
... {
... "uuid": str(i),
... "partitionpath": "partition",
... "array_1": [], # array does not need to be populated to reproduce issue
... "array_2": [], # two arrays need to be defined in the schema to reproduce
... "ts": str(i),
... }
... for i in range(start, stop)
... ]
...
>>> from pyspark.sql.types import StructType, StructField, StringType, ArrayType
>>> schema = StructType(
... [
... StructField("uuid", StringType(), True),
... StructField("partitionpath", StringType(), True),
... StructField("array_1", ArrayType(
... StructType(
... [
... StructField("field_1", StringType(), True),
... StructField("field_2", StringType(), True),
... ]
... )
... ), True),
... StructField("array_2", ArrayType(
... StructType(
... [
... StructField("field_3", StringType(), True),
... StructField("field_4", StringType(), True),
... ]
... )
... ), True),
... StructField("ts", StringType(), True)
... ]
... )
>>> destination = "/user/hive/warehouse/hudi_mor"
>>> hudi_write_options = {
... "hoodie.table.name": "hudi_mor",
... "hoodie.datasource.write.operation": "upsert",
... "hoodie.datasource.write.table.name": "hudi_mor",
... "hoodie.datasource.write.table.type": "MERGE_ON_READ",
... "hoodie.datasource.hive_sync.enable": True,
... "hoodie.datasource.hive_sync.jdbcurl": "jdbc:hive2://hiveserver:10000",
... "hoodie.datasource.hive_sync.database": "default",
... "hoodie.datasource.hive_sync.table": "hudi_mor",
... "hoodie.datasource.hive_sync.partition_fields": "partitionpath",
... "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor"
... }
>>> df = spark.read.json(spark.sparkContext.parallelize(gen_data(0, 100)), schema)
>>> df.write.format("hudi").options(**hudi_write_options).mode("overwrite").save(destination)
>>> df = spark.read.json(spark.sparkContext.parallelize(gen_data(50, 150)), schema)
>>> df.write.format("hudi").options(**hudi_write_options).mode("append").save(destination)
>>> spark.sql("select * from default.hudi_mor_rt").show()
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
22/02/06 13:59:35 WARN hadoop.ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
22/02/06 13:59:35 ERROR executor.Executor: Exception in task 0.0 in stage 124.0 (TID 51046)
org.apache.avro.SchemaParseException: Can't redefine: array
at org.apache.avro.Schema$Names.put(Schema.java:1128)
at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690)
at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:805)
at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
at org.apache.avro.Schema.toString(Schema.java:324)
at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.constructHiveOrderedSchema(AbstractRealtimeRecordReader.java:135)
at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.init(AbstractRealtimeRecordReader.java:103)
at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.<init>(AbstractRealtimeRecordReader.java:67)
at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.<init>(RealtimeCompactedRecordReader.java:53)
at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.constructRecordReader(HoodieRealtimeRecordReader.java:70)
at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.<init>(HoodieRealtimeRecordReader.java:47)
at org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getRecordReader(HoodieParquetRealtimeInputFormat.java:123)
at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:267)
at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:266)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:224)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:95)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
22/02/06 13:59:35 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 124.0 (TID 51046, localhost, executor driver): org.apache.avro.SchemaParseException: Can't redefine: array
at org.apache.avro.Schema$Names.put(Schema.java:1128)
at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690)
at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:805)
at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
at org.apache.avro.Schema.toString(Schema.java:324)
at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.constructHiveOrderedSchema(AbstractRealtimeRecordReader.java:135)
at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.init(AbstractRealtimeRecordReader.java:103)
at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.<init>(AbstractRealtimeRecordReader.java:67)
at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.<init>(RealtimeCompactedRecordReader.java:53)
at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.constructRecordReader(HoodieRealtimeRecordReader.java:70)
at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.<init>(HoodieRealtimeRecordReader.java:47)
at org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getRecordReader(HoodieParquetRealtimeInputFormat.java:123)
at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:267)
at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:266)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:224)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:95)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
22/02/06 13:59:35 ERROR scheduler.TaskSetManager: Task 0 in stage 124.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 380, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/opt/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
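
Avro does not allow a named type to be defined more than once within a schema, and the "Can't redefine: array" message above indicates that the schema assembled by constructHiveOrderedSchema in this older bundle contains two different record types that both carry the name "array", one per array-of-struct column. That matches the note in the repro code that two array fields must be present to trigger the failure. The snippet below is a minimal, Hudi-independent sketch of that Avro naming rule; it assumes the Apache Avro Python package (pip install avro) and uses a neutral element-record name purely for illustration.

# Sketch: Avro rejects a schema that defines the same named type twice.
# Assumes the "avro" PyPI package; the parse entry point is spelled Parse in the
# older avro-python3 package, and exception classes vary across versions, so the
# except clause below is deliberately broad.
import json
import avro.schema

colliding_schema = {
    "type": "record",
    "name": "hudi_mor_row",
    "fields": [
        {"name": "array_1", "type": {"type": "array", "items": {
            "type": "record", "name": "element",
            "fields": [{"name": "field_1", "type": ["null", "string"]},
                       {"name": "field_2", "type": ["null", "string"]}]}}},
        # Second array column whose element record reuses the same full name
        # ("element" here; literally "array" in the failing Hive/Hudi path above).
        {"name": "array_2", "type": {"type": "array", "items": {
            "type": "record", "name": "element",
            "fields": [{"name": "field_3", "type": ["null", "string"]},
                       {"name": "field_4", "type": ["null", "string"]}]}}},
    ],
}

try:
    avro.schema.parse(json.dumps(colliding_schema))
except Exception as exc:  # SchemaParseException in most releases
    print("schema rejected:", exc)
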