Last active: June 19, 2020 05:05
-
-
Save tsusanto/5fbea1f137a60a27c13ea36077eabca2 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Created by tsusanto on 2/15/2017.
 *
 * Example submissions (YARN client and cluster deploy modes):
 *   spark-submit --master yarn-client --class sparkXMLdemo SparkXmlKudu-assembly-1.0.jar
 *   spark-submit --master yarn-cluster --class sparkXMLdemo SparkXmlKudu-assembly-1.0.jar
 */
import org.apache.spark.SparkContext | |
import org.apache.spark.SparkConf | |
import org.apache.kudu.spark.kudu._ | |
import org.apache.spark.sql.functions._ | |
/**
 * Spark job that reads an XML POSLog file, flattens it to one row per transaction
 * line item, and upserts those rows into a Kudu table.
 *
 * Optional positional arguments (defaults reproduce the original hard-coded values):
 *   args(0) - input XML path      (default: /user/tsusanto/POSLog-201409300635-21.xml)
 *   args(1) - Kudu master address (default: your-kudu-master-server:7051)
 *   args(2) - Kudu table name     (default: sales_lines_tenny)
 */
object sparkXMLdemo {

  private val DefaultInputPath = "/user/tsusanto/POSLog-201409300635-21.xml"
  private val DefaultKuduMaster = "your-kudu-master-server:7051"
  private val DefaultKuduTable = "sales_lines_tenny"

  def main(args: Array[String]): Unit = {
    // Positional argument i if supplied, otherwise the given default.
    def arg(i: Int, default: String): String = if (args.length > i) args(i) else default

    val inputPath = arg(0, DefaultInputPath)
    val kuduMaster = arg(1, DefaultKuduMaster)
    val kuduTable = arg(2, DefaultKuduTable)

    val conf = new SparkConf().setAppName("spark-xml-kudu")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      import sqlContext.implicits._

      // Each <Transaction> element becomes one row; samplingRatio=1 makes spark-xml
      // infer the schema from all rows rather than a sample.
      val transactions = sqlContext.read
        .format("com.databricks.spark.xml")
        .option("rowTag", "Transaction")
        .option("samplingRatio", "1")
        .load(inputPath)

      // explode produces one row per element of the RetailTransaction.LineItem array.
      val flattened = transactions.withColumn("LineItem", explode($"RetailTransaction.LineItem"))

      val selectedData = flattened.select(
        $"RetailStoreID",
        $"WorkstationID",
        $"SequenceNumber",
        $"BusinessDayDate",
        $"OperatorID._OperatorName" as "OperatorName",
        $"OperatorID._VALUE" as "OperatorID",
        $"CurrencyCode",
        $"RetailTransaction.ReceiptDateTime",
        $"RetailTransaction.TransactionCount",
        $"LineItem.SequenceNumber" as "LineItemSequenceNumber",
        $"LineItem.Tax.TaxableAmount",
        $"LineItem.Tax.Amount" as "TaxAmount",
        $"LineItem.Tax.Percent" as "TaxPercent",
        $"LineItem.Sale.POSIdentity._POSIDType" as "POSIDType",
        $"LineItem.Sale.POSIdentity.POSItemID" as "POSItemID",
        $"LineItem.Sale.Description",
        $"LineItem.Sale.RegularSalesUnitPrice",
        $"LineItem.Sale.ExtendedAmount",
        $"LineItem.Sale.DiscountAmount",
        $"LineItem.Sale.ExtendedDiscountAmount",
        $"LineItem.Sale.Quantity")

      selectedData.registerTempTable("selectedData")

      // Every output column is aliased explicitly to the lower-case target column name.
      // The previous version relied on Spark's auto-generated _c0/_c1 names for the
      // unaliased CONCAT expressions, which are not stable across Spark versions.
      val rows = sqlContext.sql(
        """SELECT
          |  CONCAT(BusinessDayDate, '-', cast(RetailStoreID as string), '-', cast(WorkstationID as string), '-',
          |         cast(SequenceNumber as string), '-', cast(LineItemSequenceNumber as string)) AS lineid,
          |  CONCAT(BusinessDayDate, '-', cast(RetailStoreID as string), '-', cast(WorkstationID as string), '-',
          |         cast(SequenceNumber as string)) AS transactionid,
          |  RetailStoreID AS retailstoreid,
          |  WorkstationID AS workstationid,
          |  SequenceNumber AS sequencenumber,
          |  BusinessDayDate AS businessdaydate,
          |  OperatorName AS operatorname,
          |  LineItemSequenceNumber AS lineitemsequencenumber,
          |  POSIDType AS posidtype,
          |  cast(POSItemID as string) AS positemid,
          |  Description AS description,
          |  TaxAmount AS taxamount,
          |  RegularSalesUnitPrice AS regularsalesunitprice,
          |  ExtendedAmount AS extendedamount,
          |  DiscountAmount AS discountamount,
          |  ExtendedDiscountAmount AS extendeddiscountamount,
          |  Quantity AS quantity
          |FROM selectedData""".stripMargin)

      // Fixed: the original called an undefined `mykuduContext`.
      val kuduContext = new KuduContext(kuduMaster)
      kuduContext.upsertRows(rows, kuduTable)
    } finally {
      // Release cluster resources even if the read or upsert fails.
      sc.stop()
    }
  }
}
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.