@BrunoGrandePhD
Created January 31, 2017 02:01
"Sentiment Analysis for Amazon Product Reviews" by Apala Guha
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, create_map, collect_list
from pyspark.ml.feature import (HashingTF, IDF, Normalizer, Word2Vec,
                                StopWordsRemover, RegexTokenizer)
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.clustering import KMeans
def CleanReviews(df):
    """Tokenize review text on non-letter characters and drop English stop words."""
    tokenizer = RegexTokenizer(inputCol='reviewText', outputCol='tokenized', pattern='[^a-zA-Z]+')
    tokenized = tokenizer.transform(df)
    tokenized.show(10)
    remover = StopWordsRemover(inputCol='tokenized', outputCol='filtered')
    removed = remover.transform(tokenized)
    removed.show(10)
    # Keep only the cleaned tokens, under the original column name
    removed = removed.drop('reviewText').drop('tokenized')
    removed = removed.withColumnRenamed('filtered', 'reviewText')
    return removed
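# Illustrative example (not part of the original gist): a toy review with
# reviewText "This is GREAT!!" tokenizes on runs of non-letters (lowercasing
# by default) to ['this', 'is', 'great'], and StopWordsRemover then drops
# 'this' and 'is', leaving ['great'].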
def CreateTFIDF(df):
    """Featurize reviews as L2-normalized TF-IDF vectors and cross-validate a linear regression."""
    # Hash each token list into a 50,000-dimensional term-frequency vector
    hashingTF = HashingTF(numFeatures=50000, inputCol='reviewText', outputCol='tf')
    # Compute TF-IDF, ignoring terms seen in fewer than 10 documents
    idf = IDF(minDocFreq=10, inputCol='tf', outputCol='idf')
    normalizer = Normalizer(inputCol='idf', outputCol='featurevec_norm')
    lr = LinearRegression(maxIter=10, featuresCol='featurevec_norm', labelCol='overall')
    lreval = RegressionEvaluator(labelCol='overall')
    pipeline = Pipeline(stages=[hashingTF, idf, normalizer, lr])
    paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01]).build()
    crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=lreval, numFolds=5)
    cvmodel = crossval.fit(df)
    pred = cvmodel.transform(df)
    pred.show(10)
    print lreval.evaluate(pred)  # RMSE by default
    return
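# Illustrative sketch (assumption, not part of the original gist): the same
# TF-IDF featurization applied to a toy DataFrame, to show the intermediate
# 'tf' and 'idf' columns. Note that IDF must be fit before transforming.
def _demo_tfidf(spark):
    toy = spark.createDataFrame([(['great', 'phone'], 5.0), (['broke', 'fast'], 1.0)],
                                ['reviewText', 'overall'])
    tf = HashingTF(numFeatures=50000, inputCol='reviewText', outputCol='tf').transform(toy)
    IDF(inputCol='tf', outputCol='idf').fit(tf).transform(tf).show(truncate=False)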
def ApplyWord2Vec(df):
    """Embed each review as an averaged 5-dimensional word vector and cross-validate a linear regression."""
    word2vec = Word2Vec(inputCol='reviewText', outputCol='featurevec', vectorSize=5)
    lr = LinearRegression(maxIter=10, featuresCol='featurevec', labelCol='overall')
    lreval = RegressionEvaluator(labelCol='overall')
    pipeline = Pipeline(stages=[word2vec, lr])
    paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01]).build()
    crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=lreval, numFolds=5)
    cvmodel = crossval.fit(df)
    pred = cvmodel.transform(df)
    pred.show(10)
    print lreval.evaluate(pred)  # RMSE by default
    return
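# Note (added for clarity): Word2VecModel.transform acts as a document
# embedder here; a review like ['great', 'phone'] is mapped to the
# element-wise mean of the learned vectors for 'great' and 'phone',
# yielding a single 5-dimensional feature vector per review.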
def MakeDict(model):
    """Collect the (word, cluster) pairs of a DataFrame into a plain Python dict on the driver."""
    # Turn each row into a single-entry map, then gather all maps into one list
    mymap = model.select(create_map('word', 'cluster').alias('map'))
    mylist = mymap.select(collect_list(mymap.map).alias('dict')).head()['dict']
    d = {}
    for elem in mylist:
        for key in elem:
            d[key] = elem[key]
    return d
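# Example (hypothetical data): for clustersDF rows ('good', 0) and ('bad', 1),
# MakeDict returns {'good': 0, 'bad': 1}.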
def ClustVecMaker(s, d, nClust):
    """Map a token list to a sparse binary indicator vector over word clusters."""
    sv = {}
    for x in s:
        try:
            cid = d[x]
            sv[cid] = 1
        except KeyError:
            # Words too rare to have been clustered are skipped
            continue
    return Vectors.sparse(nClust, sv)
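# Example (hypothetical): with d = {'great': 2, 'phone': 0} and nClust = 4,
# ClustVecMaker(['great', 'phone', 'xyz'], d, 4) returns
# SparseVector(4, {0: 1.0, 2: 1.0}); 'xyz' is absent from d and is ignored.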
def ApplyClustering(df):
    """Cluster Word2Vec word vectors with k-means, featurize reviews by cluster membership, and cross-validate a linear regression."""
    # Learn 5-dimensional vectors for words that occur at least 100 times
    word2vec = Word2Vec(inputCol='reviewText', vectorSize=5, minCount=100)
    w2vModel = word2vec.fit(df)
    wordVectorsDF = w2vModel.getVectors()
    # Heuristic: roughly 20 words per cluster
    nClust = wordVectorsDF.count() // 20
    kmeans = KMeans(k=nClust, featuresCol='vector')
    modelK = kmeans.fit(wordVectorsDF)
    clustersDF = modelK.transform(wordVectorsDF).select('word', 'prediction') \
        .withColumnRenamed('prediction', 'cluster').orderBy('cluster')
    clustersDF.show(100)
    d = MakeDict(clustersDF)
    # Encode each review as a sparse indicator vector over the clusters
    clustvecmaker = udf(lambda s: ClustVecMaker(s, d, nClust), VectorUDT())
    clustvecs = df.withColumn('featurevec', clustvecmaker(df.reviewText))
    normalizer = Normalizer(inputCol='featurevec', outputCol='featurevec_norm')
    lr = LinearRegression(maxIter=10, featuresCol='featurevec_norm', labelCol='overall')
    lreval = RegressionEvaluator(labelCol='overall')
    pipeline = Pipeline(stages=[normalizer, lr])
    paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01]).addGrid(lr.solver, ['l-bfgs']).build()
    crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=lreval, numFolds=5)
    cvmodel = crossval.fit(clustvecs)
    pred = cvmodel.transform(clustvecs)
    pred.show(10)
    print lreval.evaluate(pred)  # RMSE by default
    return
def main(infile):
    # Read the reviews JSON and keep only the review text and the star rating
    df = spark.read.format('json').load(infile).repartition(32, 'unixReviewTime')
    df = df.select(df.reviewText, df.overall)
    df = CleanReviews(df)
    df.cache()
    df.show(10)
    # Alternative featurizations; uncomment one at a time to compare
    #CreateTFIDF(df)
    #ApplyWord2Vec(df)
    ApplyClustering(df)
    return
spark = SparkSession.builder.appName("Simple App").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("OFF")
infile = sys.argv[1]
main(infile)
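To run the script (the file name here is hypothetical), submit it to a Spark 2.0.0 installation with the path to the reviews JSON as its only argument:

spark-submit sentiment_analysis.py reviews.json

The Dockerfile below builds a container with Python 2.7, pip, and Spark 2.0.0 for running the script.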
FROM ubuntu:latest
RUN apt-get update
RUN apt-get install -y vim wget git htop tmux
RUN apt-get install -y openssh-client default-jre default-jdk
RUN apt-get install -y python2.7
RUN ln -s /usr/bin/python2.7 /usr/bin/python
RUN wget https://bootstrap.pypa.io/get-pip.py
RUN python get-pip.py
RUN pip2.7 install numpy pandas
RUN mkdir /home/spark && cd /home/spark && wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.0-bin-hadoop2.6.tgz && tar xvf spark-2.0.0-bin-hadoop2.6.tgz
# Use ENV rather than `RUN export` so the setting persists in the image,
# and append Spark's bin directory instead of replacing PATH outright
ENV PATH="/home/spark/spark-2.0.0-bin-hadoop2.6/bin:${PATH}"
CMD bash
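To build the image and open a shell inside it (the image tag is arbitrary):

docker build -t spark-sentiment .
docker run -it spark-sentiment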