Last active
September 27, 2018 03:42
-
-
Save metadaddy/e793c2fc09109616b2af673c10cdadd6 to your computer and use it in GitHub Desktop.
Creating a StreamSets Spark Transformer in Scala - after first code expansion
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package com.streamsets.spark.scala | |
| import com.streamsets.pipeline.api.Field | |
| import com.streamsets.pipeline.api.Record | |
| import com.streamsets.pipeline.spark.api.SparkTransformer | |
| import com.streamsets.pipeline.spark.api.TransformResult | |
| import org.apache.spark.api.java.JavaPairRDD | |
| import org.apache.spark.api.java.JavaRDD | |
| import org.apache.spark.api.java.JavaSparkContext | |
| import java.io.Serializable | |
| import java.util | |
/** Constants and helpers shared by the `CustomTransformer` class. */
object CustomTransformer {
  // Field path the credit card number is read from.
  val VALUE_PATH = "/credit_card"
  // Field path the detected credit card type is written to.
  val RESULT_PATH = "/credit_card_type"

  /**
   * Tests whether a credit card number starts with any of the given prefixes.
   *
   * @param creditCard the credit card number as a string; must not be null
   * @param prefixList candidate prefixes; an empty-string prefix matches every number
   * @return true if `creditCard` starts with at least one element of `prefixList`,
   *         false otherwise (including when `prefixList` is empty)
   */
  def ccPrefixMatches(creditCard: String, prefixList: Array[String]): Boolean =
    // `exists` stops at the first matching prefix and allocates no intermediate array.
    prefixList.exists(prefix => creditCard.startsWith(prefix))
}
/**
 * Spark transformer that tags each incoming record with a credit card type.
 *
 * The number is read from `CustomTransformer.VALUE_PATH`, matched against the
 * prefix table in `ccTypes`, and the name of the first matching type is written
 * to `CustomTransformer.RESULT_PATH`. No records are ever reported as errors.
 */
class CustomTransformer extends SparkTransformer with Serializable {
  // Card type name -> number prefixes identifying it. ListMap keeps insertion
  // order, so the "Other" entry (empty prefix, matches any number) is only
  // reached when no earlier, more specific type matched.
  val ccTypes = collection.immutable.ListMap(
    "Visa" -> Array("4"),
    "Mastercard" -> Array("51","52","53","54","55"),
    "AMEX" -> Array("34","37"),
    "Diners Club" -> Array("300","301","302","303","304","305","36","38"),
    "Discover" -> Array("6011","65"),
    "JCB" -> Array("2131","1800","35"),
    "Other" -> Array("")
  )

  // Empty RDD of (record, message) pairs; assigned in init and used by
  // transform as the 'errors' output.
  var emptyRDD: JavaRDD[(Record, String)] = _

  /**
   * Prepares the transformer by creating the empty RDD used for the error output.
   *
   * @param javaSparkContextInstance Spark context used to create the empty RDD
   * @param params                   transformer parameters (unused here)
   */
  override def init(javaSparkContextInstance: JavaSparkContext, params: util.List[String]): Unit = {
    emptyRDD = javaSparkContextInstance.emptyRDD
  }

  /**
   * Sets the credit card type field on every record that carries a credit card number.
   *
   * Records whose credit card field is absent, or whose value is null, are passed
   * through unchanged instead of failing with a NullPointerException.
   *
   * @param recordRDD the incoming batch of records
   * @return the (mutated) records as the result, paired with an empty error RDD
   */
  override def transform(recordRDD: JavaRDD[Record]): TransformResult = {
    val errors = JavaPairRDD.fromJavaRDD(emptyRDD)

    // Copy the lookup table into a local val so the closure below captures only
    // the table, not `this` (which would drag emptyRDD into every serialized task).
    val cardTypes = ccTypes

    // Apply a function to the incoming records
    val result = recordRDD.rdd.map { record =>
      // Both the field lookup and its string value are wrapped in Option as a
      // guard against null.
      val creditCard = Option(record.get(CustomTransformer.VALUE_PATH))
        .flatMap(field => Option(field.getValueAsString))

      creditCard.foreach { number =>
        // First entry in table order whose prefixes match the number.
        cardTypes
          .collectFirst {
            case (cardType, prefixes) if CustomTransformer.ccPrefixMatches(number, prefixes) => cardType
          }
          .foreach(cardType => record.set(CustomTransformer.RESULT_PATH, Field.create(cardType)))
      }
      record
    }

    // errors is already a JavaPairRDD[Record, String]; no re-wrapping needed.
    new TransformResult(result.toJavaRDD(), errors)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment