Created
December 15, 2018 14:53
-
-
Save adilsonbna/19c8d2a25c5265b5df6e2eb8c8eb9fd4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.collection.JavaConverters._ | |
import com.microsoft.azure.eventhubs._ | |
import java.util.concurrent._ | |
val namespaceName = "<EVENT HUBS NAMESPACE>" | |
val eventHubName = "<EVENT HUB NAME>" | |
val sasKeyName = "<POLICY NAME>" | |
val sasKey = "<POLICY KEY>" | |
val connStr = new ConnectionStringBuilder() | |
.setNamespaceName(namespaceName) | |
.setEventHubName(eventHubName) | |
.setSasKeyName(sasKeyName) | |
.setSasKey(sasKey) | |
val pool = Executors.newFixedThreadPool(1) | |
val eventHubClient = EventHubClient.create(connStr.toString(), pool) | |
def sendEvent(message: String) = { | |
val messageData = EventData.create(message.getBytes("UTF-8")) | |
eventHubClient.get().send(messageData) | |
System.out.println("Sent event: " + message + "\n") | |
} | |
import twitter4j._ | |
import twitter4j.TwitterFactory | |
import twitter4j.Twitter | |
import twitter4j.conf.ConfigurationBuilder | |
// Twitter configuration! | |
// Replace values below with yours | |
val twitterConsumerKey = "<CONSUMER KEY>" | |
val twitterConsumerSecret = "<CONSUMER SECRET>" | |
val twitterOauthAccessToken = "<ACCESS TOKEN>" | |
val twitterOauthTokenSecret = "<TOKEN SECRET>" | |
val cb = new ConfigurationBuilder() | |
cb.setDebugEnabled(true) | |
.setOAuthConsumerKey(twitterConsumerKey) | |
.setOAuthConsumerSecret(twitterConsumerSecret) | |
.setOAuthAccessToken(twitterOauthAccessToken) | |
.setOAuthAccessTokenSecret(twitterOauthTokenSecret) | |
val twitterFactory = new TwitterFactory(cb.build()) | |
val twitter = twitterFactory.getInstance() | |
// Getting tweets with keyword "Azure" and sending them to the Event Hub in realtime! | |
val query = new Query(" #Azure ") | |
query.setCount(100) | |
query.lang("en") | |
var finished = false | |
while (!finished) { | |
val result = twitter.search(query) | |
val statuses = result.getTweets() | |
var lowestStatusId = Long.MaxValue | |
for (status <- statuses.asScala) { | |
if(!status.isRetweet()){ | |
sendEvent(status.getText()) | |
} | |
lowestStatusId = Math.min(status.getId(), lowestStatusId) | |
Thread.sleep(2000) | |
} | |
query.setMaxId(lowestStatusId - 1) | |
} | |
// Closing connection to the Event Hub | |
eventHubClient.get().close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment