Spark data cleaning: sanitise free-text product columns and derive per-column word/character statistics.
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.DataFrame
// Explicit input schema for the CSV.
val schema = StructType(Array(
  StructField("ID", LongType, false),
  StructField("ASIN", StringType, false),
  StructField("ASIN_STATIC_ITEM_NAME", StringType, false),
  StructField("ASIN_STATIC_PRODUCT_DESCRIPTION", StringType, false),
  StructField("ASIN_STATIC_BULLET_POINT", StringType, false),
  StructField("ASIN_STATIC_BRAND", StringType, false),
  StructField("ASIN_STATIC_MANUFACTURER", StringType, false),
  StructField("browse_node_id", DoubleType, false)
))
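// Note: Spark does not strictly enforce nullable = false when reading files,
// so the UDFs below are still written defensively around nulls.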
// English stop-word list (matches NLTK's English stop words).
val stopWords = Set("i", "me", "my", "myself", "we", "our", "ours", "ourselves", "you", "you're", "you've", "you'll", "you'd", "your", "yours", "yourself", "yourselves", "he", "him", "his", "himself", "she", "she's", "her", "hers", "herself", "it", "it's", "its", "itself", "they", "them", "their", "theirs", "themselves", "what", "which", "who", "whom", "this", "that", "that'll", "these", "those", "am", "is", "are", "was", "were", "be", "been", "being", "have", "has", "had", "having", "do", "does", "did", "doing", "a", "an", "the", "and", "but", "if", "or", "because", "as", "until", "while", "of", "at", "by", "for", "with", "about", "against", "between", "into", "through", "during", "before", "after", "above", "below", "to", "from", "up", "down", "in", "out", "on", "off", "over", "under", "again", "further", "then", "once", "here", "there", "when", "where", "why", "how", "all", "any", "both", "each", "few", "more", "most", "other", "some", "such", "no", "nor", "not", "only", "own", "same", "so", "than", "too", "very", "s", "t", "can", "will", "just", "don", "don't", "should", "should've", "now", "d", "ll", "m", "o", "re", "ve", "y", "ain", "aren", "aren't", "couldn", "couldn't", "didn", "didn't", "doesn", "doesn't", "hadn", "hadn't", "hasn", "hasn't", "haven", "haven't", "isn", "isn't", "ma", "mightn", "mightn't", "mustn", "mustn't", "needn", "needn't", "shan", "shan't", "shouldn", "shouldn't", "wasn", "wasn't", "weren", "weren't", "won", "won't", "wouldn", "wouldn't")
// Null-safe text sanitiser: strips HTML tags and common entities, lower-cases,
// and collapses everything that is not [a-z0-9] into single spaces.
val strSanitiserFn: String => String = (str: String) => Option(str).fold("") { it =>
  val removedHtml = it.replaceAll("<[^>]*?>", "").replaceAll("&quot;|&amp;|&lt;|&gt;", "")
  val words = removedHtml.toLowerCase.split("[^a-z0-9]")
  words.filter(_.nonEmpty).mkString(" ")
}
val strSanitiser: UserDefinedFunction = udf[String, String](strSanitiserFn)
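// A quick local sketch of the sanitiser on a made-up input; both checks
// run on the driver without any Spark job:
assert(strSanitiserFn("<p>Hello &amp; <b>World</b>!</p>") == "hello world")
assert(strSanitiserFn(null) == "")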
// Drops stop words from the already-cleaned text, keeping only "concept" words.
// A Set[String] is itself a String => Boolean predicate, so it can be passed
// to filterNot directly.
val conceptWords: UserDefinedFunction = udf[String, String]((str: String) => {
  str.split(" ").filterNot(stopWords).mkString(" ")
})
// Word counts over the cleaned text; count(_.nonEmpty) avoids reporting 1 for
// an empty string, since "".split(" ") yields Array("").
val wordCount: UserDefinedFunction = udf[Int, String]((str: String) => str.split(" ").count(_.nonEmpty))
val uniqWordCount: UserDefinedFunction = udf[Int, String]((str: String) => str.split(" ").filter(_.nonEmpty).toSet.size)
val stopWordCount: UserDefinedFunction = udf[Int, String]((str: String) => str.split(" ").count(stopWords))
// Mean word length via a single-pass incremental mean:
// avg_k = avg_{k-1} + (x_k - avg_{k-1}) / k.
val meanWordLen: UserDefinedFunction = udf[Double, String]((str: String) => {
  str.split(" ").map(_.length).
    foldLeft((0.0, 1)) { case ((avg, idx), next) => (avg + (next - avg) / idx, idx + 1) }._1
})
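// For word lengths 1, 2, 3 the accumulator steps through
// (0.0, 1) -> (1.0, 2) -> (1.5, 3) -> (2.0, 4), yielding the mean 2.0.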
// Total character count, excluding the separating spaces.
val charCount: UserDefinedFunction = udf[Int, String]((str: String) => {
  str.split(" ").map(_.length).sum
})
// Counts http(s) URL prefixes. This is applied to the *raw* column below:
// the sanitiser strips "://", so the pattern could never match cleaned text.
val urlCount: UserDefinedFunction = udf[Int, String]((str: String) => {
  val urlPat = "https?://".r
  urlPat.findAllIn(Option(str).getOrElse("")).size
})
// Derives a cleaned column plus text statistics for one input column, then
// drops the original. Written in curried form for DataFrame.transform chaining.
def processColumn(colName: String)(df: DataFrame): DataFrame = {
  val cleanColName = s"${colName}_CLEAN"
  df.
    withColumn(cleanColName, strSanitiser(col(colName))).
    withColumn(s"${colName}_CONCEPT_WORDS", conceptWords(col(cleanColName))).
    withColumn(s"${colName}_WORD_COUNT", wordCount(col(cleanColName))).
    withColumn(s"${colName}_UNIQ_WORD_COUNT", uniqWordCount(col(cleanColName))).
    withColumn(s"${colName}_STOP_WORD_COUNT", stopWordCount(col(cleanColName))).
    withColumn(s"${colName}_URL_COUNT", urlCount(col(colName))).
    withColumn(s"${colName}_MEAN_WORD_LEN", meanWordLen(col(cleanColName))).
    withColumn(s"${colName}_CHAR_COUNT", charCount(col(cleanColName))).
    drop(colName)
}
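// Dataset.transform(f) simply returns f(this), so processColumn can be
// chained once per text column below instead of nesting function calls.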
// Read the raw CSV against the explicit schema; DROPMALFORMED silently
// discards rows that do not fit it.
val df = spark.read.schema(schema).
  option("header", true).
  option("mode", "DROPMALFORMED").
  csv("train_sample_unescaped_8.csv")
// Keep only rows whose browse_node_id parses as a long, then clean each
// free-text column.
val cleanDf = df.
  withColumn("browse_node_id_long", col("browse_node_id").cast(LongType)).
  filter(col("browse_node_id_long").isNotNull).
  transform(processColumn("ASIN_STATIC_ITEM_NAME")).
  transform(processColumn("ASIN_STATIC_BRAND")).
  transform(processColumn("ASIN_STATIC_MANUFACTURER")).
  transform(processColumn("ASIN_STATIC_PRODUCT_DESCRIPTION")).
  transform(processColumn("ASIN_STATIC_BULLET_POINT"))
// Write once coalesced into a single file and once with the default partitioning.
cleanDf.coalesce(1).write.parquet("cleaned")
cleanDf.write.parquet("cleaned_pq")
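// Optional sanity check; the selected column names follow from processColumn above.
spark.read.parquet("cleaned_pq").
  select("ASIN", "ASIN_STATIC_ITEM_NAME_CLEAN", "ASIN_STATIC_ITEM_NAME_WORD_COUNT").
  show(5, truncate = false)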