Skip to content

Instantly share code, notes, and snippets.

@pratikrvyas
Last active March 29, 2017 09:38
Show Gist options
  • Save pratikrvyas/12224469bae24ad243a95b9abe716eac to your computer and use it in GitHub Desktop.
Save pratikrvyas/12224469bae24ad243a95b9abe716eac to your computer and use it in GitHub Desktop.
DBS test - Jacky's fighting skill
object JackyChanMoves {

  /**
   * Kafka with Scala: consume Jacky Chan simulation moves from Kafka via a
   * Spark Streaming direct stream and append each batch to a JSON file.
   *
   * Usage: JackyChanMoves &lt;brokers&gt; &lt;comma-separated-topics&gt;
   * e.g. brokers = "192.168.59.103:9092", topics = "jackieChanCommand"
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: JackyChanMoves <brokers> <topics>")
      System.exit(1)
    }
    val Array(brokers, topics) = args

    // Create context with 5 second batch interval
    val sparkConf = new SparkConf().setAppName("JackyChanMoves")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val jackySimulationMoves =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicsSet)

    // BUG FIX: the original called `move.wirte` (typo) directly on the RDD of
    // (key, value) pairs; an RDD has no DataFrame `write` API, so the job could
    // never compile. Read each batch's message values (JSON strings) into a
    // DataFrame, then append to the output file.
    // NOTE(review): requires `import org.apache.spark.sql.SQLContext` at the
    // top of the file alongside the other Spark imports.
    jackySimulationMoves.foreachRDD { move =>
      if (!move.isEmpty()) {
        val sqlContext = SQLContext.getOrCreate(move.sparkContext)
        sqlContext.read.json(move.map(_._2))
          .write.mode(SaveMode.Append).json("jackieChanSimConfig.json")
      }
    }

    // Start the computation and block until the stream is stopped
    ssc.start()
    ssc.awaitTermination()
  }
}
from __future__ import print_function
import sys
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
"""
Kafka with Python: Jacky Chan move simulation consumer. Reads messages from a
Kafka broker via a Spark Streaming direct stream and appends each batch's JSON
payloads to jackieChanSimConfig.json.
"""


def save_moves(rdd):
    """Append each batch's Kafka message values (JSON strings) to the output file.

    BUG FIX: the original lambda called `jackyMoves.wirte.mode(SaveMode.Append)`
    — `wirte` is a typo, `SaveMode` is a Scala class that does not exist in
    Python, and a streaming RDD of (key, value) pairs has no `.write` API, so
    every batch raised an error. Collect the message values and append them as
    JSON lines instead.
    """
    moves = rdd.map(lambda kv: kv[1]).collect()
    if moves:
        with open("jackieChanSimConfig.json", "a") as out:
            for move in moves:
                out.write(move + "\n")


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Invalid args")
        exit(-1)
    sc = SparkContext(appName="JackyChanKafkaWithPython")
    # 2 second batch interval
    ssc = StreamingContext(sc, 2)
    # e.g. brokers "192.168.59.103:9092", topic "jackieChanCommand"
    brokers, topic = sys.argv[1:]
    # Define Kafka consumer (direct stream, no receiver)
    kafkajackyMoves = KafkaUtils.createDirectStream(
        ssc, [topic], {"metadata.broker.list": brokers})
    # For each RDD in the Kafka DStream, persist the messages to the JSON file
    kafkajackyMoves.foreachRDD(save_moves)
    ssc.start()
    ssc.awaitTermination()
from __future__ import print_function
import sys
import json
from pyspark import SparkContext
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import lit
from pyspark.sql.functions import col
"""
Spark with Python: Jacky Chan move simulation analysis. Loads the captured
moves from a JSON file and answers: (1) Jacky's favourite (most frequent)
style, (2) the killing blow — the first move that drives remaining strength
to zero or below.
"""
if __name__ == "__main__":
    spark = SparkSession.builder.master("local").appName("JackyChanMoves").getOrCreate()
    # Dataframe from JSON file with timestamp, style, action, weapon, target, strength
    dfJackyAllMoves = spark.read.json("jackieChanSimConfig.json")
    # Keep only offensive moves (drops 'BLOCK' and 'JUMP')
    dfJackyMoves = dfJackyAllMoves.where(col("action").isin(["PUNCH", "KICK"]))

    # Remaining strength after a hit, based on the power cost of the target area.
    def strenghRemain(strength, target):
        target_power = {'HEAD': 20, 'ARMS': 50, 'BODY': 30, 'LEGS': 50}
        remainingStrg = strength - target_power[target]
        return remainingStrg

    # Register as a UDF returning a double
    udfstrenghRemain = udf(strenghRemain, DoubleType())
    # Add computed column 'strengthRemaining'
    dfJackyMoveswithstrength = dfJackyMoves.withColumn(
        "strengthRemaining", udfstrenghRemain("strength", "target"))

    # Ans 1 = Jacky's favourite style.
    # BUG FIX: the original used agg(max("style")), which takes the
    # alphabetically greatest style name, not the most frequent one.
    # Count occurrences per style and take the top one.
    dffavStyle = (dfJackyMoveswithstrength
                  .groupBy("style")
                  .count()
                  .orderBy(col("count").desc())
                  .limit(1))
    print("Jacky favorite style:")
    dffavStyle.show()

    # Ans 2 = Jacky's killing blow: first move where remaining strength <= 0,
    # assuming the simulation stops once strength reaches zero.
    # BUG FIX: the original used Scala column syntax $"strengthRemaining",
    # which is a SyntaxError in Python — use col(...) instead.
    dfKillingBlow = (dfJackyMoveswithstrength
                     .where(col("strengthRemaining") <= 0)
                     .sort(asc("timestamp"))
                     .take(1))
    print("Jacky killing blow:")
    for row in dfKillingBlow:
        print(row)

    spark.stop()
package spark.example
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.json4s.jackson.JsonMethods
import org.json4s.jackson.JsonMethods._
import org.json4s.JsonAST._
import org.json4s.DefaultFormats
/**
* Spark with SCALA
* Read Jacky chan simulation move in json , convert into SQL and analyse
*/
object JackyChanSQL {

  /** Typed row for one simulation move, including the pre-computed remaining strength. */
  case class JackyMove(timestamp: String, style: String, action: String,
                       weapon: String, target: String,
                       strength: Double, strengthRemaining: Double)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[*]", "JackyChanSQL")
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    // Read the JSON log, assuming it is available at the path below.
    // The path can be passed through an argument as well.
    val lines = sc.textFile("../jackieChanSimConfig.json")

    // Parse each JSON line into a typed row.
    // BUG FIX: the original map closure had unbalanced braces and ended in a
    // `val` assignment, so it returned Unit and could never compile, let alone
    // be registered as a table. Return a JackyMove per line and convert to a
    // DataFrame so SQL can be run over it.
    val jackymove = lines.map { line =>
      val json = parse(line)
      val strength = get(json \ "strength").toString.toDouble
      // Power cost of the targeted body area
      val power = getPower(json \ "target") match {
        case JDouble(p) => p
        case _          => 0.0
      }
      JackyMove(
        get(json \ "timestamp").toString,
        get(json \ "style").toString,
        get(json \ "action").toString,
        get(json \ "weapon").toString,
        get(json \ "target").toString,
        strength,
        strength - power)
    }.toDF()

    // Register the DataFrame as a table
    jackymove.registerTempTable("jacky_all_move")

    // Filter out defensive moves, and register the result so the queries
    // below can reference it by name (the original queried `jacky_move`
    // without ever registering it).
    val jacky_move = sqlContext.sql(
      "SELECT * FROM jacky_all_move WHERE action NOT IN ('BLOCK', 'JUMP')")
    jacky_move.registerTempTable("jacky_move")

    // Jacky's favourite style = the most frequent one.
    // BUG FIX: the original query placed HAVING before GROUP BY (invalid SQL)
    // and compared count(style) to max(style), which is meaningless for a
    // string column. Group, count, and take the top style instead.
    println("Jacky favorite style:\n=====================")
    sqlContext.sql("""
      SELECT style, count(style) AS uses
      FROM jacky_move
      GROUP BY style
      ORDER BY uses DESC LIMIT 1
    """).collect().foreach(println)

    // Jacky's killing blow: earliest move where remaining strength <= 0,
    // assuming the simulation stops once strength reaches zero.
    println("Jacky killing blow:\n=======================")
    sqlContext.sql("""
      SELECT *
      FROM jacky_move
      WHERE strengthRemaining <= 0 ORDER BY timestamp LIMIT 1
    """).collect().foreach(println)

    sc.stop()
  }

  /** Unpack the underlying Scala value from a JSON leaf ("" for null/missing/unknown). */
  def get(value: JValue) = {
    value match {
      case JNothing | JNull => ""
      case JString(s)       => s
      case JDouble(num)     => num
      case JDecimal(num)    => num
      case JInt(num)        => num
      case JBool(value)     => value
      case _                => ""
    }
  }

  /** Power cost of a target body area as a JDouble (0 for unknown targets). */
  def getPower(value: JValue) = {
    value match {
      case JNothing | JNull  => JDouble(0)
      case JString("HEAD")   => JDouble(20)
      case JString("ARMS")   => JDouble(50)
      case JString("BODY")   => JDouble(30)
      case JString("LEGS")   => JDouble(50)
      case _                 => JDouble(0)
    }
  }
}
2017-03-20 22:56:23,562 INFO n.a.d.j.g.JsonDataGenerator [main] Overriding Simulation Config file from command line to use [ jackieChanSimConfig.json ]
2017-03-20 22:56:23,570 DEBUG n.a.d.j.g.JsonDataGenerator [main] Creating Simulation Runner using Simulation Config [ jackieChanSimConfig.json ]
2017-03-20 22:56:23,983 INFO n.a.d.j.g.JsonDataGenerator [main] Adding Kafka Producer with properties: {type=kafka, broker.server=192.168.59.103, broker.port=9092, topic=jackieChanCommand, flatten=false, sync=false}
2017-03-20 22:56:30,250 INFO n.a.d.j.g.SimulationRunner [main] Adding EventGenerator for [ jackieChan,jackieChanWorkflow.json ]
2017-03-20 22:56:30,251 INFO n.a.d.j.g.SimulationRunner [main] Starting Simulation
2017-03-20 22:56:30,859 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ integer,net.acesinc.data.json.generator.types.IntegerType ]
2017-03-20 22:56:30,860 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ counter,net.acesinc.data.json.generator.types.CounterType ]
2017-03-20 22:56:30,861 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ alphaNumeric,net.acesinc.data.json.generator.types.AlphaNumericType ]
2017-03-20 22:56:30,861 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ lastName,net.acesinc.data.json.generator.types.LastName ]
2017-03-20 22:56:30,861 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ uuid,net.acesinc.data.json.generator.types.UuidType ]
2017-03-20 22:56:30,862 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ stringMerge,net.acesinc.data.json.generator.types.StringMergeType ]
2017-03-20 22:56:30,862 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ timestamp,net.acesinc.data.json.generator.types.TimestampType ]
2017-03-20 22:56:30,863 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ boolean,net.acesinc.data.json.generator.types.BooleanType ]
2017-03-20 22:56:30,863 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ long,net.acesinc.data.json.generator.types.LongType ]
2017-03-20 22:56:30,863 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ nowTimestamp,net.acesinc.data.json.generator.types.NowTimestampType ]
2017-03-20 22:56:30,864 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ random,net.acesinc.data.json.generator.types.RandomType ]
2017-03-20 22:56:30,864 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ now,net.acesinc.data.json.generator.types.NowType ]
2017-03-20 22:56:30,864 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ double,net.acesinc.data.json.generator.types.DoubleType ]
2017-03-20 22:56:30,865 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ alpha,net.acesinc.data.json.generator.types.AlphaType ]
2017-03-20 22:56:30,865 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ firstName,net.acesinc.data.json.generator.types.FirstName ]
2017-03-20 22:56:30,866 DEBUG n.a.d.j.g.t.TypeHandlerFactory [Thread-2] Discovered TypeHandler [ date,net.acesinc.data.json.generator.types.DateType ]
2017-03-20 22:56:30,933 DEBUG n.a.d.j.g.l.KafkaLogger [Thread-2] Sending event to Kafka: [ {"timestamp":"2017-03-20T22:56:30.866Z","style":"WUSHU","action":"JUMP","weapon":"BROAD_SWORD","target":"HEAD","strength":5.1764} ]
2017-03-20 22:57:32,493 DEBUG n.a.d.j.g.l.KafkaLogger [Thread-2] Sending event to Kafka: [ {"timestamp":"2017-03-20T22:57:32.492Z","style":"WUSHU","action":"JUMP","weapon":"BROAD_SWORD","target":"ARMS","strength":8.8431} ]
2017-03-20 22:58:33,850 DEBUG n.a.d.j.g.l.KafkaLogger [Thread-2] Sending event to Kafka: [ {"timestamp":"2017-03-20T22:58:33.850Z","style":"KUNG_FU","action":"BLOCK","weapon":"ROPE","target":"LEGS","strength":4.1437} ]
2017-03-20 22:59:35,012 DEBUG n.a.d.j.g.l.KafkaLogger [Thread-2] Sending event to Kafka: [ {"timestamp":"2017-03-20T22:59:35.011Z","style":"WUSHU","action":"PUNCH","weapon":"ROPE","target":"BODY","strength":9.3653} ]
2017-03-20 23:00:36,462 DEBUG n.a.d.j.g.l.KafkaLogger [Thread-2] Sending event to Kafka: [ {"timestamp":"2017-03-20T23:00:36.461Z","style":"WUSHU","action":"BLOCK","weapon":"BROAD_SWORD","target":"BODY","strength":6.9251} ]
2017-03-20 23:01:38,104 DEBUG n.a.d.j.g.l.KafkaLogger [Thread-2] Sending event to Kafka: [ {"timestamp":"2017-03-20T23:01:38.103Z","style":"KUNG_FU","action":"KICK","weapon":"CHAIR","target":"BODY","strength":7.8202} ]
2017-03-20 23:02:39,589 DEBUG n.a.d.j.g.l.KafkaLogger [Thread-2] Sending event to Kafka: [ {"timestamp":"2017-03-20T23:02:39.587Z","style":"WUSHU","action":"KICK","weapon":"STAFF","target":"BODY","strength":7.3853} ]
2017-03-20 23:03:40,718 DEBUG n.a.d.j.g.l.KafkaLogger [Thread-2] Sending event to Kafka: [ {"timestamp":"2017-03-20T23:03:40.717Z","style":"DRUNKEN_BOXING","action":"JUMP","weapon":"ROPE","target":"HEAD","strength":1.8422} ]
2017-03-20 23:04:41,957 DEBUG n.a.d.j.g.l.KafkaLogger [Thread-2] Sending event to Kafka: [ {"timestamp":"2017-03-20T23:04:41.950Z","style":"KUNG_FU","action":"JUMP","weapon":"CHAIR","target":"HEAD","strength":8.993} ]
2017-03-20 23:05:43,330 DEBUG n.a.d.j.g.l.KafkaLogger [Thread-2] Sending event to Kafka: [ {"timestamp":"2017-03-20T23:05:43.328Z","style":"KUNG_FU","action":"PUNCH","weapon":"BROAD_SWORD","target":"HEAD","strength":2.3585} ]
2017-03-20 23:06:44,747 DEBUG n.a.d.j.g.l.KafkaLogger [Thread-2] Sending event to Kafka: [ {"timestamp":"2017-03-20T23:06:44.745Z","style":"DRUNKEN_BOXING","action":"JUMP","weapon":"BROAD_SWORD","target":"ARMS","strength":1.4583} ]
2017-03-20 23:07:46,038 DEBUG n.a.d.j.g.l.KafkaLogger [Thread-2] Sending event to Kafka: [ {"timestamp":"2017-03-20T23:07:46.033Z","style":"KUNG_FU","action":"PUNCH","weapon":"BROAD_SWORD","target":"LEGS","strength":3.3634} ]
2017-03-20 23:08:37,942 INFO n.a.d.j.g.JsonDataGenerator [Thread-3] Shutdown Hook Invoked. Shutting Down Loggers
2017-03-20 23:08:38,032 INFO n.a.d.j.g.SimulationRunner [Thread-3] Stopping Simulation
@pratikrvyas
Copy link
Author

DBS test - Jacky's fighting skill

@pratikrvyas
Copy link
Author

python code

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment