Skip to content

Instantly share code, notes, and snippets.

@mathieuancelin
Created April 8, 2014 18:08
Show Gist options
  • Save mathieuancelin/10164955 to your computer and use it in GitHub Desktop.
Save mathieuancelin/10164955 to your computer and use it in GitHub Desktop.
package controllers
import play.api.mvc.{Action, Controller}
import play.api.libs.iteratee.{Enumeratee, Enumerator, Concurrent}
import play.api.libs.ws.WS.WSRequestHolder
import play.api.http.{ContentTypeOf, Writeable}
import play.api.libs.ws.WS
import java.net.URLEncoder
import play.api.libs.oauth.OAuthCalculator
import play.api.libs.json.{JsValue, Json}
import play.api.libs.EventSource
object TweetController extends Controller {
def watchTweets(keywords: String) = Action {
implicit request =>
import CustomWS.streamableWSRequestHolder
// See Twitter parameters doc https://dev.twitter.com/docs/streaming-apis/parameters
val tweetsOut: Enumerator[JsValue] =
WS.url(s"https://stream.twitter.com/1.1/statuses/filter.json?stall_warnings=true&filter_level=none&track=" + URLEncoder.encode(keywords, "UTF-8"))
.withRequestTimeout(-1) // Connected forever
.sign(OAuthCalculator(KEY, sessionTokenPair.get))
.withHeaders("Connection" -> "keep-alive")
.postAsEnumerator("")(Json.parse)
// Je suis plus un fan d'Event-Source, bien que cela marche pas avec IE
// Voir la référence ici http://caniuse.com/#feat=eventsource
Ok.feed(tweetsOut &> EventSource()).as("text/event-stream")
}
}
object CustomWS {
implicit final class streamableWSRequestHolder[A](holder: WSRequestHolder) {
def getAsEnumerator[C](transformer: Array[Byte] => C) = {
val (iteratee, enumerator) = Concurrent.joined[Array[Byte]]
holder.get(_ => iteratee).map(_.run)
enumerator.through( Enumeratee.map[Array[Byte]](transformer) )
}
def postAsEnumerator[T, C](body : T)(transformer: Array[Byte] => C)(implicit wrt: Writeable[T], ct: ContentTypeOf[T]): Enumerator[C] = {
val (iteratee, enumerator) = Concurrent.joined[Array[Byte]]
holder.postAndRetrieveStream[Unit, T](body)(_ => iteratee).map(_.run)
enumerator.through( Enumeratee.map[Array[Byte]](transformer) )
}
def putAsEnumerator[T, C](body : T)(transformer: Array[Byte] => C)(implicit wrt: Writeable[T], ct: ContentTypeOf[T]): Enumerator[C] = {
val (iteratee, enumerator) = Concurrent.joined[Array[Byte]]
holder.putAndRetrieveStream[Unit, T](body)(_ => iteratee).map(_.run)
enumerator.through( Enumeratee.map[Array[Byte]](transformer) )
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment