@pezon
Created May 15, 2019 16:37
FPE tokenization Spark transformer
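A format-preserving encryption (FPE) tokenizer for Spark, built on the idealista format-preserving-encryption-java library (FF1 mode). Each Tokenizer encrypts only the characters of a single alphabet (lower case, upper case, or digits), folds the remaining characters into the FF1 tweak, and re-inserts them at their original positions afterwards, so a token keeps the length, case pattern, and punctuation of its input. TokenizerService chains one tokenizer per alphabet, and TreatmentTransformer exposes the service as Spark UDFs over string and numeric columns. The files below are concatenated; each package declaration starts a new file.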
// LowerCaseAlphabet.scala
package com.example.tokenizers.alphabet

class LowerCaseAlphabet extends TokenizerAlphabet {
  val CHARACTERS = Array[Char](
    'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
    'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z')
}
// NumberAlphabet.scala
package com.example.tokenizers.alphabet

class NumberAlphabet extends TokenizerAlphabet {
  val CHARACTERS = Array[Char](
    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9')
}
// Tokenizer.scala
package com.example.tokenizers

import com.idealista.fpe.FormatPreservingEncryption
import com.idealista.fpe.builder.FormatPreservingEncryptionBuilder
import com.idealista.fpe.config.{GenericDomain, GenericTransformations}
import com.idealista.fpe.transformer.{IntToTextTransformer, TextToIntTransformer}
import com.example.tokenizers.alphabet._
class Tokenizer(key: Array[Byte], alphabet: TokenizerAlphabet) {

  val textToIntTransformer: TextToIntTransformer = new GenericTransformations(alphabet.availableCharacters())
  val intToTextTransformer: IntToTextTransformer = new GenericTransformations(alphabet.availableCharacters())
  val domain = new GenericDomain(alphabet, textToIntTransformer, intToTextTransformer)

  val fpe: FormatPreservingEncryption =
    FormatPreservingEncryptionBuilder
      .ff1Implementation()
      .withDomain(domain)
      .withDefaultPseudoRandomFunction(key)
      .withDefaultLengthRange()
      .build()

  // Fold the non-alphabet characters of the plaintext into the tweak, with the
  // alphabet characters blanked out (e.g., "HEllo123" -> plain = "llo",
  // tweak = tweak ++ "HE   123"), so the surrounding context binds the token.
  def getModifiedTweak(plainText: String, tweak: Array[Byte]): Array[Byte] = {
    tweak ++ alphabet.getIndicesOfAvailableCharacters(plainText)
      .foldLeft(plainText) { (s, i) => s.updated(i, ' ') }
      .toCharArray
      .map(_.toByte)
  }
  def tokenize(plainText: String, tweak: Any = None): String = {
    val tweakBytes: Array[Byte] = tweak match {
      case t: Array[Byte] => t
      case t: String => t.toCharArray.map(_.toByte)
      case _ => Array[Byte](0)
    }
    Option(plainText) match {
      case Some(plainTextStr) =>
        if (!plainTextStr.isEmpty) {
          // Strip characters outside this alphabet; they are re-inserted below.
          val plainTextValid = alphabet.removeInvalidCharacters(plainTextStr)
          if (plainTextValid.length > 0) {
            // Prepend a padding character so short inputs meet the cipher's
            // minimum length, then drop the first ciphertext character again.
            val plainTextPadded = alphabet.availableCharacters()(0) + plainTextValid
            val cipherText = fpe.encrypt(
              plainTextPadded,
              getModifiedTweak(plainTextStr, tweakBytes))
              .substring(1)
            alphabet.addCharactersAtIndices(cipherText,
              alphabet.getMapOfInvalidCharacters(plainTextStr))
          } else {
            plainTextStr
          }
        } else ""
      case None => null
    }
  }
}
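A minimal sketch of using a single Tokenizer directly; the key is a placeholder and the printed token only illustrates the shape of the output:

import com.example.tokenizers.Tokenizer
import com.example.tokenizers.alphabet.LowerCaseAlphabet

object TokenizerExample extends App {
  val key = Array.fill[Byte](16)(42.toByte)  // placeholder 16-byte key
  val tokenizer = new Tokenizer(key, new LowerCaseAlphabet())

  // Only the lower-case letters "llo" are encrypted; "HE" and "123" are
  // carried through unchanged (and mixed into the tweak).
  println(tokenizer.tokenize("HEllo123"))  // e.g. "HEqzv123": same length and layout
}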
// TokenizerAlphabet.scala
package com.example.tokenizers.alphabet

import com.idealista.fpe.config.Alphabet

abstract class TokenizerAlphabet extends Alphabet {

  val CHARACTERS: Array[Char]

  def availableCharacters: Array[Char] = CHARACTERS

  def radix: Integer = CHARACTERS.length

  def availableCharactersAsString: String =
    availableCharacters().mkString
  // Map of index -> character for every character outside this alphabet.
  def getMapOfInvalidCharacters(string: String): Map[Int, String] = {
    s"[^${availableCharactersAsString}]"
      .r.findAllMatchIn(string)
      .map(c => (c.start, c.toString))
      .toMap
  }

  // Indices of the characters that belong to this alphabet.
  def getIndicesOfAvailableCharacters(string: String): Seq[Int] = {
    s"[${availableCharactersAsString}]"
      .r.findAllMatchIn(string)
      .map(c => c.start)
      .toSeq
  }
  // Keep only the characters that belong to this alphabet.
  def removeInvalidCharacters(string: String): String = {
    s"[${availableCharactersAsString}]+"
      .r.findAllMatchIn(string)
      .mkString
  }

  // Re-insert previously removed characters at their original indices.
  def addCharactersAtIndices(string: String, charactersMap: Map[Int, String]): String = {
    val chars = string.toCharArray
    val builder = new StringBuilder(chars.length + charactersMap.size)
    var offset = 0
    for (i <- 0 until chars.length + charactersMap.size) {
      if (charactersMap contains i) {
        builder.append(charactersMap(i))
        offset += 1
      } else {
        builder.append(chars(i - offset))
      }
    }
    builder.toString()
  }
}
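The invariant behind these helpers is a remove/re-insert round trip. A quick sketch for the REPL (the commented values are what the lower-case alphabet produces):

import com.example.tokenizers.alphabet.LowerCaseAlphabet

val alphabet = new LowerCaseAlphabet()
val input = "ab-cd!"

val invalid = alphabet.getMapOfInvalidCharacters(input)  // Map(2 -> "-", 5 -> "!")
val stripped = alphabet.removeInvalidCharacters(input)   // "abcd"

// Re-inserting the removed characters at their recorded indices
// reconstructs the original string exactly.
assert(alphabet.addCharactersAtIndices(stripped, invalid) == input)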
// TokenizerService.scala
package com.example.services

import java.util.Base64

import com.example.tokenizers._
import com.example.tokenizers.alphabet._

class TokenizerService(key: String) {

  private var tokenizers = Seq[Tokenizer]()

  configure(key)

  def encodeBase64Key(key: Array[Byte]): String =
    Base64.getEncoder.encodeToString(key)

  def decodeBase64Key(key: String): Array[Byte] =
    Base64.getDecoder.decode(key)
  def addTokenizer(tokenizer: Tokenizer): Unit =
    tokenizers = tokenizers :+ tokenizer

  // One tokenizer per character class; each pass rewrites only the characters
  // of its own alphabet and passes everything else through untouched.
  def configure(key: Array[Byte]): Unit = {
    addTokenizer(new Tokenizer(key, new LowerCaseAlphabet()))
    addTokenizer(new Tokenizer(key, new UpperCaseAlphabet()))
    addTokenizer(new Tokenizer(key, new NumberAlphabet()))
  }

  def configure(key: String): Unit =
    configure(decodeBase64Key(key))

  def tokenize(plainText: String, tweak: Any = None): String =
    tokenizers.foldLeft(plainText) {
      (curText, tokenizer) => tokenizer.tokenize(curText, tweak)
    }
  // Numeric overloads round-trip through the string tokenizer. Note that the
  // digit count is preserved, so values near a type's maximum can overflow
  // when the token is parsed back.
  def tokenize(plainNumber: Int): Int = tokenize(plainNumber.toString).toInt
  def tokenize(plainNumber: Long): Long = tokenize(plainNumber.toString).toLong
  def tokenize(plainNumber: Float): Float = tokenize(plainNumber.toString).toFloat
  def tokenize(plainNumber: Double): Double = tokenize(plainNumber.toString).toDouble
  def tokenize(plainNumber: Int, tweak: Any): Int = tokenize(plainNumber.toString, tweak).toInt
  def tokenize(plainNumber: Long, tweak: Any): Long = tokenize(plainNumber.toString, tweak).toLong
  def tokenize(plainNumber: Float, tweak: Any): Float = tokenize(plainNumber.toString, tweak).toFloat
  def tokenize(plainNumber: Double, tweak: Any): Double = tokenize(plainNumber.toString, tweak).toDouble
}
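A sketch of the service end to end, for the REPL, using the Base64 key from the settings object further down. FF1 is deterministic for a fixed key and tweak, which is what the first assertion relies on:

import com.example.services.TokenizerService

val service = new TokenizerService("ABCDEFGHIJKLMNOPQRSTUv==")

// Same key, same input -> same token, so joins on tokenized columns still match.
assert(service.tokenize("jane.doe@example.com") == service.tokenize("jane.doe@example.com"))

// A tweak binds the token to extra context: the same value under different
// tweaks yields different tokens (with overwhelming probability).
assert(service.tokenize("4111111111111111", "ctx-1") != service.tokenize("4111111111111111", "ctx-2"))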
// TreatmentTransformer.scala
package com.example.transformer

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

import com.example.settings.TreatmentTransformerSettings

object TreatmentTransformer {

  val tokenizerService = TreatmentTransformerSettings.tokenizerService
  val tokenizeStringUDF = udf((value: String) => tokenizerService.tokenize(value), StringType)
  val tokenizeLongUDF = udf((value: Long) => tokenizerService.tokenize(value), LongType)
  val tokenizeIntegerUDF = udf((value: Integer) => tokenizerService.tokenize(value), IntegerType)
  val tokenizeFloatUDF = udf((value: Float) => tokenizerService.tokenize(value), FloatType)
  val tokenizeDoubleUDF = udf((value: Double) => tokenizerService.tokenize(value), DoubleType)

  val tokenizeStringWithTweakUDF = udf((value: String, tweak: String) => tokenizerService.tokenize(value, tweak), StringType)
  val tokenizeLongWithTweakUDF = udf((value: Long, tweak: String) => tokenizerService.tokenize(value, tweak), LongType)
  val tokenizeIntegerWithTweakUDF = udf((value: Integer, tweak: String) => tokenizerService.tokenize(value, tweak), IntegerType)
  val tokenizeFloatWithTweakUDF = udf((value: Float, tweak: String) => tokenizerService.tokenize(value, tweak), FloatType)
  val tokenizeDoubleWithTweakUDF = udf((value: Double, tweak: String) => tokenizerService.tokenize(value, tweak), DoubleType)
  // Tokenize the requested columns in place, dispatching on each column's
  // Spark type; all other columns pass through unchanged.
  def withTokenizedColumns(columns: Seq[String])(df: DataFrame): DataFrame = {
    val stringColumns = df.schema.fields.filter(_.dataType == StringType).map(_.name)
    val longColumns = df.schema.fields.filter(_.dataType == LongType).map(_.name)
    val floatColumns = df.schema.fields.filter(_.dataType == FloatType).map(_.name)
    val doubleColumns = df.schema.fields.filter(_.dataType == DoubleType).map(_.name)
    val integerColumns = df.schema.fields.filter(_.dataType == IntegerType).map(_.name)
    df.columns.foldLeft(df) { (memoDF, columnName) =>
      if (columns.contains(columnName) && stringColumns.contains(columnName)) {
        memoDF.withColumn(columnName, tokenizeStringUDF(col(columnName)))
      } else if (columns.contains(columnName) && longColumns.contains(columnName)) {
        memoDF.withColumn(columnName, tokenizeLongUDF(col(columnName)))
      } else if (columns.contains(columnName) && integerColumns.contains(columnName)) {
        memoDF.withColumn(columnName, tokenizeIntegerUDF(col(columnName)))
      } else if (columns.contains(columnName) && floatColumns.contains(columnName)) {
        memoDF.withColumn(columnName, tokenizeFloatUDF(col(columnName)))
      } else if (columns.contains(columnName) && doubleColumns.contains(columnName)) {
        memoDF.withColumn(columnName, tokenizeDoubleUDF(col(columnName)))
      } else {
        memoDF
      }
    }
  }
  // Same dispatch as above, but every token is additionally bound to the
  // value of tweakCol on the same row.
  def withTweakedTokenizedColumns(columns: Seq[String], tweakCol: String)(df: DataFrame): DataFrame = {
    val stringColumns = df.schema.fields.filter(_.dataType == StringType).map(_.name)
    val longColumns = df.schema.fields.filter(_.dataType == LongType).map(_.name)
    val floatColumns = df.schema.fields.filter(_.dataType == FloatType).map(_.name)
    val doubleColumns = df.schema.fields.filter(_.dataType == DoubleType).map(_.name)
    val integerColumns = df.schema.fields.filter(_.dataType == IntegerType).map(_.name)
    df.columns.foldLeft(df) { (memoDF, columnName) =>
      if (columns.contains(columnName) && stringColumns.contains(columnName)) {
        memoDF.withColumn(columnName, tokenizeStringWithTweakUDF(col(columnName), col(tweakCol)))
      } else if (columns.contains(columnName) && longColumns.contains(columnName)) {
        memoDF.withColumn(columnName, tokenizeLongWithTweakUDF(col(columnName), col(tweakCol)))
      } else if (columns.contains(columnName) && integerColumns.contains(columnName)) {
        memoDF.withColumn(columnName, tokenizeIntegerWithTweakUDF(col(columnName), col(tweakCol)))
      } else if (columns.contains(columnName) && floatColumns.contains(columnName)) {
        memoDF.withColumn(columnName, tokenizeFloatWithTweakUDF(col(columnName), col(tweakCol)))
      } else if (columns.contains(columnName) && doubleColumns.contains(columnName)) {
        memoDF.withColumn(columnName, tokenizeDoubleWithTweakUDF(col(columnName), col(tweakCol)))
      } else {
        memoDF
      }
    }
  }
}
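A sketch of wiring the transformer into a job, e.g. in spark-shell. Since withTokenizedColumns(columns) is curried to a DataFrame => DataFrame, it composes with DataFrame.transform (the column names here are made up):

import org.apache.spark.sql.SparkSession
import com.example.transformer.TreatmentTransformer

val spark = SparkSession.builder().appName("tokenize-demo").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(
  ("jane.doe@example.com", 1234567890L),
  ("john@example.com", 42L)
).toDF("email", "account_id")

// Tokenizes both columns in place; the schema and column types are unchanged.
val tokenized = df.transform(TreatmentTransformer.withTokenizedColumns(Seq("email", "account_id")))
tokenized.show(truncate = false)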
// TreatmentTransformerSettings.scala
package com.example.settings

import com.example.services.TokenizerService

object TreatmentTransformerSettings {
  // Base64-encoded 16-byte key, hardcoded here for demonstration; in practice
  // it should come from configuration or a secrets manager.
  val tokenizerKeyText = "ABCDEFGHIJKLMNOPQRSTUv=="
  val tokenizerService = new TokenizerService(tokenizerKeyText)
}
// UpperCaseAlphabet.scala
package com.example.tokenizers.alphabet

class UpperCaseAlphabet extends TokenizerAlphabet {
  val CHARACTERS = Array[Char](
    'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
    'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z')
}