Example of how to build LDA incrementally in Spark, with comparison to one-shot learning.
// This code is related to PR https://github.com/apache/spark/pull/17461
// I show how to use the setInitialModel() param of LDA to build a model incrementally,
// and I compare the performance (perplexity) with a model built in one shot.
import scala.collection.mutable

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.clustering.{LDA, LDAModel}
import org.apache.spark.ml.feature._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// scalastyle:off println
object LDAIncrementalExample {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName(s"${this.getClass.getSimpleName}")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    import spark.implicits._

    val dataset = spark.read.text("/home/mde/workspaces/spark-project/spark/docs/*md").toDF("doc")
      .where(length($"doc") > 0)
    println(s"Number of documents = ${dataset.count()}")
    dataset.show()

    // Hold out a test set for LDA perplexity evaluation throughout this example
    val splits = dataset.randomSplit(Array(0.8, 0.2), 15L)
    val (train, test) = (splits(0), splits(1))

    // Build an LDA model in one shot
    val vocabSize = 30
    val k = 10
    val iter = 30
    println(s"One-Shot build, vocabSize=$vocabSize, k=$k, maxIterations=$iter")

    // Prepare the dataset: tokenize and build a vocabulary
    val (dataprep, vocab) = buildDataPrepPipeline(train, vocabSize)

    // Fit the LDA model
    val vectorized = dataprep.transform(train)
    val ldaModel = buildModel(vectorized, k, iter)
    showTopics(spark, vocab, ldaModel)

    // Evaluate on the held-out test set
    val testVect = dataprep.transform(test)
    val perplexity = ldaModel.logPerplexity(testVect)
    println(s"Perplexity=$perplexity")
    println("---------------------------------")
    println("")

    // ---------------------------------------
    // Build an LDA model incrementally
    // - we assume the same tokenization, and that the vocabulary is stable
    //   (we reuse the one previously built)
    // - let's say the data arrives incrementally, in 10 chunks
    val nbChunks = 10
    val chunks = train.randomSplit(Array.fill(nbChunks)(1D / nbChunks), 7L)
    var ldaModelIncr: LDAModel = null
    var idx = 0
    for (chunk <- chunks) {
      idx += 1
      println(s"Incremental, chunk=$idx, vocabSize=$vocabSize, k=$k, maxIterations=$iter")
      val chunkVect = dataprep.transform(chunk)
      // Each chunk starts from the model learned on the previous chunks
      ldaModelIncr = buildModel(chunkVect, k, iter, ldaModelIncr)
      showTopics(spark, vocab, ldaModelIncr)
      val perplexity = ldaModelIncr.logPerplexity(testVect)
      println(s"Perplexity=$perplexity")
      println("---------------------------------")
    }

    spark.stop()
  }
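  // Fits the data-prep pipeline (regex tokenizer -> stop-word removal -> CountVectorizer)
  // on the given dataset and returns it together with the learned vocabulary.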
  def buildDataPrepPipeline(dataset: DataFrame, vocabSize: Int): (PipelineModel, Array[String]) = {
    // Default English stop words, plus HTML tags that are frequent in the markdown docs
    val stop = StopWordsRemover.loadDefaultStopWords("english") ++
      Array("tr", "td", "div", "class", "table", "html")

    val tokenizer = new RegexTokenizer().setInputCol("doc").setOutputCol("rawwords")
      .setGaps(false).setPattern("[a-zA-Z]{3,}")
    val stopremover = new StopWordsRemover().setInputCol("rawwords")
      .setOutputCol("words").setStopWords(stop)
    val vectorizer = new CountVectorizer().setInputCol("words").setOutputCol("features")
      .setVocabSize(vocabSize)
      .setMinDF(2)

    val pipeline = new Pipeline().setStages(Array(tokenizer, stopremover, vectorizer))
    val model = pipeline.fit(dataset)
    (model, model.stages(2).asInstanceOf[CountVectorizerModel].vocabulary)
  }
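  // Trains an online LDA model. When a previous model is given, training is seeded
  // with it via the setInitialModel() param introduced by the PR above, so the new
  // model refines the topics already learned instead of starting from scratch.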
  def buildModel(dataset: DataFrame, k: Int, maxIter: Int,
      previousModel: LDAModel = null): LDAModel = {
    val lda = new LDA()
      .setK(k)
      .setFeaturesCol("features")
      .setMaxIter(maxIter)
      .setOptimizer("online")
    if (previousModel != null) {
      // Seed training with the previously learned topics rather than a random initialization
      lda.setInitialModel(previousModel)
    }
    lda.fit(dataset)
  }
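  // Prints each topic as its most relevant words, looked up in the vocabulary.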
  def showTopics(spark: SparkSession, vocab: Array[String], ldaModel: LDAModel): Unit = {
    import spark.implicits._
    val bc = spark.sparkContext.broadcast(vocab)
    // Resolve the term indices returned by describeTopics() to vocabulary words
    val topicWords = udf { (indices: mutable.WrappedArray[_]) =>
      indices.map {
        case v: Int => bc.value(v)
      }
    }
    ldaModel.describeTopics().select(topicWords($"termIndices").as("topics")).show(false)
  }
}