Skip to content

Instantly share code, notes, and snippets.

@andypetrella
Last active December 20, 2015 22:39
Show Gist options
  • Save andypetrella/6207142 to your computer and use it in GitHub Desktop.
Save andypetrella/6207142 to your computer and use it in GitHub Desktop.
Spark-bd blog entries
// sbt build settings for the Spark-bd blog examples.
// Scala version used to compile the project (also selects the `%%` artifact suffix below).
scalaVersion := "2.9.3"
// Extra repositories consulted when resolving the dependencies below.
resolvers += "Sonatype release" at "https://oss.sonatype.org/content/repositories/releases"
resolvers += "Akka repo" at "http://repo.akka.io/releases/"
resolvers += "Spray repo" at "http://repo.spray.cc"
// Typesafe Config library (plain Java artifact, hence a single `%`).
libraryDependencies += "com.typesafe" % "config" % "1.0.2"
// Spark core and Spark Streaming, both pinned to 0.7.3; `%%` appends the Scala version to the artifact name.
libraryDependencies += "org.spark-project" %% "spark-core" % "0.7.3"
libraryDependencies += "org.spark-project" %% "spark-streaming" % "0.7.3"
// Single stream carrying the elements of both the Twitter stream and the Yahoo stream.
val both = twitterDStream.union(yahooDStream)
// Turns one observation into a vote: -1 when its signal is strictly negative, +1 otherwise
// (a signal of exactly zero therefore counts as +1).
//  - YahooData:   the sign of the second component of `delta`
//  - TwitterData: the sign of the summed sentiment scores
val score: Data => Int = {
  case quote: YahooData =>
    if (quote.delta._2 < 0) -1 else 1
  case tweet: TwitterData =>
    val total = tweet.sentiments.map(_.score).sum
    if (total < 0) -1 else 1
}
// Per-stock aggregate over a 60-second window.
//
// Each element is `(stock, (netScore, (yahooCount, twitterCount)))` where
//  - `netScore` is the sum of `score` over every observation of that stock in the window,
//  - `yahooCount` / `twitterCount` are how many of those observations came from each source.
//
// The key is the `Stock` carried by each `Data` element (not an Int), and the type is
// attached with `name: Type` -- `val computed[...]` is not valid Scala.
val computed: DStream[(Stock, (Int, (Int, Int)))] = both
  // one singleton list per observation so that lists can be concatenated per key
  .map(x => (x.stock, List(x)))
  .reduceByKeyAndWindow(_ ::: _, Seconds(60))
  .mapValues(xs => (xs.map(score).sum, xs.foldLeft((0, 0)) {
    case ((y, t), _: YahooData)   => (y + 1, t)
    case ((y, t), _: TwitterData) => (y, t + 1)
  }))
/**
 * One score recorded for a stock.
 *
 * @param stock the stock the score refers to
 * @param score the score value
 * @param time  timestamp attached to the score (the sender passes `time.milliseconds`)
 */
case class Result(
  stock: Stock,
  score: Int,
  time: Long
)
/**
 * Collects the scores sent to it and hands the whole collection back on request.
 *
 * Messages:
 *  - `(Stock, Int, Long)`: prepends a new [[Result]] to that stock's history
 *  - `"results"`: replies to the sender with the current cache
 */
class ResultActor() extends Actor {
  // History per stock id; the most recently received result is at the head of each list.
  var cache: Map[String, List[Result]] = Map.empty

  def receive = {
    case (stock: Stock, score: Int, time: Long) =>
      val history = cache.getOrElse(stock.id, Nil)
      cache = cache.updated(stock.id, Result(stock, score, time) :: history)
    case "results" =>
      sender ! cache
    //case x => println(("*"*100)+"ERROR : " + x)
  }
}
// Actor that, once told which stocks to follow (`For`), fetches quotes on every `Tick`
// and sends each resulting YahooData to the registered actor.
class FeederActor extends Actor {
import java.net.URL
// Field codes requested from the quote service; they are concatenated without a separator
// (`mkString`) to form the `f` query parameter.
val yahooResponseFormat = List("e1", "s", "l1", "d1", "t1", "c6", "p2", "v")
// URL template: the first %s receives the symbols, the second %s the field codes.
val yahooService = "http://finance.yahoo.com/d/quotes.csv?s=%s&f=%s&e=.csv";
// Builds the request URL (as a String) for the given symbols, joined with commas.
def financeData(stocks:Seq[String]) = String.format(yahooService, stocks.mkString(","), yahooResponseFormat.mkString)
// Both stay None until a `For` message arrives; until then `Tick` does nothing.
var url:Option[URL] = None
var ref:Option[ActorRef] = None
// NOTE(review): the body is elided in this snippet (see the link). Presumably it downloads
// `url` and parses the rows into YahooData -- confirm against the linked source.
def consume(actor:ActorRef, url:URL):Stream[YahooData] = //... see https://github.com/andypetrella/spark-bd/blob/master/src/main/scala/yahoo.scala#L76
def receive = {
// Registration: remember the destination actor and precompute the URL for the requested stock ids.
case For(sparkled, stocks) =>
ref = Some(sparkled)
url = Some(new URL(financeData(stocks.map(_.id))))
// Polling: only when both a destination and a URL are set, send every consumed item to the destination.
case Tick =>
for {
actor <- ref
u <- url
} consume(actor, u).foreach(actor ! _)
}
}
/**
 * A tracked stock.
 *
 * @param id       identifier of the stock (used as the ticker symbol when querying quotes)
 * @param keywords additional terms associated with the stock
 */
case class Stock(id: String, keywords: List[String]) {

  /** The id followed by all keywords; computed on first access and excluded from serialization. */
  @transient lazy val all: List[String] = id :: keywords
}
/**
 * Registry of the stocks that come with predefined keywords.
 */
object Stocks {
  private[this] val seq = Seq(
    Stock("GOOG", List("google", "android", "chrome")),
    Stock("AAPL", List("apple", "ios", "iphone", "ipad")),
    Stock("ORCL", List("oracle", "java", "mysql")),
    Stock("YHOO", List("yahoo")),
    Stock("CSCO", List("cisco")),
    // Intel trades as INTC; the previous "INTL" symbol belongs to a different company,
    // so its quotes would have been mixed with tweets about "intel".
    Stock("INTC", List("intel")),
    Stock("AMD", Nil),
    Stock("IBM", Nil),
    Stock("MSFT", List("Microsoft", "Windows"))
  )

  // Lookup table from stock id to its predefined Stock.
  private[this] val defaults = seq.map(x => (x.id, x)).toMap

  /**
   * Returns the predefined stock for `s`, or a keyword-less `Stock(s, Nil)` when `s`
   * is not one of the predefined ids.
   */
  def get(s: String): Stock = defaults.getOrElse(s, Stock(s, Nil))
}
// Excerpt of the Twitter stream factory; the `// ...` markers stand for code left out of the snippet.
class twitter {
// ...
// Creates a Twitter stream filtered on every stock's id and keywords (`Stock.all`).
// NOTE(review): as shown, the method ends on a `val` definition and therefore returns Unit;
// the full version presumably goes on to transform and return `stream` -- confirm.
def apply(stocks:Seq[Stock])(implicit ssc:StreamingContext) = {
//create twitter stream using filter from the args list
val stream = ssc.twitterStream(None, stocks.flatMap(_.all))
}
// ...
}
// Sends every computed (stock, score) pair, stamped with the batch time, to the "results" actor.
computed.foreach { (rdd, time) =>
  rdd.foreach { case (stock, (score, _)) =>
    //FIXME ::
    // The actor system and the actor reference are looked up here, inside the function
    // applied to each element, rather than captured from outside...
    //... quick fix to avoid its serialization problem :-/
    val system = spark.SparkAkka.actorSystem
    val results = system.actorFor(spark.SparkAkka.urlFor("results"))
    results ! (stock, score, time.milliseconds)
  }
}
/**
 * Receiver actor that registers itself with the feeder and pushes selected quotes into Spark.
 *
 * @param feeder path of the feeder actor to register with
 * @param stocks stocks this receiver asks the feeder for
 */
class YahooActorReceiver(feeder: String, stocks: Seq[Stock]) extends Actor with Receiver {

  // Last quote seen for each stock id.
  var lasts: Map[String, YahooData] = Map.empty

  // Resolved on first use.
  lazy private val remotePublisher = context.actorFor(feeder)

  // On start, tell the feeder where to send quotes and for which stocks.
  override def preStart = remotePublisher ! For(context.self, stocks)

  def receive = {
    case quote: YahooData =>
      // `isInCache` is not part of this snippet; it is evaluated BEFORE `lasts` is
      // updated, so it sees the state prior to this quote.
      val shouldPush = isInCache(quote)
      lasts = lasts.updated(quote.stock.id, quote)
      if (shouldPush) pushBlock(quote)
  }
}
/** Owns the feeder actor and the timer that drives it. */
object Yahoo {
  import spark.SparkAkka._

  /** The feeder actor, created on first access under the name "FeederActor". */
  lazy val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")

  /** Schedules a `Tick` to the feeder immediately and then every 500 milliseconds. */
  def start = actorSystem.scheduler.schedule(
    0 milliseconds,
    500 milliseconds,
    feeder,
    Tick)
}
/**
 * A tweet matched to a stock.
 *
 * @param stock      the stock the tweet was matched to
 * @param sentiments sentiments attached to the tweet
 * @param status     the original status object
 */
case class TwitterData(stock: Stock, sentiments: Seq[Sentiment], status: Status)
  extends Data
//...
//class Twitter... def apply ...
// For each (sentiments, status) pair, emit one TwitterData per stock mentioned in the tweet.
statusWithNotEmptySentiments
  .flatMap { case (found, tweet) =>
    // Lower-cased words of the tweet text plus the author's name, with '#', '@' and '$'
    // turned into spaces so that hashtags, mentions and cashtags match as plain words.
    val rawText = tweet.getText + " " + tweet.getUser.getName
    val words = rawText
      .toLowerCase
      .replaceAll("[#@\\$]", " ")
      .split("\\s+")
      .toSet
    // A stock matches when any of its terms (id or keyword), lower-cased, is one of the words.
    stocks
      .filter(stock => stock.all.exists(term => words.contains(term.toLowerCase)))
      .map(stock => TwitterData(stock, found, tweet))
  }
package spark
import spark.util.AkkaUtils
/**
 * Access to the application's actor system, configured from the `deploy.akka` section
 * (`name`, `host`, `port`). Everything is initialised on first access.
 */
object SparkAkka {
  private[this] lazy val config = be.bigdata.p2.conf.root.getConfig("deploy.akka")

  lazy val name = config.getString("name")
  lazy val host = config.getString("host")
  lazy val port = config.getInt("port")

  /** The actor system created by `AkkaUtils.createActorSystem` (first element of its result). */
  lazy val actorSystem = {
    val created = AkkaUtils.createActorSystem(name, host, port)
    created._1
  }

  /** Address of the user-level actor called `actorName` in this system. */
  def urlFor(actorName: String) =
    Seq("akka://", name, "@", host, ":", port, "/user/", actorName).mkString
}
import spark.streaming.{Seconds, StreamingContext, DStream}
import spark.streaming.StreamingContext._
import spark.SparkContext._
/** Application entry point: creates the streaming context and starts it. */
object P2 extends App {

  // Streaming context: master "local", application name "Project2", 5-second batches.
  // Declared implicit so the stream factories can pick it up.
  implicit val ssc: StreamingContext =
    new StreamingContext("local", "Project2", Seconds(5))

  // add streams and adapt them here

  // Start processing.
  ssc.start()
}
// Pair every status with the sentiments computed from its text and
// drop the statuses for which no sentiment was found.
stream
  .map(status => (sentiments.compute(status.getText), status))
  .filter { case (found, _) => found.nonEmpty }
/** Common shape of every observation: it always refers to a stock. */
trait Data {

  /** The stock this observation is about. */
  def stock: Stock
}
/**
 * A tweet matched to a stock.
 *
 * @param stock      the stock the tweet was matched to
 * @param sentiments sentiments attached to the tweet
 * @param status     the original status object
 */
case class TwitterData(stock: Stock, sentiments: Seq[Sentiment], status: Status)
  extends Data
/**
 * A quote for a stock.
 *
 * @param stock  the stock the quote belongs to
 * @param trade  trade value of the quote
 * @param date   date of the quote, as text
 * @param time   time of the quote, as text
 * @param delta  pair of change values; presumably (absolute change, percent change) -- TODO confirm
 * @param volume traded volume
 */
case class YahooData(
  stock: Stock,
  trade: Double,
  date: String,
  time: String,
  delta: (Double, Double),
  volume: Int
) extends Data
// ...
// Both sources are viewed as streams of the common `Data` type so that they can be combined.
// NOTE(review): the casts are presumably needed because DStream is not covariant -- confirm.
lazy val twitterDStream: DStream[Data] =
  twitter(stocks).asInstanceOf[DStream[Data]]

lazy val yahooDStream: DStream[Data] =
  yahoo(stocks).asInstanceOf[DStream[Data]]
/**
FILE: application.conf
twitter {
oauth {
consumerKey = ${?TWITTER_OAUTH_CONS_KEY}
consumerSecret = ${?TWITTER_OAUTH_CONS_SEC}
accessToken = ${?TWITTER_OAUTH_ACC_TKN}
accessTokenSecret = ${?TWITTER_OAUTH_TKN_SEC}
}
}
*/
// Root configuration, loaded through this class's class loader on first access.
lazy val root = ConfigFactory.load(getClass.getClassLoader)

// The `twitter.oauth` section of the root configuration.
lazy val twitterConfig = root.getConfig("twitter.oauth")
/**
 * On construction, copies every entry of `twitterConfig` into a JVM system property
 * named `twitter4j.oauth.<key>`, using the string form of the entry's unwrapped value.
 *
 * @param twitterConfig the configuration section holding the OAuth settings
 */
class Twitter(twitterConfig: Config) {
  twitterConfig
    .entrySet()
    .map(entry => (entry.getKey(), entry.getValue()))
    .foreach { case (property, value) =>
      val text = value.unwrapped.toString
      System.setProperty("twitter4j.oauth." + property, text)
    }
}
/**
 * Factory for the Yahoo input stream.
 *
 * @param feeder path of the feeder actor handed to each receiver
 */
class Yahoo(feeder: String) {

  /** Creates an actor-backed stream of YahooData, fed by a `YahooActorReceiver` named "YahooReceiver". */
  def apply(stocks: Seq[Stock])(implicit ssc: StreamingContext) = {
    val receiverProps = Props(new YahooActorReceiver(feeder, stocks))
    ssc.actorStream[YahooData](receiverProps, "YahooReceiver")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment