Skip to content

Instantly share code, notes, and snippets.

@mathieuancelin
Created April 24, 2014 10:16
Show Gist options
  • Save mathieuancelin/11249289 to your computer and use it in GitHub Desktop.
Save mathieuancelin/11249289 to your computer and use it in GitHub Desktop.
package controllers
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.libs.json._
import play.api.libs.ws._
import play.api.mvc._
import play.api.libs._
import play.api.libs.iteratee._
import play.Play
import play.api.libs.oauth._
import akka.actor.{ActorRef, Props, Actor}
import play.api.libs.concurrent.Akka
import akka.util.Timeout
import java.util.concurrent.TimeUnit
import java.net.URLEncoder
import play.api.libs.oauth.OAuth
import play.api.libs.oauth.ServiceInfo
import play.api.libs.oauth.RequestToken
import play.api.libs.oauth.ConsumerKey
case class WallRequest(keywords: String, request: RequestHeader)
case class WallResponse(enumerator: Enumerator[Array[Byte]])
case class ResetConnection()
class TweetsBroadcaster extends Actor {
import context.become
val (enumerator, channel) = Concurrent.broadcast[Array[Byte]]
def streamIt(keywords: String, request: RequestHeader, myself: ActorRef) {
WS.url(s"https://stream.twitter.com/1.1/statuses/filter.json?stall_warnings=true&filter_level=none&track=" + URLEncoder.encode(keywords, "UTF-8"))
.withRequestTimeout(-1)
.sign(OAuthCalculator(TweetWall.KEY, TweetWall.sessionTokenPair(request).get))
.withHeaders("Connection" -> "keep-alive")
.postAndRetrieveStream("")(_ => Iteratee.foreach[Array[Byte]](bytes => channel.push(bytes))).flatMap(_.run).onComplete {
case _ => myself ! ResetConnection()
}
}
def beforeAuth: Receive = {
case WallRequest(keywords, request) => {
val myself = self
sender ! WallResponse(enumerator)
become(afterAuth)
streamIt(keywords, request, myself)
}
case _ =>
}
def afterAuth: Receive = {
case WallRequest(keywords, request) => sender ! WallResponse(enumerator)
case ResetConnection() => become(beforeAuth)
case _ =>
}
def receive = beforeAuth
}
object TweetWall extends Controller {
val cfg = Play.application.configuration
val KEY = ConsumerKey(cfg.getString("twitter.consumerKey"), cfg.getString("twitter.consumerSecret"))
val TWITTER = OAuth(ServiceInfo(
"https://api.twitter.com/oauth/request_token",
"https://api.twitter.com/oauth/access_token",
"https://api.twitter.com/oauth/authorize", KEY),
use10a = false)
val ref = Akka.system(play.api.Play.current).actorOf(Props[TweetsBroadcaster])
implicit val timeout = Timeout(1, TimeUnit.SECONDS)
def index = Action {
implicit request =>
request.session.get("token").map {
token: String =>
Ok(views.html.TweetWall.wallDevoxxFR2014())
}.getOrElse {
Redirect(routes.TweetWall.authenticate)
}
}
def authenticate = Action {
implicit request =>
request.queryString.get("oauth_verifier").flatMap(_.headOption).map {
verifier =>
val tokenPair = sessionTokenPair(request).get
TWITTER.retrieveAccessToken(tokenPair, verifier) match {
case Right(t) => Redirect(routes.TweetWall.index).withSession("token" -> t.token, "secret" -> t.secret)
case Left(e) => throw e
}
}.getOrElse(
TWITTER.retrieveRequestToken(routes.TweetWall.authenticate.absoluteURL()) match {
case Right(t) => Redirect(TWITTER.redirectUrl(t.token)).withSession("token" -> t.token, "secret" -> t.secret)
case Left(e) => throw e
}
)
}
def watchTweets(keywords: String) = Action.async {
implicit request =>
import akka.pattern.ask
(ref ? WallRequest(keywords, request)).mapTo[WallResponse].map { response =>
Ok.feed(response.enumerator &> Enumeratee.map[Array[Byte]](Json.parse) &> EventSource()).as("text/event-stream")
}
}
def sessionTokenPair(implicit request: RequestHeader): Option[RequestToken] = {
for {
token <- request.session.get("token")
secret <- request.session.get("secret")
} yield {
RequestToken(token, secret)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment