Skip to content

Instantly share code, notes, and snippets.

@mehalter
Last active April 5, 2016 14:56
Show Gist options
  • Save mehalter/a9377bf252043827171e162e0377d90f to your computer and use it in GitHub Desktop.
Save mehalter/a9377bf252043827171e162e0377d90f to your computer and use it in GitHub Desktop.
Twitter status streamer
package com.mehalter
import twitter4j._
import scalaz.concurrent.Task
import scalaz.stream.{async, Process, io}
/** Command-line entry point that prints a live sample of public tweets to standard output. */
object StatusStreamer {

  /** Opens a Twitter stream, forwards received statuses through `Util.statuses`,
    * and prints one formatted line per status.
    *
    * The stream is always cleaned up and shut down when the consuming process
    * stops, including when it stops because of an exception.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val twitterStream = new TwitterStreamFactory(Util.config).getInstance
    twitterStream.addListener(Util.simpleStatusListener)

    try {
      // Alternative ways of selecting tweets, kept for reference:
      //val atlantaSW = Array(33.772141, -84.406406)
      //val atlantaNE = Array(33.781928, -84.390202)
      //twitterStream.filter(new FilterQuery().locations(atlantaSW, atlantaNE)) //filter by locations
      twitterStream.sample() //sample of tweets
      //twitterStream.filter(new FilterQuery(2803971569L)) //Twitter IDs
      //twitterStream.filter(new FilterQuery().track("computer")) //Track by specific terms

      val receiveStatuses: Process[Task, Status] = Util.statuses.dequeue
      val tweetLines: Process[Task, String] = receiveStatuses.map { status =>
        s"@${status.getUser.getScreenName} (${status.getCreatedAt}): ${status.getText}"
      }

      // Nothing in this file closes the queue, so this normally keeps running
      // until the JVM is stopped or the process fails.
      tweetLines.observe(io.printLines(Console.out)).run.run
    } finally {
      // Release the stream's resources even when the process above terminated
      // with an error; previously a failure skipped both calls.
      try twitterStream.cleanUp()
      finally twitterStream.shutdown()
    }
  }
}
/** Shared queue, Twitter4J configuration and listener used by the streamer. */
object Util {

  /** Queue that hands statuses from the Twitter4J listener callback to the consumer. */
  val statuses = async.unboundedQueue[Status]

  /** Reads a required credential from the environment.
    *
    * @param name environment variable to read
    * @return the non-empty value of the variable
    * @throws IllegalStateException if the variable is unset or empty
    */
  private def credential(name: String): String =
    sys.env.get(name).filter(_.nonEmpty).getOrElse(
      throw new IllegalStateException(s"Missing required environment variable: $name")
    )

  /** Twitter4J configuration built from OAuth credentials in the environment.
    *
    * Credentials must never be committed to source control. Supply them through
    * `TWITTER_CONSUMER_KEY`, `TWITTER_CONSUMER_SECRET`, `TWITTER_ACCESS_TOKEN`
    * and `TWITTER_ACCESS_TOKEN_SECRET`.
    *
    * Lazy so that a missing variable fails on first use of `config` with a
    * clear message, rather than while this object is being initialised.
    */
  lazy val config: twitter4j.conf.Configuration =
    new twitter4j.conf.ConfigurationBuilder()
      .setOAuthConsumerKey(credential("TWITTER_CONSUMER_KEY"))
      .setOAuthConsumerSecret(credential("TWITTER_CONSUMER_SECRET"))
      .setOAuthAccessToken(credential("TWITTER_ACCESS_TOKEN"))
      .setOAuthAccessTokenSecret(credential("TWITTER_ACCESS_TOKEN_SECRET"))
      .build()

  /** Creates a listener that enqueues every received status onto [[statuses]].
    *
    * All other notifications are ignored, except exceptions, which are reported
    * on standard error.
    *
    * @return a new listener instance on every call
    */
  def simpleStatusListener: StatusListener = new StatusListener {
    override def onStallWarning(warning: StallWarning): Unit = {}
    override def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice): Unit = {}
    override def onScrubGeo(userId: Long, upToStatusId: Long): Unit = {}
    // Runs the enqueue task immediately so the status is on the queue before the callback returns.
    override def onStatus(status: Status): Unit = statuses.enqueueOne(status).run
    override def onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = {}
    // Report stream errors instead of dropping them silently; do not rethrow,
    // so the streaming callback keeps its best-effort behaviour.
    override def onException(ex: Exception): Unit =
      Console.err.println(s"Twitter stream error: $ex")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment