spark etl sample, attempt #1
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{SaveMode, Row, SQLContext}
import com.databricks.spark.csv.CsvSchemaRDD
import org.apache.spark.sql.functions._

object JDBCTester {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("JDBC Tester")
    val sc = new SparkContext(conf)
    val sqlCtx = new SQLContext(sc)
    import sqlCtx.implicits._
    // Headerless CSV dump of the volumes table; column names are applied positionally via toDF.
    val volumes = sqlCtx.load("com.databricks.spark.csv",
        Map("path" -> "/home/ramp/tmp/volumes.data", "header" -> "false"))
      .toDF(
        "CREATED_AT", "UPDATED_AT", "DELETED_AT", "DELETED", "ID", "EC2_ID", "USER_ID",
        "PROJECT_ID", "HOST", "SIZE", "AVAILABILITY_ZONE", "INSTANCE_UUID", "MOUNTPOINT",
        "ATTACH_TIME", "STATUS", "ATTACH_STATUS", "SCHEDULED_AT", "LAUNCHED_AT", "TERMINATED_AT",
        "DISPLAY_NAME", "DISPLAY_DESCRIPTION", "PROVIDER_LOCATION", "PROVIDER_AUTH", "SNAPSHOT_ID",
        "VOLUME_TYPE_ID", "SOURCE_SYSTEM_NAME", "SOURCE_VOLID", "bootable", "attached_host",
        "provider_geometry", "_name_id", "encryption_key_id", "migration_status")
      .as('V)
    val volume_type = sqlCtx.load("com.databricks.spark.csv",
        Map("path" -> "/home/ramp/tmp/volume_types.data", "header" -> "false"))
      .toDF("CREATED_AT", "UPDATED_AT", "DELETED_AT", "DELETED", "ID", "NAME", "SOURCE_SYSTEM_NAME", "DC")
      .as('VT)

    // Helper UDFs: constants and derived columns for the target Dim_Instance layout.
    val description_   = udf((source: String, name: String) => s"$name in $source datacenter")
    val instanceType   = udf(() => "cbs")
    val keyGenerator   = udf(() => 0) // always 0 here (placeholder key)
    val getdate        = udf(() => new SimpleDateFormat("dd/MM/yy").format(new Date())) // MM = month ("mm" would be minutes)
    val instance       = udf(() => "m_Dim_Instance")
    val source_system  = udf(() => "cbs_instance")
    val current_record = udf(() => 1)
    val extended_table = udf(() => "Dim_Instance_Extended_CBS")
    // Join volumes (V) to their volume type (VT), project into the Dim_Instance layout,
    // then write the result back out as CSV.
    volumes.join(volume_type, $"VT.ID" === $"V.VOLUME_TYPE_ID", "inner")
      .select(
        keyGenerator() as 'Instance_Key,
        $"V.ID" as 'Instance_NK,
        instanceType() as 'Instance_Type,
        $"V.ID" as 'Assigned_Instance_Number,
        'PROJECT_ID as 'Assigned_Account_Number,
        'DISPLAY_NAME as 'Instance_Name,
        description_($"V.SOURCE_SYSTEM_NAME", 'NAME) as 'Instance_Description,
        'STATUS as 'Instance_Status,
        $"V.SOURCE_SYSTEM_NAME" as 'Instance_Datacenter,
        getdate() as 'Rec_Created_Date,
        getdate() as 'Rec_Updated_Date,
        $"V.CREATED_AT" as 'Instance_Creation_Date,
        $"V.UPDATED_AT" as 'Instance_Update_Date,
        instance() as 'Instance_Created_by,
        instance() as 'Instance_Updated_by,
        $"V.CREATED_AT" as 'Effective_Start_Datetime,
        $"V.DELETED_AT" as 'Effective_End_Datetime,
        source_system() as 'Source_System_Name,
        current_record() as 'Current_Record,
        extended_table() as 'Extended_Table_Name
      )
      .save("com.databricks.spark.csv", SaveMode.Overwrite, Map("path" -> "/home/ramp/tmp/dim"))
  }
}
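Side note: sqlCtx.load(...) and df.save(...) are the generic Spark 1.3-era entry points and were deprecated in later releases. Purely as a sketch (assuming Spark 1.4+ with the same spark-csv package on the classpath and the same paths), the reader/writer form would look roughly like this, where dim is a hypothetical name for the DataFrame produced by the select above:

// Sketch only: Spark 1.4+ DataFrameReader/Writer equivalent of the load/save calls above.
val volumes = sqlCtx.read
  .format("com.databricks.spark.csv")
  .option("header", "false")
  .load("/home/ramp/tmp/volumes.data")

// "dim" stands for the selected DataFrame (hypothetical name, see the select above)
dim.write
  .format("com.databricks.spark.csv")
  .mode(SaveMode.Overwrite)
  .save("/home/ramp/tmp/dim")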
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{SaveMode, Row, SQLContext}
import com.databricks.spark.csv.CsvSchemaRDD
import org.apache.spark.sql.functions._

object JDBCTester {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("JDBC Tester")
    val sc = new SparkContext(conf)
    val sqlCtx = new SQLContext(sc)
    import sqlCtx.implicits._

    // Incoming dimension rows (ndim) joined to the currently stored ones (pdim) on the shared "nk" column.
    val new_dim = sqlCtx.load("com.databricks.spark.csv", Map("path" -> "/home/ramp/tmp/new_dim.csv", "header" -> "true")).as('ndim)
    val ext_dim = sqlCtx.load("com.databricks.spark.csv", Map("path" -> "/home/ramp/tmp/ext_dim.csv", "header" -> "true")).as('pdim)
    val res = new_dim.join(ext_dim, new_dim("nk") === ext_dim("nk"), "inner")

    val now = new SimpleDateFormat().format(new Date())

    // SCD type 2 changes: where the type-2 tracked column (t2) differs, emit two rows:
    // the previously stored version with position 6 overwritten with the current timestamp
    // and position 7 set to 0 (closing out the old record), plus the new version as-is.
    val scd2 = res.filter($"ndim.t2" !== $"pdim.t2").flatMap( row => {
      val (newRowSeq, prevRowSeq) = row.toSeq.splitAt(row.length / 2)
      val (newRow, prevRow) = (
        Row.fromSeq(newRowSeq),
        Row.fromSeq(prevRowSeq.updated(6, now).updated(7, 0))
      )
      Array(prevRow, newRow)
    })

    // SCD type 1 changes: where the type-1 tracked column (t1) differs, just take the new row.
    val scd1 = res.filter($"ndim.t1" !== $"pdim.t1").select($"ndim.*")

    sqlCtx.createDataFrame(scd2, scd1.schema).unionAll(scd1).saveAsCsvFile("/home/ramp/tmp/dim.csv")
  }
}
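For readers of the second snippet: "nk" appears to be the natural key used to match an incoming row (ndim) against the stored one (pdim), and t1/t2 the type-1 and type-2 tracked attributes; that is a reading of the code, not something the gist states. Under that reading, the SCD2 branch turns one matched pair into two output rows, roughly like so:

// Illustration only, with made-up values; assumes positions 6 and 7 of the stored
// row hold the effective-end date and the current-record flag.
import org.apache.spark.sql.Row

val stored  = Row("vol-1", "blue", 100, "a", "b", "c", null, 1) // pdim half of the joined row
val now     = "6/19/15 11:54 AM"
val expired = Row.fromSeq(stored.toSeq.updated(6, now).updated(7, 0)) // old version closed out
// the ndim half is then emitted unchanged as the new current version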
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{SaveMode, Row, SQLContext}
import com.databricks.spark.csv.CsvSchemaRDD
import org.apache.spark.sql.functions._

object JDBCTester {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("JDBC Tester")
    val sc = new SparkContext(conf)
    val sqlCtx = new SQLContext(sc)
    val volumes = sqlCtx.load("com.databricks.spark.csv",
        Map("path" -> "/home/ramp/tmp/volumes.data", "header" -> "false"))
      .toDF(
        "CREATED_AT", "UPDATED_AT", "DELETED_AT", "DELETED", "ID", "EC2_ID", "USER_ID",
        "PROJECT_ID", "HOST", "SIZE", "AVAILABILITY_ZONE", "INSTANCE_UUID", "MOUNTPOINT",
        "ATTACH_TIME", "STATUS", "ATTACH_STATUS", "SCHEDULED_AT", "LAUNCHED_AT", "TERMINATED_AT",
        "DISPLAY_NAME", "DISPLAY_DESCRIPTION", "PROVIDER_LOCATION", "PROVIDER_AUTH", "SNAPSHOT_ID",
        "VOLUME_TYPE_ID", "SOURCE_SYSTEM_NAME", "SOURCE_VOLID", "bootable", "attached_host",
        "provider_geometry", "_name_id", "encryption_key_id", "migration_status")
    val volume_type = sqlCtx.load("com.databricks.spark.csv",
        Map("path" -> "/home/ramp/tmp/volume_types.data", "header" -> "false"))
      .toDF("CREATED_AT", "UPDATED_AT", "DELETED_AT", "DELETED", "ID", "NAME", "SOURCE_SYSTEM_NAME", "DC")
sqlCtx.udf.register("description_" , (source: String, name:String) => s"$name in $source datacenter") | |
sqlCtx.udf.register("instanceType" , () => "cbs") | |
sqlCtx.udf.register("keyGenerator" , () => 0) | |
sqlCtx.udf.register("getdate" , () => new SimpleDateFormat("dd/mm/yy").format(new java.util.Date())) | |
sqlCtx.udf.register("instance" , () => "m_Dim_Instance") | |
sqlCtx.udf.register("source_system" , () => "cbs_instance") | |
sqlCtx.udf.register("current_record" , () => 1) | |
sqlCtx.udf.register("extended_table" , () => "Dim_Instance_Extended_CBS") | |
volume_type.registerTempTable("VT") | |
volumes.registerTempTable("V") | |
    val stmt =
      """
        |SELECT
        |  keyGenerator() as Instance_Key,
        |  V.ID as Instance_NK,
        |  instanceType() as Instance_Type,
        |  V.ID as Assigned_Instance_Number,
        |  V.PROJECT_ID as Assigned_Account_Number,
        |  V.DISPLAY_NAME as Instance_Name,
        |  description_(V.SOURCE_SYSTEM_NAME, VT.NAME) as Instance_Description,
        |  V.STATUS as Instance_Status,
        |  V.SOURCE_SYSTEM_NAME as Instance_Datacenter,
        |  getdate() as Rec_Created_Date,
        |  getdate() as Rec_Updated_Date,
        |  V.CREATED_AT as Instance_Creation_Date,
        |  V.UPDATED_AT as Instance_Update_Date,
        |  instance() as Instance_Created_by,
        |  instance() as Instance_Updated_by,
        |  V.CREATED_AT as Effective_Start_Datetime,
        |  V.DELETED_AT as Effective_End_Datetime,
        |  source_system() as Source_System_Name,
        |  current_record() as Current_Record,
        |  extended_table() as Extended_Table_Name
        |FROM
        |  V INNER JOIN VT ON
        |    V.VOLUME_TYPE_ID = VT.ID
      """.stripMargin

    sqlCtx.sql(stmt).save("com.databricks.spark.csv", SaveMode.Overwrite, Map("path" -> "/home/ramp/tmp/dim"))
  }
}
Can you please share the data/input file? /home/ramp/tmp/new_dim.csv
sir, great work
Hi, can you explain what the filter on the "nk" column does? What is the "nk" column? Same question for 'ndim.t2?
A lot of hardcoded numbers; this needs at least some comments or more self-descriptive variable names...
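On the hardcoded positions in particular: one option is to resolve the indexes from the loaded DataFrame by column name instead of writing 6 and 7 inline. This is a sketch only, meant as a drop-in variant of the scd2 val from the second snippet; the header names below are assumptions, since ext_dim.csv's real schema isn't shown in the gist:

// Sketch: derive the positional indexes once, by (assumed) column name,
// so the SCD2 branch no longer relies on magic numbers.
val endDateIdx = ext_dim.columns.indexOf("effective_end_datetime") // assumed header name
val currentIdx = ext_dim.columns.indexOf("current_record")         // assumed header name

val scd2 = res.filter($"ndim.t2" !== $"pdim.t2").flatMap { row =>
  val (newRowSeq, prevRowSeq) = row.toSeq.splitAt(row.length / 2)
  Seq(
    Row.fromSeq(prevRowSeq.updated(endDateIdx, now).updated(currentIdx, 0)), // expire stored version
    Row.fromSeq(newRowSeq)                                                   // new current version
  )
}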