Last active
March 29, 2017 09:38
-
-
Save pratikrvyas/12224469bae24ad243a95b9abe716eac to your computer and use it in GitHub Desktop.
DBS test - Jacky's fighting skill
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Kafka with Scala.
 *
 * Reads simulated move events from Kafka with a direct stream and appends the
 * JSON payload of every micro-batch to the `jackieChanSimConfig.json` output
 * directory.
 *
 * Usage: JackyChanMoves <brokers> <topics>
 *   - brokers: comma-separated Kafka broker list, e.g. "192.168.59.103:9092"
 *   - topics:  comma-separated topic names, e.g. "jackieChanCommand"
 */
object JackyChanMoves {
  // Imports live inside the object so the block is self-contained.
  import kafka.serializer.StringDecoder
  import org.apache.spark.SparkConf
  import org.apache.spark.sql.{SaveMode, SparkSession}
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka.KafkaUtils

  /** Output location the received moves are appended to. */
  private val OutputPath = "jackieChanSimConfig.json"

  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: JackyChanMoves <brokers> <topics>")
      System.exit(1)
    }

    // take(2) keeps the extractor from throwing a MatchError when extra
    // arguments are supplied (the guard above only rejects too few).
    val Array(brokers, topics) = args.take(2)

    // Create context with 5 second batch interval
    val sparkConf = new SparkConf().setAppName("JackyChanMoves")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val jackysimulationmoves =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicsSet)

    // Each RDD holds (key, value) pairs; only the value carries the JSON
    // event. An RDD has no DataFrame writer, so the payload is parsed into a
    // DataFrame before being appended as JSON.
    jackysimulationmoves.foreachRDD { rdd =>
      // Skip empty batches: schema inference on an empty RDD would only
      // produce empty output files.
      if (!rdd.isEmpty()) {
        val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
        spark.read.json(rdd.map(_._2)).write.mode(SaveMode.Append).json(OutputPath)
      }
    }

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
import sys | |
import json | |
from pyspark import SparkContext | |
from pyspark.streaming import StreamingContext | |
from pyspark.streaming.kafka import KafkaUtils | |
""" | |
Kafka with Python: Jacky Chan move simulation. Consumes messages from a Kafka broker and saves them as JSON.
""" | |
if __name__ == "__main__":
    # Local import: the file-level imports do not bring SparkSession into
    # scope, and it is needed to turn each RDD into a DataFrame.
    from pyspark.sql import SparkSession

    OUTPUT_PATH = "jackieChanSimConfig.json"

    if len(sys.argv) != 3:
        print("Invalid args")
        print("Usage: <script> <brokers> <topic>", file=sys.stderr)
        sys.exit(-1)

    sc = SparkContext(appName="JackyChanKafkaWithPython")
    # 2 second batch interval
    ssc = StreamingContext(sc, 2)

    # brokers: e.g. "192.168.59.103:9092"
    # topic:   e.g. "jackieChanCommand"
    brokers, topic = sys.argv[1:]

    # Define Kafka consumer (direct stream of (key, value) pairs)
    kafkajackyMoves = KafkaUtils.createDirectStream(
        ssc, [topic], {"metadata.broker.list": brokers})

    def save_moves(jackyMoves):
        """Append the JSON payloads of one micro-batch RDD to OUTPUT_PATH."""
        # An RDD has no DataFrame writer, and SaveMode is a JVM-only name;
        # convert the values to a DataFrame and use the string mode "append".
        if jackyMoves.isEmpty():
            return  # nothing to infer a schema from, nothing to write
        spark = SparkSession.builder \
            .config(conf=jackyMoves.context.getConf()) \
            .getOrCreate()
        payloads = jackyMoves.map(lambda kv: kv[1])  # drop the Kafka key
        spark.read.json(payloads).write.mode("append").json(OUTPUT_PATH)

    # Run for each RDD in the Kafka DStream and save it as JSON
    kafkajackyMoves.foreachRDD(save_moves)

    ssc.start()
    ssc.awaitTermination()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
import sys | |
import json | |
from pyspark import SparkContext | |
from pyspark import SparkConf, SparkContext | |
from pyspark.sql import SparkSession | |
from pyspark.sql.types import * | |
from pyspark.sql.functions import * | |
from pyspark.sql.functions import lit | |
from pyspark.sql.functions import col | |
""" | |
Spark with Python: Jacky Chan move simulation. Processes the JSON file and analyses the moves.
""" | |
if __name__ == "__main__":
    # Namespaced import: avoids relying on the star import, which shadows
    # builtins such as max() with the Spark column functions.
    from pyspark.sql import functions as F
    from pyspark.sql.types import DoubleType

    spark = SparkSession.builder.master("local").appName("JackyChanMoves").getOrCreate()
    try:
        # Create dataframe from json file with
        # timestamp, style, action, weapon, target, strength
        dfJackyAllMoves = spark.read.json("jackieChanSimConfig.json")

        # Keep only attacking moves (filters out 'BLOCK' and 'JUMP')
        dfJackyMoves = dfJackyAllMoves.where(F.col("action").isin("PUNCH", "KICK"))

        # Power subtracted from the move's strength, per target
        target_power = {'HEAD': 20, 'ARMS': 50, 'BODY': 30, 'LEGS': 50}

        def strenghRemain(strength, target):
            """Return strength minus the target's power, or None if strength is null.

            Unknown or missing targets subtract nothing instead of raising
            KeyError and failing the whole job.
            """
            if strength is None:
                return None
            # float(): a DoubleType UDF must return a Python float; an int
            # result would silently become null.
            return float(strength) - target_power.get(target, 0)

        # UDF
        udfstrenghRemain = F.udf(strenghRemain, DoubleType())

        # Add column strengthRemaining using the UDF
        dfJackyMoveswithstrength = dfJackyMoves.withColumn(
            "strengthRemaining", udfstrenghRemain("strength", "target"))

        # Ans 1 = Jacky's favorite style: the style used in the most moves
        dffavStyle = (dfJackyMoveswithstrength
                      .groupBy("style")
                      .count()
                      .orderBy(F.desc("count"))
                      .limit(1))
        dffavStyle.show()

        # Ans 2 = Jacky's killing blow: the earliest move whose remaining
        # strength is <= 0 (assuming the simulation stops at that point)
        dfKillingBlow = (dfJackyMoveswithstrength
                         .where(F.col("strengthRemaining") <= 0)
                         .sort(F.asc("timestamp"))
                         .take(1))
        print(dfKillingBlow)
    finally:
        # Always release the session, even when the analysis fails
        spark.stop()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package spark.example | |
import org.apache.spark._ | |
import org.apache.spark.SparkContext._ | |
import org.json4s.jackson.JsonMethods | |
import org.json4s.jackson.JsonMethods._ | |
import org.json4s.JsonAST._ | |
import org.json4s.DefaultFormats | |
/** | |
* Spark with SCALA | |
* Read Jacky chan simulation move in json , convert into SQL and analyse | |
*/ | |
object JackyChanSQL {

  /**
   * One simulated move, plus the strength left after subtracting the power of
   * the targeted body part (see [[getPower]]).
   *
   * Declared at object level (not inside `main`) so Spark can derive a schema
   * for it when the RDD is converted to a DataFrame.
   */
  case class JackyMove(
      timestamp: String,
      style: String,
      action: String,
      weapon: String,
      target: String,
      strength: Double,
      strengthRemaining: Double)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[*]", "JackyChanSQL")
    try {
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      // Brings the RDD -> DataFrame conversion (toDF) into scope.
      import sqlContext.implicits._

      // read JSON file , assuming log file is available at below path.
      // path can be passed through argument as well
      val lines = sc.textFile("../jackieChanSimConfig.json")

      // Parse every line; lines that are not valid JSON or carry no numeric
      // strength are dropped instead of failing the job.
      val jackymove = lines.flatMap(line => toMove(line).toList)

      // Now we register the RDD as a Table
      jackymove.toDF().registerTempTable("jacky_all_move")

      // filter block and jump, and expose the result under the name the
      // queries below refer to
      val jacky_move = sqlContext.sql(
        "SELECT * FROM jacky_all_move WHERE action NOT IN ('BLOCK', 'JUMP')")
      jacky_move.registerTempTable("jacky_move")

      // Jacky's favorite style: the style with the highest number of moves
      println("Jacky favorite style:\n=====================")
      sqlContext.sql(
        """
          |SELECT style, count(*) AS moves
          |FROM jacky_move
          |GROUP BY style
          |ORDER BY moves DESC
          |LIMIT 1
        """.stripMargin).collect().foreach(println)

      // Jacky's killing blow: earliest move whose remaining strength is <= 0,
      // assuming the simulation stops once strength reaches zero
      println("Jacky killing blow:\n=======================")
      sqlContext.sql(
        """
          |SELECT *
          |FROM jacky_move
          |WHERE strengthRemaining <= 0
          |ORDER BY `timestamp`
          |LIMIT 1
        """.stripMargin).collect().foreach(println)
    } finally {
      // Release the context even when parsing or a query fails
      sc.stop()
    }
  }

  /**
   * Parses one JSON line into a [[JackyMove]].
   *
   * @return `None` when the line is not valid JSON or has no numeric
   *         `strength` field
   */
  private def toMove(line: String): Option[JackyMove] =
    scala.util.Try(parse(line)).toOption.flatMap { json =>
      numeric(json \ "strength").map { strength =>
        JackyMove(
          timestamp = get(json \ "timestamp").toString,
          style = get(json \ "style").toString,
          action = get(json \ "action").toString,
          weapon = get(json \ "weapon").toString,
          target = get(json \ "target").toString,
          strength = strength,
          // getPower returns a JDouble; `.num` unwraps the Double inside
          strengthRemaining = strength - getPower(json \ "target").num)
      }
    }

  /** Reads a JSON number as a Double; `None` for any non-numeric value. */
  private def numeric(value: JValue): Option[Double] = value match {
    case JDouble(num)  => Some(num)
    case JDecimal(num) => Some(num.toDouble)
    case JInt(num)     => Some(num.toDouble)
    case _             => None
  }

  /**
   * Utility method to unpack the value from JSON.
   *
   * @return the wrapped scalar, or an empty string for missing, null, or
   *         non-scalar values
   */
  def get(value: JValue): Any = value match {
    case JString(s)    => s
    case JDouble(num)  => num
    case JDecimal(num) => num
    case JInt(num)     => num
    case JBool(flag)   => flag
    case _             => "" // JNothing, JNull, objects and arrays
  }

  /**
   * Utility method to get the power value of a target from JSON.
   *
   * @return the power of the named target, or 0 for missing or unknown targets
   */
  def getPower(value: JValue): JDouble = value match {
    case JString("HEAD") => JDouble(20)
    case JString("ARMS") => JDouble(50)
    case JString("BODY") => JDouble(30)
    case JString("LEGS") => JDouble(50)
    case _               => JDouble(0) // JNothing, JNull, unknown target
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2017-03-20 22:56:23,562 INFO n.a.d.j.g.JsonDataGenerator [main] Overriding Simulation Config file from command line to use [ jackieChanSimConfig.json ] | |
2017-03-20 22:56:23,570 DEBUG n.a.d.j.g.JsonDataGenerator [main] Creating Simulation Runner using Simulation Config [ jackieChanSimConfig.json ] | |
2017-03-20 22:56:23,983 INFO n.a.d.j.g.JsonDataGenerator [main] Adding Kafka Producer with properties: {type=kafka, broker.server=192.168.59.103, broker.port=9092, topic=jackieChanCommand, flatten=false, sync=false} | |
2017-03-20 22:56:30,250 INFO n.a.d.j.g.SimulationRunner [main] Adding EventGenerator for [ jackieChan,jackieChanWorkflow.json ] | |
2017-03-20 22:56:30,251 INFO n.a.d.j.g.SimulationRunner [main] Starting Simulation | |
2017-03-20 22:56:30,859 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ integer,net.acesinc.data.json.generator.types.IntegerType ] | |
2017-03-20 22:56:30,860 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ counter,net.acesinc.data.json.generator.types.CounterType ] | |
2017-03-20 22:56:30,861 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ alphaNumeric,net.acesinc.data.json.generator.types.AlphaNumericType ] | |
2017-03-20 22:56:30,861 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ lastName,net.acesinc.data.json.generator.types.LastName ] | |
2017-03-20 22:56:30,861 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ uuid,net.acesinc.data.json.generator.types.UuidType ] | |
2017-03-20 22:56:30,862 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ stringMerge,net.acesinc.data.json.generator.types.StringMergeType ] | |
2017-03-20 22:56:30,862 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ timestamp,net.acesinc.data.json.generator.types.TimestampType ] | |
2017-03-20 22:56:30,863 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ boolean,net.acesinc.data.json.generator.types.BooleanType ] | |
2017-03-20 22:56:30,863 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ long,net.acesinc.data.json.generator.types.LongType ] | |
2017-03-20 22:56:30,863 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ nowTimestamp,net.acesinc.data.json.generator.types.NowTimestampType ] | |
2017-03-20 22:56:30,864 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ random,net.acesinc.data.json.generator.types.RandomType ] | |
2017-03-20 22:56:30,864 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ now,net.acesinc.data.json.generator.types.NowType ] | |
2017-03-20 22:56:30,864 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ double,net.acesinc.data.json.generator.types.DoubleType ] | |
2017-03-20 22:56:30,865 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ alpha,net.acesinc.data.json.generator.types.AlphaType ] | |
2017-03-20 22:56:30,865 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ firstName,net.acesinc.data.json.generator.types.FirstName ] | |
2017-03-20 22:56:30,866 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ date,net.acesinc.data.json.generator.types.DateType ] | |
2017-03-20 22:56:30,933 DEBUG n.a.d.j.g.l.KafkaLogger [Thread-2] Sending event to Kafka: [ {"timestamp":"2017-03-20T22:56:30.866Z","style":"WUSHU","action":"JUMP","weapon":"BROAD_SWORD","target":"HEAD","strength":5.1764} ] | |
2017-03-20 22:57:32,493 DEBUG n.a.d.j.g.l.KafkaLogger [Thread-2] Sending event to Kafka: [ {"timestamp":"2017-03-20T22:57:32.492Z","style":"WUSHU","action":"JUMP","weapon":"BROAD_SWORD","target":"ARMS","strength":8.8431} ] | |
2017-03-20 22:58:33,850 DEBUG n.a.d.j.g.l.KafkaLogger [Thread-2] Sending event to Kafka: [ {"timestamp":"2017-03-20T22:58:33.850Z","style":"KUNG_FU","action":"BLOCK","weapon":"ROPE","target":"LEGS","strength":4.1437} ] | |
2017-03-20 22:59:35,012 DEBUG n.a.d.j.g.l.KafkaLogger [Thread-2] Sending event to Kafka: [ {"timestamp":"2017-03-20T22:59:35.011Z","style":"WUSHU","action":"PUNCH","weapon":"ROPE","target":"BODY","strength":9.3653} ] | |
2017-03-20 23:00:36,462 DEBUG n.a.d.j.g.l.KafkaLogger [Thread-2] Sending event to Kafka: [ {"timestamp":"2017-03-20T23:00:36.461Z","style":"WUSHU","action":"BLOCK","weapon":"BROAD_SWORD","target":"BODY","strength":6.9251} ] | |
2017-03-20 23:01:38,104 DEBUG n.a.d.j.g.l.KafkaLogger [Thread-2] Sending event to Kafka: [ {"timestamp":"2017-03-20T23:01:38.103Z","style":"KUNG_FU","action":"KICK","weapon":"CHAIR","target":"BODY","strength":7.8202} ] | |
2017-03-20 23:02:39,589 DEBUG n.a.d.j.g.l.KafkaLogger [Thread-2] Sending event to Kafka: [ {"timestamp":"2017-03-20T23:02:39.587Z","style":"WUSHU","action":"KICK","weapon":"STAFF","target":"BODY","strength":7.3853} ] | |
2017-03-20 23:03:40,718 DEBUG n.a.d.j.g.l.KafkaLogger [Thread-2] Sending event to Kafka: [ {"timestamp":"2017-03-20T23:03:40.717Z","style":"DRUNKEN_BOXING","action":"JUMP","weapon":"ROPE","target":"HEAD","strength":1.8422} ] | |
2017-03-20 23:04:41,957 DEBUG n.a.d.j.g.l.KafkaLogger [Thread-2] Sending event to Kafka: [ {"timestamp":"2017-03-20T23:04:41.950Z","style":"KUNG_FU","action":"JUMP","weapon":"CHAIR","target":"HEAD","strength":8.993} ] | |
2017-03-20 23:05:43,330 DEBUG n.a.d.j.g.l.KafkaLogger [Thread-2] Sending event to Kafka: [ {"timestamp":"2017-03-20T23:05:43.328Z","style":"KUNG_FU","action":"PUNCH","weapon":"BROAD_SWORD","target":"HEAD","strength":2.3585} ] | |
2017-03-20 23:06:44,747 DEBUG n.a.d.j.g.l.KafkaLogger [Thread-2] Sending event to Kafka: [ {"timestamp":"2017-03-20T23:06:44.745Z","style":"DRUNKEN_BOXING","action":"JUMP","weapon":"BROAD_SWORD","target":"ARMS","strength":1.4583} ] | |
2017-03-20 23:07:46,038 DEBUG n.a.d.j.g.l.KafkaLogger [Thread-2] Sending event to Kafka: [ {"timestamp":"2017-03-20T23:07:46.033Z","style":"KUNG_FU","action":"PUNCH","weapon":"BROAD_SWORD","target":"LEGS","strength":3.3634} ] | |
2017-03-20 23:08:37,942 INFO n.a.d.j.g.JsonDataGenerator [Thread-3] Shutdown Hook Invoked. Shutting Down Loggers | |
2017-03-20 23:08:38,032 INFO n.a.d.j.g.SimulationRunner [Thread-3] Stopping Simulation |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
DBS test - Jacky's fighting skill