Created
June 14, 2020 02:16
-
-
Save K41R1/04923a909679cb84ba6efe90c0f8df63 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package twitter.jobs; | |
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.current_timestamp;
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.from_json;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.lower;
import static org.apache.spark.sql.functions.regexp_replace;
import static org.apache.spark.sql.functions.split;
import static org.apache.spark.sql.functions.struct;
import static org.apache.spark.sql.functions.to_json;
import static org.apache.spark.sql.functions.unix_timestamp;
import static org.apache.spark.sql.functions.window;
import static twitter.sentiment.LanguageCheck.correctSpell;

import edu.stanford.nlp.ling.CoreAnnotations.SentencesAnnotation;
import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations;
import edu.stanford.nlp.pipeline.Annotation;
import edu.stanford.nlp.pipeline.StanfordCoreNLP;
import edu.stanford.nlp.sentiment.SentimentCoreAnnotations;
import edu.stanford.nlp.trees.Tree;
import edu.stanford.nlp.util.CoreMap;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class CoronaSentimentTweets { | |
static Properties props = new Properties(); | |
static { | |
props.setProperty("annotators", "tokenize,ssplit,pos,parse,sentiment"); | |
} | |
static StanfordCoreNLP pipeline = new StanfordCoreNLP(props); | |
public static void main(String[] args) throws StreamingQueryException, TimeoutException { | |
SparkSession spark = getSparkSession(); | |
spark.sparkContext().setLogLevel("ERROR"); | |
spark.sqlContext().udf().register("sentiment", (String s) -> getSentiment(s), DataTypes.DoubleType); | |
spark.sqlContext().udf().register("sentimentToText", (Double d) -> sentimentToText(d), DataTypes.StringType); | |
Dataset<Row> stream = spark | |
.readStream() | |
.format("kafka") | |
.option("kafka.bootstrap.servers", "localhost:9092") | |
.option("subscribe", "tweets") | |
.option("startingOffsets", "earliest") | |
.load(); | |
Dataset<Row> jsonTweetDF = stream.selectExpr("CAST(value AS STRING)"); | |
StructType schema = getTweetSchema(); | |
Dataset<Row> tweetsDF = jsonTweetDF | |
.select(from_json(col("value"), schema).as("tweet")) | |
.select("tweet.*") | |
.select(split(col("text"), " ").as("words"), col("text"), col("user.name")) | |
.withColumn("tokens", explode(col("words"))) | |
.filter(col("tokens").startsWith("#")) | |
.withColumn("lower_tokens", lower(col("tokens"))) | |
.withColumn("hashtag", regexp_replace(col("lower_tokens"), "[^a-zA-Z]", "")) | |
.filter(col("hashtag").contains("covid").or(col("hashtag").contains("corona"))) | |
.selectExpr("sentiment(text) as seVal", "sentimentToText(sentiment(text)) as sentiment") | |
.withColumn("EventTime", lit(current_timestamp())) | |
.withColumn("timestamp", unix_timestamp(col("EventTime"), "MM/dd/yyyy hh:mm:ss aa").cast(DataTypes.TimestampType)) | |
.withWatermark("timestamp", "1 minutes") | |
.groupBy(col("sentiment"), window(col("timestamp"), "1 minutes")) | |
.count(); | |
tweetsDF | |
.withColumn("value", to_json(struct(col("sentiment"), col("count")))) | |
.selectExpr("CAST(value as STRING)") | |
.writeStream() | |
.format("kafka") | |
.option("kafka.bootstrap.servers", "localhost:9092") | |
.option("topic", "corona_sentiment") | |
.option("checkpointLocation", "/tmp/checkpoints") | |
.outputMode(OutputMode.Append()) | |
.start() | |
.awaitTermination(); | |
spark.stop(); | |
} | |
private static SparkSession getSparkSession() { | |
return SparkSession.builder() | |
.appName("Corona Tweets Sentiment") | |
.config("spark.executor.memory", "6g") | |
.config("spark.cores.max", "3") | |
.master("spark://127.0.0.1:7077") | |
.getOrCreate(); | |
} | |
public static String sentimentToText(Double sentiment) { | |
String s; | |
if(sentiment < -1) { | |
s = "very negative"; | |
}else if (sentiment >= -1 & sentiment < 0) { | |
s = "negative"; | |
}else if(sentiment >= 0 && sentiment < 1) { | |
s = "neutral"; | |
}else if (sentiment >= 1 && sentiment < 1) { | |
s = "positive"; | |
}else { | |
s = "very positive"; | |
} | |
return s; | |
} | |
public static Double getSentiment(String text) { | |
String checkedText = correctSpell(text); | |
Annotation document = new Annotation(checkedText); | |
pipeline.annotate(document); | |
List<CoreMap> sentences = document.get(SentencesAnnotation.class); | |
Double sum = 0.0; | |
for (CoreMap sentence : sentences) { | |
Tree tree = sentence.get(SentimentCoreAnnotations.SentimentAnnotatedTree.class); | |
int sentiment = RNNCoreAnnotations.getPredictedClass(tree); | |
int scaled = sentiment - 2; | |
sum = sum + scaled; | |
} | |
Double total = sum / sentences.size(); | |
return total; | |
} | |
private static StructType getTweetSchema() { | |
StructType userSchema = DataTypes.createStructType(new StructField[] { | |
DataTypes.createStructField("name", DataTypes.StringType, true), | |
DataTypes.createStructField("location", DataTypes.StringType, true), | |
DataTypes.createStructField("friendsCount", DataTypes.IntegerType, true) | |
}); | |
StructType geoLocationType = DataTypes.createStructType(new StructField[] { | |
DataTypes.createStructField("longitude", DataTypes.DoubleType, true), | |
DataTypes.createStructField("latitude", DataTypes.DoubleType, true) | |
}); | |
StructType tweetSchema = DataTypes.createStructType(new StructField[] { | |
DataTypes.createStructField("createdAt", DataTypes.StringType, true), | |
DataTypes.createStructField("text", DataTypes.StringType, true), | |
DataTypes.createStructField("hashtags",DataTypes.createArrayType(DataTypes.StringType), true), | |
DataTypes.createStructField("user", userSchema, true), | |
DataTypes.createStructField("source", DataTypes.StringType, true), | |
DataTypes.createStructField("retweetCount", DataTypes.IntegerType, true), | |
DataTypes.createStructField("geo", geoLocationType, true) | |
}); | |
return tweetSchema; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package twitter.sentiment; | |
import java.util.List; | |
import org.languagetool.JLanguageTool; | |
import org.languagetool.language.AmericanEnglish; | |
import org.languagetool.rules.RuleMatch; | |
/** | |
* LanguageCheck | |
*/ | |
public class LanguageCheck { | |
static JLanguageTool langTool = new JLanguageTool(new AmericanEnglish()); | |
public static String correctSpell(String text) { | |
String query = text; | |
try { | |
List<RuleMatch> matches = langTool.check(query); | |
String result = ""; | |
int lastPos = 0; | |
String tmp = ""; | |
for (RuleMatch ma : matches) { | |
try { | |
tmp = ma.getSuggestedReplacements().get(0); | |
result += query.substring(lastPos, ma.getFromPos()); | |
result += tmp; | |
lastPos = ma.getToPos(); | |
} catch (Exception e) { | |
return text; | |
} | |
} | |
if (lastPos < query.length()) { | |
result += query.substring(lastPos, query.length()); | |
} | |
return result; | |
} catch (Exception e) { | |
return text; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment