Created
May 15, 2019 16:37
-
-
Save pezon/64f683509ed2a57958b746147056bc77 to your computer and use it in GitHub Desktop.
FPE tokenization Spark transformer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.tokenizers.alphabet | |
/** Alphabet covering the lower-case ASCII letters 'a' through 'z'. */
class LowerCaseAlphabet extends TokenizerAlphabet {
  val CHARACTERS: Array[Char] = ('a' to 'z').toArray
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.tokenizers.alphabet | |
/** Alphabet covering the decimal digits '0' through '9'. */
class NumberAlphabet extends TokenizerAlphabet {
  val CHARACTERS: Array[Char] = ('0' to '9').toArray
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.tokenizers | |
import com.idealista.fpe.FormatPreservingEncryption | |
import com.idealista.fpe.builder.FormatPreservingEncryptionBuilder | |
import com.idealista.fpe.config.{GenericTransformations, GenericDomain} | |
import com.idealista.fpe.transformer.{IntToTextTransformer, TextToIntTransformer} | |
import com.example.tokenizers.alphabet._ | |
/**
 * Format-preserving tokenizer over a single alphabet.
 *
 * Characters of the input that belong to `alphabet` are encrypted with
 * FF1 format-preserving encryption; all other characters are left in
 * place and instead contribute to the encryption tweak.
 *
 * @param key      raw key bytes for the FF1 pseudo-random function
 * @param alphabet the set of characters this tokenizer encrypts
 */
class Tokenizer(key: Array[Byte], alphabet: TokenizerAlphabet) {

  // Transformers between alphabet characters and their integer indices,
  // as required by the idealista FPE domain.
  val textToIntTransformer: TextToIntTransformer = new GenericTransformations(alphabet.availableCharacters())
  val intToTextTransformer: IntToTextTransformer = new GenericTransformations(alphabet.availableCharacters())
  val domain = new GenericDomain(alphabet, textToIntTransformer, intToTextTransformer)

  val fpe: FormatPreservingEncryption = FormatPreservingEncryptionBuilder
    .ff1Implementation()
    .withDomain(domain)
    .withDefaultPseudoRandomFunction(key)
    .withDefaultLengthRange()
    .build()

  /**
   * Extends the caller-supplied tweak with the plaintext's out-of-alphabet
   * characters, so that e.g. "HEllo123" tokenized by the lower-case
   * tokenizer encrypts "llo" under a tweak derived from "HE   123".
   * In-alphabet positions are blanked with ' ' before byte conversion.
   */
  def getModifiedTweak(plainText: String, tweak: Array[Byte]): Array[Byte] = {
    val inAlphabetBlanked =
      alphabet.getIndicesOfAvailableCharacters(plainText)
        .foldLeft(plainText)((acc, idx) => acc.updated(idx, ' '))
    // Low-byte truncation of each char, matching the tweak encoding used
    // for String tweaks in tokenize().
    tweak ++ inAlphabetBlanked.toCharArray.map(_.toByte)
  }

  /**
   * Tokenizes `plainText`. Out-of-alphabet characters are preserved at
   * their original positions. Returns null for null input and "" for
   * empty input; input with no in-alphabet characters is returned as-is.
   *
   * @param tweak optional tweak: Array[Byte] or String; anything else
   *              (including the default None) falls back to Array(0)
   */
  def tokenize(plainText: String, tweak: Any = None): String = {
    val tweakBytes: Array[Byte] = tweak match {
      case bytes: Array[Byte] => bytes
      case text: String       => text.toCharArray.map(_.toByte)
      case _                  => Array[Byte](0)
    }
    if (plainText == null) {
      null
    } else if (plainText.isEmpty) {
      ""
    } else {
      val encryptable = alphabet.removeInvalidCharacters(plainText)
      if (encryptable.isEmpty) {
        plainText
      } else {
        // Prefix one alphabet character so very short inputs satisfy the
        // library's minimum length, then strip it from the ciphertext.
        val padded = alphabet.availableCharacters()(0) + encryptable
        val cipherText = fpe
          .encrypt(padded, getModifiedTweak(plainText, tweakBytes))
          .substring(1)
        // Re-insert the untouched characters at their original indices.
        alphabet.addCharactersAtIndices(cipherText, alphabet.getMapOfInvalidCharacters(plainText))
      }
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.tokenizers.alphabet | |
import com.idealista.fpe.config.Alphabet | |
/**
 * Base class for tokenizer alphabets. Subclasses supply the character set;
 * this class provides the helpers used to split a string into the part that
 * gets encrypted and the part that is preserved verbatim.
 *
 * NOTE: the regex-based helpers interpolate the characters directly into a
 * character class, so they assume no character is a regex metacharacter
 * (true for the letter/digit subclasses in this file).
 */
abstract class TokenizerAlphabet extends Alphabet {

  /** Characters this alphabet can encrypt; defined by concrete subclasses. */
  val CHARACTERS: Array[Char]

  // Empty parens so this overrides the library's Java-style
  // availableCharacters() and existing call sites written with parens
  // (e.g. alphabet.availableCharacters()) compile; parenless calls
  // still work in Scala 2.
  def availableCharacters(): Array[Char] = CHARACTERS

  def radix(): Integer = CHARACTERS.length

  /** The alphabet's characters concatenated into a single string. */
  def availableCharactersAsString: String = availableCharacters().mkString

  /**
   * Maps the index of every character of `string` that is NOT in this
   * alphabet to that character, so it can be re-inserted after encryption.
   * (Fixed: the class previously read [^chars+], where the stray '+' is a
   * literal inside brackets and wrongly made '+' count as valid.)
   */
  def getMapOfInvalidCharacters(string: String): Map[Int, String] = {
    s"[^$availableCharactersAsString]"
      .r.findAllMatchIn(string)
      .map(m => (m.start, m.toString))
      .toMap
  }

  /**
   * Indices of the characters of `string` that ARE in this alphabet.
   * (Fixed: same stray-'+' defect as getMapOfInvalidCharacters.)
   */
  def getIndicesOfAvailableCharacters(string: String): Seq[Int] = {
    s"[$availableCharactersAsString]"
      .r.findAllMatchIn(string)
      .map(_.start)
      .toSeq
  }

  /** Keeps only the characters of `string` that belong to this alphabet. */
  def removeInvalidCharacters(string: String): String = {
    s"[$availableCharactersAsString]+"
      .r.findAllMatchIn(string)
      .mkString
  }

  /**
   * Re-inserts previously removed characters: for each (index -> text)
   * entry, `text` is placed at `index` in the output while the characters
   * of `string` fill the remaining positions in order.
   * Assumes every key is < string.length + charactersMap.size.
   */
  def addCharactersAtIndices(string: String, charactersMap: Map[Int, String]): String = {
    val chars = string.toCharArray
    val builder = new StringBuilder(chars.length + charactersMap.size)
    var offset = 0 // how many re-inserted entries we have passed so far
    for (i <- 0 until chars.length + charactersMap.size) {
      charactersMap.get(i) match {
        case Some(text) =>
          builder.append(text)
          offset += 1
        case None =>
          builder.append(chars(i - offset))
      }
    }
    builder.toString()
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.services | |
import java.util.Base64 | |
import com.example.tokenizers._ | |
import com.example.tokenizers.alphabet._ | |
/**
 * Tokenizes values by chaining one format-preserving tokenizer per
 * supported alphabet (lower-case, upper-case, digits).
 *
 * @param key Base64-encoded key for the underlying FPE tokenizers
 */
class TokenizerService(key: String) {

  // Tokenizers applied in sequence; each rewrites only its own alphabet.
  private var tokenizers = Seq[Tokenizer]()

  configure(key)

  /** Encodes raw key bytes as a Base64 string. */
  def encodeBase64Key(key: Array[Byte]): String = {
    // encodeToString replaces the hand-rolled encode(...).map(_.toChar).mkString;
    // Base64 output is pure ASCII, so the result is identical.
    Base64.getEncoder.encodeToString(key)
  }

  /** Decodes a Base64-encoded key back to raw bytes. */
  def decodeBase64Key(key: String): Array[Byte] = {
    Base64.getDecoder.decode(key)
  }

  def addTokenizer(tokenizer: Tokenizer): Unit = {
    tokenizers = tokenizers :+ tokenizer
  }

  /** Builds one tokenizer per supported alphabet from the raw key bytes. */
  def configure(key: Array[Byte]): Unit = {
    addTokenizer(new Tokenizer(key, new LowerCaseAlphabet()))
    addTokenizer(new Tokenizer(key, new UpperCaseAlphabet()))
    addTokenizer(new Tokenizer(key, new NumberAlphabet()))
  }

  def configure(key: String): Unit = {
    configure(decodeBase64Key(key))
  }

  /**
   * Runs the input through every tokenizer in order.
   *
   * @param tweak optional tweak (Array[Byte] or String), forwarded to each
   *              tokenizer; the default None means "no tweak"
   */
  def tokenize(plainText: String, tweak: Any = None): String = {
    tokenizers.foldLeft(plainText) { (curText, tokenizer) =>
      tokenizer.tokenize(curText, tweak)
    }
  }

  // Numeric convenience overloads: digits in/digits out, so the round-trip
  // through String preserves the numeric type.
  def tokenize(plainNumber: Int): Int = tokenize(plainNumber.toString).toInt
  def tokenize(plainNumber: Long): Long = tokenize(plainNumber.toString).toLong
  def tokenize(plainNumber: Float): Float = tokenize(plainNumber.toString).toFloat
  def tokenize(plainNumber: Double): Double = tokenize(plainNumber.toString).toDouble
  def tokenize(plainNumber: Int, tweak: Any): Int = tokenize(plainNumber.toString, tweak).toInt
  def tokenize(plainNumber: Long, tweak: Any): Long = tokenize(plainNumber.toString, tweak).toLong
  def tokenize(plainNumber: Float, tweak: Any): Float = tokenize(plainNumber.toString, tweak).toFloat
  def tokenize(plainNumber: Double, tweak: Any): Double = tokenize(plainNumber.toString, tweak).toDouble
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.transformer | |
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import com.example.settings.TreatmentTransformerSettings
/**
 * Spark helpers that tokenize DataFrame columns with the shared
 * TokenizerService. Supported column types: String, Long, Integer,
 * Float, Double; other types pass through unchanged.
 */
object TreatmentTransformer {

  // Fully qualified because the file's original import named a
  // non-existent TokenizerTransformerSettings object.
  val tokenizerService = com.example.settings.TreatmentTransformerSettings.tokenizerService

  // One UDF per supported type, with and without a tweak column.
  val tokenizeStringUDF = udf((value: String) => tokenizerService.tokenize(value), StringType)
  val tokenizeLongUDF = udf((value: Long) => tokenizerService.tokenize(value), LongType)
  val tokenizeIntegerUDF = udf((value: Integer) => tokenizerService.tokenize(value), IntegerType)
  val tokenizeFloatUDF = udf((value: Float) => tokenizerService.tokenize(value), FloatType)
  val tokenizeDoubleUDF = udf((value: Double) => tokenizerService.tokenize(value), DoubleType)
  val tokenizeStringWithTweakUDF =
    udf((value: String, tweak: String) => tokenizerService.tokenize(value, tweak), StringType)
  val tokenizeLongWithTweakUDF =
    udf((value: Long, tweak: String) => tokenizerService.tokenize(value, tweak), LongType)
  val tokenizeIntegerWithTweakUDF =
    udf((value: Integer, tweak: String) => tokenizerService.tokenize(value, tweak), IntegerType)
  val tokenizeFloatWithTweakUDF =
    udf((value: Float, tweak: String) => tokenizerService.tokenize(value, tweak), FloatType)
  val tokenizeDoubleWithTweakUDF =
    udf((value: Double, tweak: String) => tokenizerService.tokenize(value, tweak), DoubleType)

  /**
   * Replaces each listed column of a supported type with its tokenized
   * value; unlisted or unsupported columns are left untouched.
   *
   * @param columns names of the columns to tokenize
   */
  def withTokenizedColumns(columns: Seq[String])(df: DataFrame): DataFrame = {
    // Dispatch on the column's declared type instead of five parallel
    // name lists (the original duplicated the filter chain per type).
    val typeOf = df.schema.fields.map(f => f.name -> f.dataType).toMap
    df.columns.foldLeft(df) { (memoDF, columnName) =>
      if (!columns.contains(columnName)) memoDF
      else typeOf(columnName) match {
        case StringType  => memoDF.withColumn(columnName, tokenizeStringUDF(col(columnName)))
        case LongType    => memoDF.withColumn(columnName, tokenizeLongUDF(col(columnName)))
        case IntegerType => memoDF.withColumn(columnName, tokenizeIntegerUDF(col(columnName)))
        case FloatType   => memoDF.withColumn(columnName, tokenizeFloatUDF(col(columnName)))
        case DoubleType  => memoDF.withColumn(columnName, tokenizeDoubleUDF(col(columnName)))
        case _           => memoDF
      }
    }
  }

  /**
   * Same as withTokenizedColumns, but each value is tokenized with the
   * per-row tweak taken from `tweakCol`.
   * (Fixed: the String branch was missing a closing parenthesis.)
   *
   * @param columns  names of the columns to tokenize
   * @param tweakCol name of the string column providing the tweak
   */
  def withTweakedTokenizedColumns(columns: Seq[String], tweakCol: String)(df: DataFrame): DataFrame = {
    val typeOf = df.schema.fields.map(f => f.name -> f.dataType).toMap
    df.columns.foldLeft(df) { (memoDF, columnName) =>
      if (!columns.contains(columnName)) memoDF
      else typeOf(columnName) match {
        case StringType  => memoDF.withColumn(columnName, tokenizeStringWithTweakUDF(col(columnName), col(tweakCol)))
        case LongType    => memoDF.withColumn(columnName, tokenizeLongWithTweakUDF(col(columnName), col(tweakCol)))
        case IntegerType => memoDF.withColumn(columnName, tokenizeIntegerWithTweakUDF(col(columnName), col(tweakCol)))
        case FloatType   => memoDF.withColumn(columnName, tokenizeFloatWithTweakUDF(col(columnName), col(tweakCol)))
        case DoubleType  => memoDF.withColumn(columnName, tokenizeDoubleWithTweakUDF(col(columnName), col(tweakCol)))
        case _           => memoDF
      }
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.settings | |
import com.example.services.TokenizerService | |
object TreatmentTransformerSettings {
  // SECURITY(review): the Base64-encoded FPE key is hard-coded in source,
  // so anyone with repository access can de-tokenize the data. Load it
  // from a secret store or environment configuration instead, and confirm
  // there is a key-rotation plan.
  val tokenizerKeyText = "ABCDEFGHIJKLMNOPQRSTUv=="
  // Single shared service instance; Tokenizer construction happens once here.
  val tokenizerService = new TokenizerService(tokenizerKeyText)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.tokenizers.alphabet | |
/** Alphabet covering the upper-case ASCII letters 'A' through 'Z'. */
class UpperCaseAlphabet extends TokenizerAlphabet {
  val CHARACTERS: Array[Char] = ('A' to 'Z').toArray
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment