Created
February 28, 2020 17:55
-
-
Save nuria/6e20d2b6faeb4012ceb5e43e2ab154db to your computer and use it in GitHub Desktop.
URL encoding/decoding round-trip checks for mediarequest base_names in Spark
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.net.URLDecoder | |
import java.net.URLEncoder | |
import org.apache.spark.sql.functions._ | |
// Percent-decode a string defensively:
//  - a lone '%' not followed by two hex digits would make URLDecoder throw,
//    so it is first escaped to "%25";
//  - a literal '+' would be decoded to a space, so it is first escaped to
//    "%2B" and survives the decode as '+'.
val urlDecoder = (u: String) => {
  val safe = u
    .replaceAll("%(?![0-9a-fA-F]{2})", "%25")
    .replaceAll("\\+", "%2B")
  URLDecoder.decode(safe, "UTF-8")
}

// Standard application/x-www-form-urlencoded encoding (space becomes '+').
val urlEncoder = (u: String) => URLEncoder.encode(u, "UTF-8")

// Number of '/' characters in the string.
val countSlashes = (u: String) => u.filter(_ == '/').length
// Raise the shuffle parallelism for this month-sized dataset.
spark.sql("SET spark.sql.shuffle.partitions = 1024")

// Make the helper lambdas callable from SQL expressions below.
spark.udf.register("url_decode", urlDecoder)
spark.udf.register("url_encode", urlEncoder)
spark.udf.register("count_slashes", countSlashes)

// One month of mediarequest data. Normalize "%20" to "_" in base_name, then
// round-trip each distinct name: decode it, re-encode it, and restore the
// '/' path separators that url_encode escaped as "%2F".
val df = spark.read.parquet("/wmf/data/wmf/mediarequest/year=2019/month=9")
val d = df.selectExpr("replace(base_name, '%20', '_') as bn").distinct
val d2 = d.selectExpr("bn", "url_decode(bn) as dbn").cache()
val d3 = d2.selectExpr("bn", "dbn", "replace(url_encode(dbn), '%2F', '/') as ebn")
// Percent-escapes that URLEncoder emits but that mediarequest base_names keep
// as literal characters; each entry maps the escape back to its character.
// Kept as an ordered Seq rather than a Map: "%25" -> "%" can manufacture new
// "%XX" sequences out of adjacent text (e.g. "%2521"), so the application
// order must be deterministic — folding over a 26-entry Map would apply them
// in unspecified hash order.
val replacements = Seq(
  "%21" -> "!",
  "%22" -> "\"",
  "%23" -> "#",
  "%24" -> "$",
  "%25" -> "%",
  "%26" -> "&",
  "%27" -> "'",
  "%28" -> "(",
  "%29" -> ")",
  "%2B" -> "+",
  "%3A" -> ":",
  "%3B" -> ";",
  "%3C" -> "<",
  "%3D" -> "=",
  "%3E" -> ">",
  "%3F" -> "?",
  "%40" -> "@",
  "%5B" -> "[",
  "%5C" -> "\\",
  "%5D" -> "]",
  "%5E" -> "^",
  "%60" -> "`",
  "%7B" -> "{",
  "%7C" -> "|",
  "%7D" -> "}",
  "%7E" -> "~"
)

// Undo the escapes above, in order. Uses literal String#replace instead of
// regex replaceAll, so neither keys nor values need regex/replacement-string
// escaping (the replaceAll version had to write "\\$", "\\\\", "\\{", ... to
// survive Matcher.replaceAll's special characters).
val replace = (u: String) => replacements.foldLeft(u) {
  case (acc, (escape, literal)) => acc.replace(escape, literal)
}
// Make the escape-restoring replace available in SQL and apply it to the
// re-encoded names.
spark.udf.register("my_replace", replace)
val d4 = d3.selectExpr("bn", "my_replace(ebn) as fbn")

// NOTE(review): despite the name, Seq#diff on strings is a multiset
// difference — the characters of s1 with s2's characters removed — not the
// position of the first mismatch. It is used here only to eyeball which
// characters did not survive the round trip.
val findFirstDiff = (s1: String, s2: String) => s1.diff(s2)
spark.udf.register("diff", findFirstDiff)

// Keep only the names whose round-tripped form differs from the original,
// then inspect a sample of the differing characters.
val ddiff = d4.where("fbn <> bn").cache()
ddiff.count
ddiff.selectExpr("diff(bn, fbn)", "bn", "fbn").distinct.show(10, false)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment