@mehdip2007 · Created January 21, 2023
Migration from Oracle to Hive: decimal precision issue
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StringType
import multiprocessing as mp
conf = SparkConf().setAll([("spark.executor.cores", "4"),
                           ("spark.cores.max", "8"),
                           ("spark.yarn.am.memory", "8g"),
                           ("spark.driver.memoryOverhead", "8g"),
                           ("spark.executor.instances", "10"),
                           ("spark.dynamicAllocation.enabled", "true"),
                           ("spark.shuffle.service.enabled", "true"),
                           ("spark.dynamicAllocation.maxExecutors", "10"),
                           ("spark.dynamicAllocation.minExecutors", "10"),
                           ("spark.rapids.sql.enabled", "true"),
                           ("spark.rapids.sql.decimalType.enabled", "true"),
                           ("spark.driver.memory", "5g"),
                           ("spark.executor.memory", "6g"),
                           ("spark.kryoserializer.buffer.max", "512m")])
sc = SparkContext(conf=conf)
spark = SparkSession.builder \
    .appName("Move Data from Oracle to Hive") \
    .enableHiveSupport() \
    .getOrCreate()
bounds = "(select /*+parallel(128) */ MIN(sbrp_id) as min, MAX(sbrp_id) as max FROM temp_schema.z_rabani_CIP_REV_TOT_mnth) bounds"
db_driver = 'oracle.jdbc.driver.OracleDriver'
db_url = 'jdbc:oracle:thin:user/pass@IP:1521/SID'
conn_properties = {'user': 'username', 'password': 'pass', 'driver': 'oracle.jdbc.driver.OracleDriver'}
partitions = mp.cpu_count() * 10
bounds_result = spark.read.jdbc(url=db_url, table=bounds, properties=conn_properties).collect()[0]
bounds_result
Row(MIN=Decimal('11000049258014.0000000000'), MAX=Decimal('9911000203959074.0000000000'))
# get the column types for the source table from the Oracle dictionary
t_types_query = "(select COLUMN_NAME, DATA_LENGTH, data_type from all_tab_cols where OWNER = 'TEMP_SCHEMA' and TABLE_NAME = 'Z_RABANI_CIP_REV_TOT_MNTH')"
t_types = spark.read.jdbc(url=db_url, table=t_types_query, properties=conn_properties)
t_types
DataFrame[COLUMN_NAME: string, DATA_LENGTH: decimal(38,10), DATA_TYPE: string]
t_types.show(t_types.count(), truncate=False)
+------------------------------+--------------+---------+
|COLUMN_NAME |DATA_LENGTH |DATA_TYPE|
+------------------------------+--------------+---------+
|MONTH_KEY |22.0000000000 |NUMBER |
|SBRP_ID |22.0000000000 |NUMBER |
|ACCS_MTHD_ID |22.0000000000 |NUMBER |
|ENCRYPTED_ACCS_MTHD_ID |172.0000000000|VARCHAR2 |
|SBRP_STAT_ID |22.0000000000 |NUMBER |
|SBRP_TYP_ID |22.0000000000 |NUMBER |
|BRTH_DT |22.0000000000 |NUMBER |
|ACTVN_DT |22.0000000000 |NUMBER |
|GNDR_ID |22.0000000000 |NUMBER |
|USIM_FLAG |22.0000000000 |NUMBER |
|RGTRN_PROVNC_ID |22.0000000000 |NUMBER |
|RGTRN_CITY_ID |22.0000000000 |NUMBER |
|LST_HNDST_ID |22.0000000000 |NUMBER |
|LST_HNDST_MODL_ID |32.0000000000 |VARCHAR2 |
|EXMPTION_FEE |22.0000000000 |NUMBER |
|DATA_REV |22.0000000000 |NUMBER |
|DATA_PAGY_REV |22.0000000000 |NUMBER |
|DATA_PKG_REV |22.0000000000 |NUMBER |
|SMS_REV |22.0000000000 |NUMBER |
|VOICE_REV |22.0000000000 |NUMBER |
|VAS_REV |22.0000000000 |NUMBER |
|PKG_REV |22.0000000000 |NUMBER |
|ROAM_REV |22.0000000000 |NUMBER |
|INTL_REV |22.0000000000 |NUMBER |
|ABONMAN |22.0000000000 |NUMBER |
|CRM_SVC_REV |22.0000000000 |NUMBER |
|BAL_TRNSFR_AMT |22.0000000000 |NUMBER |
|CIP_REV |22.0000000000 |NUMBER |
|CIP_AVG_REV_12MONTH |22.0000000000 |NUMBER |
|AVG_TOT_12 |22.0000000000 |NUMBER |
|TOT_REV |22.0000000000 |NUMBER |
|ACCOM_DT |22.0000000000 |NUMBER |
|CNTN_MNTH_REV_CNT |22.0000000000 |NUMBER |
|CIP_REV_FLG |22.0000000000 |NUMBER |
|CIP_TOT_FLG |22.0000000000 |NUMBER |
|LOAD_DT |7.0000000000 |DATE |
|CIP_LVL_REV_ID |22.0000000000 |NUMBER |
|CIP_LVL_TOT_ID |22.0000000000 |NUMBER |
|TOP1MOST_USED_CELL |22.0000000000 |NUMBER |
|TOP1MOST_USED_CELL_BY_CL_DUR |22.0000000000 |NUMBER |
|TOP1MOST_USED_CELL_BY_DATA_VOL|22.0000000000 |NUMBER |
|DATA_USG_PAYG_VOL |22.0000000000 |NUMBER |
|PKG_DATA_USG_VOL |22.0000000000 |NUMBER |
|VOI_PAYG_DUR |22.0000000000 |NUMBER |
|VOI_PKG_DUR |22.0000000000 |NUMBER |
|SMS_PKG_CNT |22.0000000000 |NUMBER |
|SMS_PAYG_CNT |22.0000000000 |NUMBER |
+------------------------------+--------------+---------+
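For reference, the schema Spark will infer for the JDBC source can be checked before pulling any rows; a minimal sketch, reusing the db_url and conn_properties above (probe is just an illustrative name). Unconstrained Oracle NUMBER columns come back as decimal(38,10) via Spark's Oracle dialect:

# probe only the metadata: the rownum filter keeps the result empty,
# and printSchema() shows every plain NUMBER mapped to decimal(38,10)
probe = spark.read.jdbc(url=db_url,
                        table="(select * from temp_schema.z_rabani_cip_rev_tot_mnth where rownum < 1)",
                        properties=conn_properties)
probe.printSchema()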
q = "(select * from temp_schema.z_rabani_cip_rev_tot_mnth)"
df = spark.read.jdbc(url=db_url,
                     table=q,
                     numPartitions=partitions,
                     column='sbrp_id',
                     lowerBound=bounds_result.MIN,
                     upperBound=bounds_result.MAX + 1,
                     properties=conn_properties)
# also tried casting every column to string, but it won't work: the failure below happens inside the JDBC read, before any cast runs
# cast every column to string (equivalent to chaining .withColumn(c, col(c).cast(StringType())) over all 47 columns)
df2 = df.select([col(c).cast(StringType()).alias(c) for c in df.columns])
### this is the error regarding decimal precision: the write job aborts because the underlying JDBC read iterator overflows
[Stage 0:> (0 + 1) / 1]23/01/18 13:22:51 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
23/01/18 13:22:51 ERROR datasources.FileFormatWriter: Aborting job e55cebc6-f1e0-438b-a899-ce2c40ba5d3c.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, dhub-dnod49.datahub.mcci.local, executor 12): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: requirement failed: Decimal precision 57 exceeds max precision 38
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.sql.types.Decimal.set(Decimal.scala:114)
at org.apache.spark.sql.types.Decimal$.apply(Decimal.scala:453)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$3$$anonfun$12.apply(JdbcUtils.scala:407)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$3$$anonfun$12.apply(JdbcUtils.scala:407)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$nullSafeConvert(JdbcUtils.scala:509)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$3.apply(JdbcUtils.scala:407)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$3.apply(JdbcUtils.scala:405)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:356)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:338)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:244)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1442)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
... 10 more
Sample rows from the source table. Note the 47-49 digit values in the TOP1MOST_USED_CELL* columns: Oracle holds them in unconstrained NUMBER columns, and a 47-digit integer read back at scale 10 needs decimal precision 57, exactly the "Decimal precision 57 exceeds max precision 38" reported above.
MONTH_KEY SBRP_ID ACCS_MTHD_ID ENCRYPTED_ACCS_MTHD_ID SBRP_STAT_ID SBRP_TYP_ID BRTH_DT ACTVN_DT GNDR_ID USIM_FLAG RGTRN_PROVNC_ID RGTRN_CITY_ID LST_HNDST_ID LST_HNDST_MODL_ID EXMPTION_FEE DATA_REV DATA_PAGY_REV DATA_PKG_REV SMS_REV VOICE_REV VAS_REV PKG_REV ROAM_REV INTL_REV ABONMAN CRM_SVC_REV BAL_TRNSFR_AMT CIP_REV CIP_AVG_REV_12MONTH AVG_TOT_12 TOT_REV ACCOM_DT CNTN_MNTH_REV_CNT CIP_REV_FLG CIP_TOT_FLG LOAD_DT CIP_LVL_REV_ID CIP_LVL_TOT_ID TOP1MOST_USED_CELL TOP1MOST_USED_CELL_BY_CL_DUR TOP1MOST_USED_CELL_BY_DATA_VOL DATA_USG_PAYG_VOL PKG_DATA_USG_VOL VOI_PAYG_DUR VOI_PKG_DUR SMS_PKG_CNT SMS_PAYG_CNT
140103 1510021051081920 989149710985 541909179958998 2 0 13650101 13860229 1 1 1 2111132 35284205932150 35284205 0 0 0 0 46758 241690.773 0 0 0 0 0 0 0 289448.773 169081.603 169681.603 289448.773 13860229 0 0 17-JAN-23 98281395476918535076221742106713565469100000000 98281395476918535076221742106713565469100000000 -1 0 0 16131 0 0 338
140103 1010021053494439 989146573139 941136751693998 2 0 13651106 13931022 1 1 1 2111100 35746963002079 35746963 0 16.2 16.2 0 0 70495.015 0 0 0 0 0 0 0 70511.215 60147.041 60147.041 70511.215 13931022 0 0 17-JAN-23 1184586971584167125259814452265400345360000000000 82980434746157440002900480795601788726900000000 1184586971584167125259814452265400345360000000000 18371 157208869 4705 0 0 0
140103 1710021103187531 989140172908 841920719080998 2 0 13570101 13931126 1 0 4 2141100 35325011659932 35325011 0 282000 0 282000 16886 205087.304 0 282000 0 0 0 0 0 504973.304 294914.21 294914.21 504973.304 13931126 0 0 17-JAN-23 66124586552983392236551162434830086474200000000 268845805509177459081022021734135459649000000000 66124586552983392236551162434830086474200000000 0 3896582174 13688 0 0 123
140103 1411000081908560 989169111148 861119111984998 2 0 13420701 13980326 1 1 19 1191100 35407610432289 35407610 0 846000 0 846000 0 3655.852 0 846000 0 0 0 0 0 849655.852 592761.034 592761.034 849655.852 13980326 0 0 17-JAN-23 1289215394901486316991867870795846208320000000000 289701218067417657152459379769039162486600000000 1289215394901486316991867870795846208320000000000 0 19275141349 244 0 0 0
140103 1210021064777841 989145134953 341945319535998 2 0 13760323 13960515 1 1 2 2121100 35472311866081 35472311 0 23918.1 23918.1 0 0 1123.725 0 0 0 0 0 0 0 25041.825 54771.871 54771.871 25041.825 13960515 0 0 17-JAN-23 1341543272272607541856909468986097740240000000000 1006234079762903379575936297918584633060000000000 1006234079762903379575936297918584633060000000000 40803246 0 75 0 0 0
140103 1211000080554969 989923778349 929383773394998 2 0 13590628 13980231 1 1 5 2151100 35645909609959 35645909 0 0 0 0 1224 328951.765 0 0 0 0 0 0 0 330175.765 145822.778 145822.778 330175.765 13980231 0 0 17-JAN-23 88698956120946734024330957932387500110000000000 88698956120946734024330957932387500110000000000 -1 0 0 21955 0 0 9
140103 311000183987212 989148864193 341148681839998 2 0 13521218 13881217 0 1 1 2111100 35472198298969 35472198 0 25083.6 25083.6 0 0 44934.017 0 0 0 0 0 0 0 70017.617 66433.585 66433.585 70017.617 14010219 0 0 17-JAN-23 910555503680251414399232583200480211604200000000 910555503680251414399232583200480211604200000000 614316815542487142013779069585079541756900000000 56018586 0 2999 0 0 0
140103 711000185226755 989960745402 269450474020998 2 0 13440317 14010303 0 1 1 -1 35083956379959 35083956 0 0 0 0 0 4584.798 0 0 0 0 0 0 0 4584.798 4584.798 4584.798 4584.798 0 0 17-JAN-23 102658252060230497029641183568585653786700000000 422184699477361644747936918117245313034700000000 1408705410116084950276515797544056657680000000000 0 1178013 306 66 0 0
140103 1011000128677158 989100883425 501430884052998 2 0 13810128 13960228 1 1 27 1271100 86635105128291 86635105 0 1128000 0 1128000 1018 24527.171 0 1128000 0 0 0 0 0 1153545.171 489338.688 493172.022 1153545.171 13990801 0 0 17-JAN-23 1293531122515722365141018130371217097830000000000 113873078251219756253450860976138965948700000000 1293531122515722365141018130371217097830000000000 0 25974481363 1637 0 0 4
140103 111000175386377 989129329195 521199231959998 2 1 13760911 13921226 0 1 4380 2101100 35914477856063 35914477 0 1410000 0 1410000 41883 382428.764 0 1410000 0 40000 6300 0 0 1794311.764 1471409.357 1535583.357 1840611.764 14001102 0 0 17-JAN-23 1117207247566366982926816033136702775845000000 1244655532467324184700450990109586206580000000000 752958638024218553524714731276034530879900000000 0 35527847101 38308 0 0 8
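An alternative sketch that sidesteps the Spark-side mapping entirely: convert the offending columns to strings inside the pushed-down Oracle query, so the driver never returns them as NUMBER. The wide_cols set is an assumption read off the sample rows above, and q_safe / df_safe are illustrative names:

# build the select list from the dictionary query made earlier:
# wrap each over-wide NUMBER column in TO_CHAR, pass the rest through
wide_cols = {"TOP1MOST_USED_CELL",
             "TOP1MOST_USED_CELL_BY_CL_DUR",
             "TOP1MOST_USED_CELL_BY_DATA_VOL"}
select_list = ", ".join(
    "TO_CHAR({0}) AS {0}".format(c) if c in wide_cols else c
    for c in [r["COLUMN_NAME"] for r in t_types.collect()])
q_safe = "(select {} from temp_schema.z_rabani_cip_rev_tot_mnth)".format(select_list)
df_safe = spark.read.jdbc(url=db_url, table=q_safe,
                          numPartitions=partitions, column='sbrp_id',
                          lowerBound=int(bounds_result.MIN),
                          upperBound=int(bounds_result.MAX) + 1,
                          properties=conn_properties)

Partitioning on sbrp_id still works here because that column keeps its numeric type; only the three over-wide columns arrive as strings.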