Last active
December 20, 2015 22:39
-
-
Save andypetrella/6207142 to your computer and use it in GitHub Desktop.
Spark-bd blog entries
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Build definition for the spark-bd samples.
scalaVersion := "2.9.3"

// Additional repositories, declared in the same order as before.
resolvers ++= Seq(
  "Sonatype release" at "https://oss.sonatype.org/content/repositories/releases",
  "Akka repo" at "http://repo.akka.io/releases/",
  "Spray repo" at "http://repo.spray.cc"
)

// Typesafe Config plus Spark core and Spark Streaming, both at 0.7.3.
libraryDependencies ++= Seq(
  "com.typesafe" % "config" % "1.0.2",
  "org.spark-project" %% "spark-core" % "0.7.3",
  "org.spark-project" %% "spark-streaming" % "0.7.3"
)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Single stream made of the Twitter stream followed by the Yahoo stream.
val both = twitterDStream.union(yahooDStream)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Scores one piece of data as -1 or +1.
 *
 * Yahoo data scores -1 when the second component of `delta` is negative;
 * Twitter data scores -1 when the sum of its sentiment scores is negative.
 * Zero counts as +1. Any other `Data` subtype matches no case (MatchError).
 */
val score: Data => Int = {
  case yahoo: YahooData =>
    if (yahoo.delta._2 < 0) -1 else 1
  case tweet: TwitterData =>
    val total = tweet.sentiments.map(_.score).sum
    if (total < 0) -1 else 1
}
/**
 * Per stock, over a 60-second window: the sum of the scores of everything
 * collected for that stock, paired with
 * (number of Yahoo items, number of Twitter items).
 *
 * The element type is keyed by the stock itself, because the first `map`
 * emits `(x.stock, ...)` pairs.
 */
val computed: DStream[(Stock, (Int, (Int, Int)))] = both
  .map(x => (x.stock, List(x)))
  .reduceByKeyAndWindow(_ ::: _, Seconds(60))
  .mapValues { xs =>
    val total = xs.map(score).sum
    // Count the items by origin: first slot Yahoo, second slot Twitter.
    val counts = xs.foldLeft((0, 0)) {
      case ((y, t), _: YahooData)   => (y + 1, t)
      case ((y, t), _: TwitterData) => (y, t + 1)
    }
    (total, counts)
  }
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** A score recorded for `stock`, together with the time value it was reported with. */
case class Result(
  stock: Stock,
  score: Int,
  time: Long
)
/**
 * Accumulates, per stock id, every `Result` received so far (most recent
 * first) and replies with the whole cache to the message "results".
 */
class ResultActor() extends Actor {
  var cache: Map[String, List[Result]] = Map.empty

  def receive = {
    case (stock: Stock, score: Int, time: Long) =>
      // Prepend the new result to whatever is already stored for this stock.
      val earlier = cache.getOrElse(stock.id, Nil)
      cache = cache.updated(stock.id, Result(stock, score, time) :: earlier)
    case "results" =>
      sender ! cache
    //case x => println(("*"*100)+"ERROR : " + x)
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * On `For`, remembers the target actor and the service URL for the requested
 * stocks; on every `Tick`, sends each element produced by `consume` to that
 * target.
 */
class FeederActor extends Actor {
  import java.net.URL

  // Field codes, concatenated without separator into the `f` query parameter.
  val yahooResponseFormat = List("e1", "s", "l1", "d1", "t1", "c6", "p2", "v")
  val yahooService = "http://finance.yahoo.com/d/quotes.csv?s=%s&f=%s&e=.csv"

  /** Service URL (as a string) for the given symbols, comma-separated in the `s` parameter. */
  def financeData(stocks: Seq[String]) =
    yahooService.format(stocks.mkString(","), yahooResponseFormat.mkString)

  // Both remain empty until a `For` message has been received.
  var url: Option[URL] = None
  var ref: Option[ActorRef] = None

  // The body of `consume` is elided in this excerpt (see the linked source).
  def consume(actor:ActorRef, url:URL):Stream[YahooData] = //... see https://github.com/andypetrella/spark-bd/blob/master/src/main/scala/yahoo.scala#L76

  def receive = {
    case For(sparkled, stocks) =>
      ref = Some(sparkled)
      url = Some(new URL(financeData(stocks.map(_.id))))
    case Tick =>
      // Nothing happens unless both the target and the URL are known.
      ref.foreach { target =>
        url.foreach { endpoint =>
          consume(target, endpoint).foreach(quote => target ! quote)
        }
      }
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * A stock identifier together with the keywords associated with it.
 *
 * @param id       identifier of the stock
 * @param keywords additional terms associated with the stock
 */
case class Stock(id: String, keywords: List[String]) {
  /** The identifier followed by the keywords; built on first access and marked `@transient`. */
  @transient lazy val all = id :: keywords
}
/** Registry of the stocks known up front, indexed by identifier. */
object Stocks {
  private[this] val seq = Seq(
    Stock("GOOG", List("google", "android", "chrome")),
    Stock("AAPL", List("apple", "ios", "iphone", "ipad")),
    Stock("ORCL", List("oracle", "java", "mysql")),
    Stock("YHOO", List("yahoo")),
    Stock("CSCO", List("cisco")),
    // Intel's ticker symbol is INTC.
    Stock("INTC", List("intel")),
    Stock("AMD", Nil),
    Stock("IBM", Nil),
    Stock("MSFT", List("Microsoft", "Windows"))
  )

  private[this] val defaults = seq.map(x => (x.id, x)).toMap

  /**
   * Looks up a known stock by identifier.
   *
   * @param s identifier to look up
   * @return the registered stock, or a keyword-less `Stock(s, Nil)` when `s` is unknown
   */
  def get(s: String) = defaults.getOrElse(s, Stock(s, Nil))
}
class twitter {
  // ...

  /** Opens a Twitter stream filtered on every identifier and keyword of `stocks` (rest of the body elided). */
  def apply(stocks: Seq[Stock])(implicit ssc: StreamingContext) = {
    // every identifier and keyword of the requested stocks becomes a filter term
    val filters = stocks.flatMap(_.all)
    val stream = ssc.twitterStream(None, filters)
  }

  // ...
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Send (stock, score, batch time in milliseconds) for every computed element
// to the actor registered under "results".
computed.foreach { (rdd, time) =>
  rdd.foreach { case (stock, (score, _)) =>
    //FIXME ::
    // The actor system and the actor reference are looked up again here,
    // inside the DStream function, for every element...
    //... quick fix to avoid its serialization problem :-/
    val resultsActor = spark.SparkAkka.actorSystem.actorFor(
      spark.SparkAkka.urlFor("results")
    )
    resultsActor ! (stock, score, time.milliseconds)
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Receiver actor: on start it sends a `For` request for `stocks` to the actor
 * at path `feeder`, then handles the `YahooData` it receives.
 */
class YahooActorReceiver(feeder: String, stocks: Seq[Stock]) extends Actor with Receiver {
  // Latest data received for each stock id.
  var lasts: Map[String, YahooData] = Map.empty

  private lazy val remotePublisher = context.actorFor(feeder)

  override def preStart = remotePublisher ! For(context.self, stocks)

  def receive = {
    case quote: YahooData =>
      // `isInCache` is not shown in this excerpt; it is evaluated before `lasts` is updated.
      val shouldPush = isInCache(quote)
      lasts = lasts.updated(quote.stock.id, quote)
      if (shouldPush) pushBlock(quote)
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object Yahoo {
  import spark.SparkAkka._

  /** The feeder actor, created on first access under the name "FeederActor". */
  lazy val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")

  /** Schedules `Tick` messages to `feeder`: initial delay 0 ms, then every 500 ms. */
  def start =
    actorSystem.scheduler.schedule(0.milliseconds, 500.milliseconds, feeder, Tick)
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Tweet-based data: the stock it relates to, its sentiments and the original status. */
case class TwitterData(stock: Stock, sentiments: Seq[Sentiment], status: Status)
  extends Data
//... | |
//class Twitter... def apply ... | |
// Turn each (sentiments, status) pair into one TwitterData per stock whose
// identifier or keywords appear among the words of the tweet.
statusWithNotEmptySentiments
  .flatMap { case (ss, st) =>
    // Lower-cased words of the tweet text plus the author's name; '#', '@' and '$' act as separators.
    val raw = st.getText + " " + st.getUser.getName
    val words = raw.toLowerCase.replaceAll("[#@\\$]", " ").split("\\s+").toSet
    stocks
      .filter(s => s.all.exists(term => words.contains(term.toLowerCase)))
      .map(s => TwitterData(s, ss, st))
  }
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package spark | |
import spark.util.AkkaUtils | |
/** Akka settings read from the `deploy.akka` configuration section, and the actor system created from them. */
object SparkAkka {
  private[this] lazy val config = be.bigdata.p2.conf.root.getConfig("deploy.akka")

  lazy val name = config.getString("name")
  lazy val host = config.getString("host")
  lazy val port = config.getInt("port")

  // `createActorSystem` returns a tuple; only its first element is kept.
  lazy val actorSystem = {
    val created = AkkaUtils.createActorSystem(name, host, port)
    created._1
  }

  /** Address string of the actor named `actorName` under `/user` in this system. */
  def urlFor(actorName: String) =
    Seq("akka://", name, "@", host, ":", port.toString, "/user/", actorName).mkString
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import spark.streaming.{Seconds, StreamingContext, DStream} | |
import spark.streaming.StreamingContext._ | |
import spark.SparkContext._ | |
/** Application entry point: creates the streaming context, then starts it. */
object P2 extends App {
  // Spark initialisation: master "local", application name "Project2", Seconds(5) as duration.
  implicit val ssc: StreamingContext =
    new StreamingContext("local", "Project2", Seconds(5))

  // add streams and adapt them here

  // launch the streaming computation
  ssc.start()
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Pair every status with the sentiments computed from its text and drop the
// pairs whose sentiments are empty.
stream
  .map(status => (sentiments.compute(status.getText), status))
  .filter(pair => !pair._1.isEmpty)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Common interface of the data items: each one refers to a stock. */
trait Data {
  /** The stock this item relates to. */
  def stock: Stock
}
/** Tweet-based data: the stock it relates to, its sentiments and the original status. */
case class TwitterData(stock: Stock, sentiments: Seq[Sentiment], status: Status)
  extends Data
/**
 * Data about `stock` coming from the Yahoo service.
 * NOTE(review): the exact meaning of `trade`, `delta` and `volume` is not
 * visible here; presumably last trade price, (absolute, percent) change and
 * traded volume. Confirm against the parser.
 */
case class YahooData(
  stock: Stock,
  trade: Double,
  date: String,
  time: String,
  delta: (Double, Double),
  volume: Int
) extends Data
// ... | |
// Widen each element to `Data` with a compiler-checked `map` instead of an
// unchecked cast: if a source ever stops producing `Data` elements, this no
// longer compiles, whereas `asInstanceOf` would silently accept it.
lazy val twitterDStream: DStream[Data] = twitter(stocks).map(d => (d: Data))
lazy val yahooDStream: DStream[Data] = yahoo(stocks).map(d => (d: Data))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
FILE: application.conf | |
twitter { | |
oauth { | |
consumerKey = ${?TWITTER_OAUTH_CONS_KEY} | |
consumerSecret = ${?TWITTER_OAUTH_CONS_SEC} | |
accessToken = ${?TWITTER_OAUTH_ACC_TKN} | |
accessTokenSecret = ${?TWITTER_OAUTH_TKN_SEC} | |
} | |
} | |
*/ | |
// Configuration loaded through this class's class loader.
lazy val root: Config = ConfigFactory.load(getClass.getClassLoader)

// The `twitter.oauth` section of that configuration.
lazy val twitterConfig: Config = root.getConfig("twitter.oauth")
/**
 * On construction, copies every entry of `twitterConfig` into a JVM system
 * property named "twitter4j.oauth.<key>", using the string form of the
 * entry's unwrapped value.
 *
 * @param twitterConfig configuration whose entries are exported
 */
class Twitter(twitterConfig: Config) {
  import scala.collection.JavaConverters._

  // `entrySet()` is a java.util.Set: convert it explicitly rather than
  // depending on an implicit Java-to-Scala conversion being in scope.
  twitterConfig.entrySet().asScala.foreach { entry =>
    System.setProperty("twitter4j.oauth." + entry.getKey, entry.getValue.unwrapped.toString)
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Builds actor-based streams of `YahooData`; `feeder` is handed to each `YahooActorReceiver`. */
class Yahoo(feeder: String) {
  /** Creates the actor stream named "YahooReceiver", backed by a `YahooActorReceiver` for `stocks`. */
  def apply(stocks: Seq[Stock])(implicit ssc: StreamingContext) = {
    val receiverProps = Props(new YahooActorReceiver(feeder, stocks))
    ssc.actorStream[YahooData](receiverProps, "YahooReceiver")
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment