Last active
December 15, 2015 00:28
-
-
Save halcat0x15a/5172645 to your computer and use it in GitHub Desktop.
Twitter User Streams with scala-machines!
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.language.postfixOps | |
import java.io._ | |
import scalaj.http._ | |
import scalaz._, Scalaz._ | |
import scalaz.effect._, Effect._, IO._ | |
import scalaz.concurrent.Promise | |
import com.clarifi.machines._, Machine._ | |
import argonaut._, Argonaut._ | |
case class UserStream(reader: InputStreamReader) extends Procedure[IO, String] { | |
type K = Char => Any | |
val user = jObjectPL >>> jsonObjectPL("user") >>> jObjectPL | |
val screenName = user >>> jsonObjectPL("screen_name") >>> jStringPL | |
val text = jObjectPL >>> jsonObjectPL("text") >>> jStringPL | |
def status(json: Json) = for { | |
name <- screenName get json | |
text <- text get json | |
} yield s"$name: $text" | |
lazy val parse = (_: String).parse.fold(_ => None, status).orZero | |
def line(string: String): Plan[K, String, Unit] = | |
Plan.await[Char].flatMap(char => | |
if (char == '\n') | |
Plan.emit(string) | |
else | |
line(string + char)) | |
val machine = line("") outmap parse repeatedly | |
val driver = | |
new Driver[IO, Char => Any] { | |
val M = Monad[IO] | |
def apply(k: Char => Any) = | |
IO(IoExceptionOr(reader.read.toChar).toOption map k) | |
} | |
def withDriver[R](k: Driver[IO, K] => IO[R]) = k(driver) | |
def start = foreach(putStrLn).unsafePerformIO | |
} | |
object TwitterMachine extends SafeApp with FileSystem with OAuth { | |
def connect(token: Token) = | |
IO(http("https://userstream.twitter.com/1.1/user.json") | |
.oauth(consumer, token) | |
.process(identity)) | |
override def runc = | |
for { | |
option <- file | |
token <- option.fold(oauth >>= write)(read) | |
connection <- connect(token) | |
_ <- using(IO(connection.getInputStream))(stream => | |
using(IO(new InputStreamReader(stream)))(reader => | |
IO(Promise(UserStream(reader).start)) >> readLn)) | |
} yield () | |
} | |
trait FileSystem extends Any { | |
def name = ".twitter-machine" | |
def using[A <: Closeable, B](io: IO[A])(f: A => IO[B]) = | |
io.bracket(r => IO(r.close))(f) | |
def file = IO { | |
val file = new File(name) | |
file.exists option file | |
} | |
def read(file: File) = | |
using(IO(new FileInputStream(file)))(stream => | |
using(IO(new ObjectInputStream(stream)))(_.readObject match { | |
case token: Token => IO(token) | |
})) | |
def write(token: Token) = | |
using(IO(new FileOutputStream(name)))(stream => | |
using(IO(new ObjectOutputStream(stream)))(stream => | |
IO(stream.writeObject(token)) >| token)) | |
} | |
trait OAuth extends Any { | |
def key = "RmKQnjIlq6MnBjMnBNb3bQ" | |
def secret = "ZQ9Jx2y4SIvw0axtPZ2MaFYXJOLgmX1DArkWThM7D4E" | |
def consumer = Token(key, secret) | |
def http(url: String) = | |
Http(url) | |
.option(HttpOptions.connTimeout(1000)) | |
.option(HttpOptions.readTimeout(Int.MaxValue)) | |
def request = | |
IO(http("https://api.twitter.com/oauth/request_token") | |
.param("oauth_callback", "") | |
.oauth(consumer) | |
.asToken) | |
def authorize(token: String) = | |
putStrLn(s"https://api.twitter.com/oauth/authorize?oauth_token=$token") >> | |
putStr("Enter verifier: ") >> | |
readLn | |
def access(token: Token, verifier: String) = | |
IO(http("https://api.twitter.com/oauth/access_token") | |
.oauth(consumer, token, verifier).asToken) | |
def oauth = | |
for { | |
token <- request | |
verifier <- authorize(token.key) | |
token <- access(token, verifier) | |
} yield token | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment