Last active
December 15, 2018 22:55
-
-
Save adilsonbna/865c077d68c01f505df5cec0059407e7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.eventhubs._ | |
import org.apache.spark.sql.types._ | |
import org.apache.spark.sql.functions._ | |
// Assemble the Event Hubs connection string for the "twitterhub" hub.
// "Endpoint....." is a placeholder for the real namespace connection string.
val connectionString =
  ConnectionStringBuilder("Endpoint.....")
    .setEventHubName("twitterhub")
    .build

// Event Hubs source configuration: at most 5 events are pulled per trigger.
val customEventhubParameters = EventHubsConf(connectionString).setMaxEventsPerTrigger(5)

// Streaming DataFrame backed by the Event Hubs source.
val incomingStream =
  spark.readStream
    .format("eventhubs")
    .options(customEventhubParameters.toMap)
    .load()

// Print the schema of the incoming stream for inspection.
incomingStream.printSchema()
#################### New Cell #################### | |
import java.io._ | |
import java.net._ | |
import java.util._ | |
/** Deserialized response of the language-detection call. */
case class Language(
    documents: Array[LanguageDocuments],
    errors: Array[Any]
) extends Serializable

/** Languages detected for the document identified by `id`. */
case class LanguageDocuments(
    id: String,
    detectedLanguages: Array[DetectedLanguages]
) extends Serializable

/** One detected language: display name, ISO 639-1 code and the score reported for it. */
case class DetectedLanguages(
    name: String,
    iso6391Name: String,
    score: Double
) extends Serializable

/** Deserialized response of the sentiment call. */
case class Sentiment(
    documents: Array[SentimentDocuments],
    errors: Array[Any]
) extends Serializable

/** Sentiment score reported for the document identified by `id`. */
case class SentimentDocuments(
    id: String,
    score: Double
) extends Serializable

/** Request body sent to the text API: a batch of documents. */
case class RequestToTextApi(
    documents: Array[RequestToTextApiDocument]
) extends Serializable

/**
 * A single document of a request.
 *
 * `language` is mutable and defaults to the empty string so that it can be
 * filled in after the document has been created.
 */
case class RequestToTextApiDocument(
    id: String,
    text: String,
    var language: String = ""
) extends Serializable
#################### New Cell #################### | |
import javax.net.ssl.HttpsURLConnection | |
import com.google.gson.Gson | |
import com.google.gson.GsonBuilder | |
import com.google.gson.JsonObject | |
import com.google.gson.JsonParser | |
import scala.util.parsing.json._ | |
/**
 * Thin client for the Cognitive Services Text Analytics v2.0 language and
 * sentiment endpoints. Requests and responses are (de)serialized with Gson.
 */
object SentimentDetector extends Serializable {
  import scala.util.control.NonFatal

  // Cognitive Services API connection settings
  val accessKey: String = "YOUR ACCESS KEY"
  val host: String = "https://eastus.api.cognitive.microsoft.com"
  val languagesPath: String = "/text/analytics/v2.0/languages"
  val sentimentPath: String = "/text/analytics/v2.0/sentiment"
  val languagesUrl: URL = new URL(host + languagesPath)
  val sentimentUrl: URL = new URL(host + sentimentPath)
  // Original (misspelled) name, kept so existing references keep compiling.
  val sentimenUrl: URL = sentimentUrl
  val g: Gson = new Gson

  /**
   * Opens a POST connection to `path` carrying the subscription key header.
   *
   * @param path endpoint to call
   * @return a connection configured for writing a request body
   */
  def getConnection(path: URL): HttpsURLConnection = {
    val connection = path.openConnection().asInstanceOf[HttpsURLConnection]
    connection.setRequestMethod("POST")
    connection.setRequestProperty("Content-Type", "text/json")
    connection.setRequestProperty("Ocp-Apim-Subscription-Key", accessKey)
    connection.setDoOutput(true)
    connection
  }

  /**
   * Re-serializes a JSON object with pretty printing (useful for logging).
   *
   * @param json_text text that must parse as a JSON object
   * @return the same JSON, pretty-printed
   */
  def prettify(json_text: String): String = {
    val json = new JsonParser().parse(json_text).getAsJsonObject()
    new GsonBuilder().setPrettyPrinting().create().toJson(json)
  }

  /**
   * Handles the call to Cognitive Services API: posts `request` as UTF-8 JSON
   * to `path` and returns the response body with line breaks removed.
   *
   * Exceptions raised by the connection (including those for non-success HTTP
   * statuses) propagate to the caller.
   *
   * @param request documents to send
   * @param path    endpoint to call
   * @return the raw response body
   */
  def processUsingApi(request: RequestToTextApi, path: URL): String = {
    val payload = g.toJson(request).getBytes("UTF-8")
    val connection = getConnection(path)

    // Close the request stream even when writing fails.
    val out = connection.getOutputStream()
    try {
      out.write(payload, 0, payload.length)
      out.flush()
    } finally out.close()

    // Decode explicitly as UTF-8 rather than with the platform default charset,
    // and close the reader even when reading fails.
    val reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"))
    try {
      // Fully qualified: a wildcard import of java.util would shadow `Iterator`.
      scala.collection.Iterator.continually(reader.readLine()).takeWhile(_ != null).mkString
    } finally reader.close()
  }

  /**
   * Calls the language API for specified documents.
   *
   * @param inputDocs request holding the documents to analyse
   * @return the parsed response, or `None` when the call fails, when the
   *         response has no detected language for the first document, or when
   *         that language's ISO code is "(Unknown)"
   */
  def getLanguage(inputDocs: RequestToTextApi): Option[Language] =
    try {
      // Deserializing the JSON response from the API into Scala types
      val language = g.fromJson(processUsingApi(inputDocs, languagesUrl), classOf[Language])
      // Explicit emptiness/null checks instead of relying on swallowed index errors.
      val firstIsoCode = for {
        parsed <- Option(language)
        docs <- Option(parsed.documents)
        doc <- docs.headOption
        detected <- Option(doc.detectedLanguages)
        best <- detected.headOption
      } yield best.iso6391Name
      firstIsoCode.filter(_ != "(Unknown)").map(_ => language)
    } catch {
      // NonFatal lets interrupts and fatal JVM errors propagate.
      case NonFatal(_) => None
    }

  /**
   * Calls the sentiment API for specified documents. Needs a language field
   * to be set for each of them.
   *
   * @param inputDocs request holding the documents to analyse
   * @return the parsed response, or `None` when the call or the parsing fails
   */
  def getSentiment(inputDocs: RequestToTextApi): Option[Sentiment] =
    try {
      // Deserializing the JSON response from the API into Scala types;
      // Option(...) maps an empty/null payload to None.
      Option(g.fromJson(processUsingApi(inputDocs, sentimentUrl), classOf[Sentiment]))
    } catch {
      case NonFatal(_) => None
    }
}
#################### New Cell #################### | |
// User Defined Function for processing content of messages to return their sentiment.
// Returns the sentiment score as a string, or a human-readable message when the
// language or the sentiment could not be determined. It never indexes into a
// possibly empty array, so a malformed API response cannot fail the query.
val toSentiment =
  udf((textContent: String) => {
    // The message text doubles as the document id.
    val document = RequestToTextApiDocument(textContent, textContent)

    // First reported error as text, tolerating a null or empty error list.
    def firstError(errors: Array[Any]): String =
      Option(errors)
        .flatMap(_.headOption)
        .flatMap(e => Option(e))
        .fold("unknown error")(_.toString)

    SentimentDetector.getLanguage(RequestToTextApi(Array(document))) match {
      case None => "Couldn't detect language"
      case Some(language) =>
        val isoCode = for {
          docs <- Option(language.documents)
          doc <- docs.headOption
          detected <- Option(doc.detectedLanguages)
          best <- detected.headOption
        } yield best.iso6391Name

        isoCode match {
          case None =>
            s"Error happened when getting language: ${firstError(language.errors)}"
          case Some(code) =>
            // Build a fresh request carrying the detected language instead of
            // mutating the one already sent.
            val sentimentRequest = RequestToTextApi(Array(document.copy(language = code)))
            SentimentDetector.getSentiment(sentimentRequest) match {
              case None => "Couldn't detect sentiment"
              case Some(sentiment) =>
                Option(sentiment.documents).flatMap(_.headOption) match {
                  case Some(scored) => scored.score.toString
                  case None =>
                    s"Error happened when getting sentiment: ${firstError(sentiment.errors)}"
                }
            }
        }
    }
  })
#################### New Cell #################### | |
// Build a DataFrame with the message body as "Content" and its sentiment as "Sentiment".
val streamingDataFrame =
  incomingStream
    .selectExpr("cast (body as string) AS Content")
    .withColumn("Sentiment", toSentiment($"Content"))

// Stream the rows to the console untruncated, blocking until the query terminates.
streamingDataFrame.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .start()
  .awaitTermination()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment