Skip to content

Instantly share code, notes, and snippets.

@metadaddy
Last active September 27, 2018 03:42
Show Gist options
  • Select an option

  • Save metadaddy/e793c2fc09109616b2af673c10cdadd6 to your computer and use it in GitHub Desktop.

Select an option

Save metadaddy/e793c2fc09109616b2af673c10cdadd6 to your computer and use it in GitHub Desktop.
Creating a StreamSets Spark Transformer in Scala - after first code expansion
package com.streamsets.spark.scala
import com.streamsets.pipeline.api.Field
import com.streamsets.pipeline.api.Record
import com.streamsets.pipeline.spark.api.SparkTransformer
import com.streamsets.pipeline.spark.api.TransformResult
import org.apache.spark.api.java.JavaPairRDD
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.JavaSparkContext
import java.io.Serializable
import java.util
object CustomTransformer {
  // Path of the input field that transform() reads the credit card number from.
  val VALUE_PATH = "/credit_card"
  // Path of the output field that transform() writes the detected card type to.
  val RESULT_PATH = "/credit_card_type"

  /**
   * Returns true if creditCard starts with at least one of the prefixes in prefixList.
   *
   * The empty prefix "" matches every non-null card number. A null creditCard matches
   * nothing and yields false instead of throwing a NullPointerException.
   *
   * @param creditCard card number to test; may be null
   * @param prefixList candidate prefixes
   * @return true if any prefix matches
   */
  def ccPrefixMatches(creditCard: String, prefixList: Array[String]): Boolean =
    // exists stops at the first match instead of building a filtered array.
    Option(creditCard).exists(cc => prefixList.exists(cc.startsWith(_)))
}
class CustomTransformer extends SparkTransformer with Serializable {
  // Card types and their number prefixes. ListMap preserves insertion order, which is the
  // order transform() tries them in. "Other" has the empty prefix, matches every card
  // number, and so must stay last as the catch-all.
  val ccTypes = collection.immutable.ListMap(
    "Visa" -> Array("4"),
    "Mastercard" -> Array("51","52","53","54","55"),
    "AMEX" -> Array("34","37"),
    "Diners Club" -> Array("300","301","302","303","304","305","36","38"),
    "Discover" -> Array("6011","65"),
    "JCB" -> Array("2131","1800","35"),
    "Other" -> Array("")
  )

  // Empty (Record, message) RDD created in init(). transform() does not use it; the public
  // field is kept so existing code that reads it keeps working.
  var emptyRDD: JavaRDD[(Record, String)] = _

  /** Stores an empty (Record, String) RDD created from the given context; params is unused. */
  override def init(javaSparkContextInstance: JavaSparkContext, params: util.List[String]): Unit = {
    emptyRDD = javaSparkContextInstance.emptyRDD
  }

  /**
   * Tags each record that has a credit card number at VALUE_PATH with its card type,
   * written as a string field at RESULT_PATH.
   *
   * Records whose VALUE_PATH field is missing or holds a null value are returned in the
   * error output, paired with an explanatory message. Previously they failed the whole
   * batch with a NullPointerException.
   *
   * @param recordRDD incoming records
   * @return the tagged records, plus (record, message) pairs for records with no card number
   */
  override def transform(recordRDD: JavaRDD[Record]): TransformResult = {
    val records = recordRDD.rdd
    // Local copies keep the closures below from capturing (and serializing) `this`.
    val types = ccTypes
    val missingCardMessage = s"No credit card number in field ${CustomTransformer.VALUE_PATH}"
    // The card number as a string, or None if the field is absent or its value is null.
    val cardNumber: Record => Option[String] = record =>
      Option(record.get(CustomTransformer.VALUE_PATH)).flatMap(field => Option(field.getValueAsString))

    // Both outputs are derived from `records`, so Spark may evaluate its lineage twice
    // unless the caller has persisted it.
    val errors = records
      .filter(record => cardNumber(record).isEmpty)
      .map(record => (record, missingCardMessage))

    val result = records.flatMap(record => cardNumber(record).map { number =>
      // First matching type in ListMap order. The empty-prefix "Other" entry matches any
      // number, so the fallback is unreachable while that entry remains in ccTypes.
      val cardType = types
        .collectFirst { case (name, prefixes) if CustomTransformer.ccPrefixMatches(number, prefixes) => name }
        .getOrElse("Other")
      record.set(CustomTransformer.RESULT_PATH, Field.create(cardType))
      record
    })

    new TransformResult(result.toJavaRDD(), new JavaPairRDD[Record, String](errors))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment