Created
July 27, 2017 14:26
-
-
Save maasg/9d51a2a42fc831e385cf744b84e80479 to your computer and use it in GitHub Desktop.
Build path for different events and assign globalID
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"metadata": { | |
"id": "295a615e-43da-4ebb-9aac-776a28141f84", | |
"name": "GlobalUniqueState.snb", | |
"user_save_timestamp": "2017-07-27T16:18:09.707Z", | |
"auto_save_timestamp": "1970-01-01T01:00:00.000Z", | |
"language_info": { | |
"name": "scala", | |
"file_extension": "scala", | |
"codemirror_mode": "text/x-scala" | |
}, | |
"trusted": true, | |
"sparkNotebook": null, | |
"customLocalRepo": null, | |
"customRepos": null, | |
"customDeps": null, | |
"customImports": null, | |
"customArgs": null, | |
"customSparkConf": null, | |
"customVars": null | |
}, | |
"cells": [ | |
{ | |
"metadata": { | |
"trusted": true, | |
"input_collapsed": false, | |
"collapsed": false, | |
"id": "798981D2E2BA4F8B9EE088E3BE243145" | |
}, | |
"cell_type": "code", | |
"source": "import org.apache.spark.streaming._\nimport org.apache.spark.streaming.dstream.QueueInputDStream\n", | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": "import org.apache.spark.streaming._\nimport org.apache.spark.streaming.dstream.QueueInputDStream\n" | |
}, | |
{ | |
"metadata": {}, | |
"data": { | |
"text/html": "" | |
}, | |
"output_type": "execute_result", | |
"execution_count": 1, | |
"time": "Took: 1.085s, at 2017-07-27 16:16" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"input_collapsed": false, | |
"collapsed": false, | |
"id": "6D72421639A845418934F7C62011627A" | |
}, | |
"cell_type": "code", | |
"source": "val data = Seq( \"1|a,1|b,3|c\", \"2|d,2|e,2|f\", \"3|g,3|h,3|i,4|j\", \"5|k\", \"4|f,1|g\", \"6|h\")\nval inputRDDs = data.map(str => str.split(\",\")).map(arr => sparkContext.parallelize(arr))\n/** expected output\nZZ => 1a, 1b, 1c, 2d, 2e, 2f, 3g, 3h, 4f\nXX => 3c, 4j, 5k, 6h\nYY => 1g\n**/\n", | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": "data: Seq[String] = List(1|a,1|b,3|c, 2|d,2|e,2|f, 3|g,3|h,3|i,4|j, 5|k, 4|f,1|g, 6|h)\ninputRDDs: Seq[org.apache.spark.rdd.RDD[String]] = List(ParallelCollectionRDD[0] at parallelize at <console>:72, ParallelCollectionRDD[1] at parallelize at <console>:72, ParallelCollectionRDD[2] at parallelize at <console>:72, ParallelCollectionRDD[3] at parallelize at <console>:72, ParallelCollectionRDD[4] at parallelize at <console>:72, ParallelCollectionRDD[5] at parallelize at <console>:72)\n" | |
}, | |
{ | |
"metadata": {}, | |
"data": { | |
"text/html": "" | |
}, | |
"output_type": "execute_result", | |
"execution_count": 2, | |
"time": "Took: 1.242s, at 2017-07-27 16:16" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"input_collapsed": false, | |
"collapsed": false, | |
"id": "4FDAFE2A3E7B458086D5C235B3226697" | |
}, | |
"cell_type": "code", | |
"source": "case class Event(id: Int, payload: String)", | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": "defined class Event\n" | |
}, | |
{ | |
"metadata": {}, | |
"data": { | |
"text/html": "" | |
}, | |
"output_type": "execute_result", | |
"execution_count": 3, | |
"time": "Took: 0.894s, at 2017-07-27 16:16" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"input_collapsed": false, | |
"collapsed": false, | |
"id": "3DBA2B7EB1124740A16DADA6755B69B6" | |
}, | |
"cell_type": "code", | |
"source": "@transient val ssc = new StreamingContext(sparkContext, Seconds(5))", | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": "ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@5db87859\n" | |
}, | |
{ | |
"metadata": {}, | |
"data": { | |
"text/html": "" | |
}, | |
"output_type": "execute_result", | |
"execution_count": 4, | |
"time": "Took: 0.733s, at 2017-07-27 16:16" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"input_collapsed": false, | |
"collapsed": false, | |
"id": "59152C091B524251A6741AECC244CCA5" | |
}, | |
"cell_type": "code", | |
"source": "val queue = scala.collection.mutable.Queue(inputRDDs:_*)", | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": "queue: scala.collection.mutable.Queue[org.apache.spark.rdd.RDD[String]] = Queue(ParallelCollectionRDD[0] at parallelize at <console>:72, ParallelCollectionRDD[1] at parallelize at <console>:72, ParallelCollectionRDD[2] at parallelize at <console>:72, ParallelCollectionRDD[3] at parallelize at <console>:72, ParallelCollectionRDD[4] at parallelize at <console>:72, ParallelCollectionRDD[5] at parallelize at <console>:72)\n" | |
}, | |
{ | |
"metadata": {}, | |
"data": { | |
"text/html": "" | |
}, | |
"output_type": "execute_result", | |
"execution_count": 5, | |
"time": "Took: 0.711s, at 2017-07-27 16:16" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"input_collapsed": false, | |
"collapsed": false, | |
"id": "C51FDEBB5F9043D8A0D045F2B0BE2A9E" | |
}, | |
"cell_type": "code", | |
"source": "// This is for test purposes only. Replace with actual stream source.\n@transient val queueDStream = ssc.queueStream(queue, oneAtATime = true)", | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": "queueDStream: org.apache.spark.streaming.dstream.InputDStream[String] = org.apache.spark.streaming.dstream.QueueInputDStream@6b7cdd97\n" | |
}, | |
{ | |
"metadata": {}, | |
"data": { | |
"text/html": "" | |
}, | |
"output_type": "execute_result", | |
"execution_count": 6, | |
"time": "Took: 0.716s, at 2017-07-27 16:16" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"input_collapsed": false, | |
"collapsed": false, | |
"id": "147F4ABC9B834C768197D79C303F26C0" | |
}, | |
"cell_type": "code", | |
"source": "// This is our similarity function: two event ids count as similar when they differ by exactly 1. Replace with something appropriate.\n// We're using function-value notation instead of a def because it is cleaner for serialization.\nval isSimilar: Int => Int => Boolean = event1 => event2 => Math.abs(event2 - event1).toInt == 1 \n\n// Global id generator. It should produce a unique id on every call, but this placeholder only draws from 10000 random values, so ids can collide - replace accordingly.\nval genGlobalId: () => String = () => \"gen-\" + scala.util.Random.nextInt(10000)", | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": "isSimilar: Int => (Int => Boolean) = <function1>\ngenGlobalId: () => String = <function0>\n" | |
}, | |
{ | |
"metadata": {}, | |
"data": { | |
"text/html": "" | |
}, | |
"output_type": "execute_result", | |
"execution_count": 7, | |
"time": "Took: 0.855s, at 2017-07-27 16:16" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"input_collapsed": false, | |
"collapsed": false, | |
"id": "BD368F7AC4C645568923DB4AD92372F9" | |
}, | |
"cell_type": "code", | |
"source": "// Here we have our initial Event stream\n@transient val eventStream = queueDStream.map{entry => \n val Array(id, payload) = entry.split(\"\\\\|\")\n Event(id.toInt, payload)\n}", | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": "eventStream: org.apache.spark.streaming.dstream.DStream[Event] = org.apache.spark.streaming.dstream.MappedDStream@56ab87b6\n" | |
}, | |
{ | |
"metadata": {}, | |
"data": { | |
"text/html": "" | |
}, | |
"output_type": "execute_result", | |
"execution_count": 8, | |
"time": "Took: 0.868s, at 2017-07-27 16:16" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"input_collapsed": false, | |
"collapsed": false, | |
"id": "3C96904AD563474CA8B4185F6B5B8A49" | |
}, | |
"cell_type": "code", | |
"source": "@transient var states: RDD[(String, (Int, Long))] = sparkContext.emptyRDD", | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": "states: org.apache.spark.rdd.RDD[(String, (Int, Long))] = EmptyRDD[7] at emptyRDD at <console>:71\n" | |
}, | |
{ | |
"metadata": {}, | |
"data": { | |
"text/html": "" | |
}, | |
"output_type": "execute_result", | |
"execution_count": 9, | |
"time": "Took: 0.690s, at 2017-07-27 16:16" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"input_collapsed": false, | |
"collapsed": false, | |
"id": "5BF0201296CF47538F052BED61EF0509" | |
}, | |
"cell_type": "code", | |
"source": "@transient var currentState: RDD[(String, (Int, Long))] = sparkContext.emptyRDD", | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": "currentState: org.apache.spark.rdd.RDD[(String, (Int, Long))] = EmptyRDD[8] at emptyRDD at <console>:71\n" | |
}, | |
{ | |
"metadata": {}, | |
"data": { | |
"text/html": "" | |
}, | |
"output_type": "execute_result", | |
"execution_count": 10, | |
"time": "Took: 0.675s, at 2017-07-27 16:16" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"input_collapsed": false, | |
"collapsed": false, | |
"id": "5EC3D2A2AC2D45C2831C88AA68992DA7" | |
}, | |
"cell_type": "code", | |
"source": "@transient val eventsById = eventStream.map(event => (event.id, event))\n@transient val groupedEvents = eventsById.groupByKey()", | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": "eventsById: org.apache.spark.streaming.dstream.DStream[(Int, Event)] = org.apache.spark.streaming.dstream.MappedDStream@5ab031f5\ngroupedEvents: org.apache.spark.streaming.dstream.DStream[(Int, Iterable[Event])] = org.apache.spark.streaming.dstream.ShuffledDStream@2367d0e2\n" | |
}, | |
{ | |
"metadata": {}, | |
"data": { | |
"text/html": "" | |
}, | |
"output_type": "execute_result", | |
"execution_count": 11, | |
"time": "Took: 0.751s, at 2017-07-27 16:16" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"input_collapsed": false, | |
"collapsed": false, | |
"id": "95C4DDBD66A6419C8E6C7EFE7334B056" | |
}, | |
"cell_type": "code", | |
"source": "@transient val taggedEvents = groupedEvents.transform{ (events, currentTime) => \n val currentTransitions = states.reduceByKey{case (event1, event2) => Seq(event1, event2).maxBy{case (id, ts) => ts}} \n val currentMappings = currentTransitions.map{case (globalId, (currentId, maxTx)) => (currentId, globalId)}\n \n val newEventIds = events.keys // let's extract the ids of the incoming (grouped) events\n val similarityJoinMap = newEventIds.cartesian(currentMappings)\n .collect{case (eventId, (currentId, globalId)) if (isSimilar(currentId)(eventId)) => (eventId, globalId)}\n .collectAsMap\n //val similarityBC = sparkContext.broadcast(similarityJoinMap) \n val newGlobalKeys = newEventIds.map(id => (id, similarityJoinMap.getOrElse(id, genGlobalId())))\n newGlobalKeys.cache() //cache so that re-evaluating this lazy RDD (it is used twice below) does not call genGlobalId() again and hand out different global ids for the same event\n \n val newTaggedEvents = events.join(newGlobalKeys).flatMap{case (eventId, (events, globalKey)) => \n events.map(event => (event.id,event.payload, globalKey))\n }\n val newStates = newGlobalKeys.map{case (eventId, globalKey) => (globalKey, (eventId, currentTime.milliseconds))}\n currentState = newStates \n states.unpersist(false) \n states = newStates.union(states)\n states.cache() \n newTaggedEvents\n }", | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": "taggedEvents: org.apache.spark.streaming.dstream.DStream[(Int, String, String)] = org.apache.spark.streaming.dstream.TransformedDStream@6a596725\n" | |
}, | |
{ | |
"metadata": {}, | |
"data": { | |
"text/html": "" | |
}, | |
"output_type": "execute_result", | |
"execution_count": 12, | |
"time": "Took: 0.971s, at 2017-07-27 16:16" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"input_collapsed": false, | |
"collapsed": false, | |
"id": "EE22776EA26647BFA2DF78B80B444A10" | |
}, | |
"cell_type": "code", | |
"source": "@transient val rawEventBox = ul(20)\nrawEventBox", | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": "rawEventBox: notebook.front.widgets.HtmlList = <HtmlList widget>\nres15: notebook.front.widgets.HtmlList = <HtmlList widget>\n" | |
}, | |
{ | |
"metadata": {}, | |
"data": { | |
"text/html": "<ul data-bind=\"foreach: value\"><li data-bind=\"html: $data\"></li><script data-this=\"{&quot;valueId&quot;:&quot;anon66a44b169710c8728ef4ba67ac6a09c7&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId)\n },\n this\n );\n});\n /*]]>*/</script></ul>" | |
}, | |
"output_type": "execute_result", | |
"execution_count": 14, | |
"time": "Took: 0.863s, at 2017-07-27 16:16" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"input_collapsed": false, | |
"collapsed": false, | |
"id": "3381C07E0EAE430E90362A9733923446" | |
}, | |
"cell_type": "code", | |
"source": "eventStream.foreachRDD(e => rawEventBox.append(e.collect.mkString(\", \")))", | |
"outputs": [ | |
{ | |
"metadata": {}, | |
"data": { | |
"text/html": "" | |
}, | |
"output_type": "execute_result", | |
"execution_count": 15, | |
"time": "Took: 0.785s, at 2017-07-27 16:16" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"input_collapsed": false, | |
"collapsed": false, | |
"id": "FE764B4A05EC481C8EA40A88ED6D473B" | |
}, | |
"cell_type": "code", | |
"source": "@transient val currentTrans = ul(20)\ncurrentTrans", | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": "currentTrans: notebook.front.widgets.HtmlList = <HtmlList widget>\nres19: notebook.front.widgets.HtmlList = <HtmlList widget>\n" | |
}, | |
{ | |
"metadata": {}, | |
"data": { | |
"text/html": "<ul data-bind=\"foreach: value\"><li data-bind=\"html: $data\"></li><script data-this=\"{&quot;valueId&quot;:&quot;anonfc521a3a25c9a0a5958acd946894c1a9&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId)\n },\n this\n );\n});\n /*]]>*/</script></ul>" | |
}, | |
"output_type": "execute_result", | |
"execution_count": 16, | |
"time": "Took: 0.725s, at 2017-07-27 16:16" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"input_collapsed": false, | |
"collapsed": false, | |
"id": "80A8C316BEBB440E899157A51627AE1F" | |
}, | |
"cell_type": "code", | |
"source": "@transient val eventBox = ul(20)\neventBox", | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": "eventBox: notebook.front.widgets.HtmlList = <HtmlList widget>\nres21: notebook.front.widgets.HtmlList = <HtmlList widget>\n" | |
}, | |
{ | |
"metadata": {}, | |
"data": { | |
"text/html": "<ul data-bind=\"foreach: value\"><li data-bind=\"html: $data\"></li><script data-this=\"{&quot;valueId&quot;:&quot;anona38d37904c4f09c626ff390359d2c8af&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId)\n },\n this\n );\n});\n /*]]>*/</script></ul>" | |
}, | |
"output_type": "execute_result", | |
"execution_count": 17, | |
"time": "Took: 0.650s, at 2017-07-27 16:16" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"input_collapsed": false, | |
"collapsed": false, | |
"id": "6056967780B14BFC8BE5EBC3635362E5" | |
}, | |
"cell_type": "code", | |
"source": "@transient val transitionChainBox = ul(20)\ntransitionChainBox", | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": "transitionChainBox: notebook.front.widgets.HtmlList = <HtmlList widget>\nres23: notebook.front.widgets.HtmlList = <HtmlList widget>\n" | |
}, | |
{ | |
"metadata": {}, | |
"data": { | |
"text/html": "<ul data-bind=\"foreach: value\"><li data-bind=\"html: $data\"></li><script data-this=\"{&quot;valueId&quot;:&quot;anon1022644358033f118f633706870a441a&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId)\n },\n this\n );\n});\n /*]]>*/</script></ul>" | |
}, | |
"output_type": "execute_result", | |
"execution_count": 18, | |
"time": "Took: 0.667s, at 2017-07-27 16:16" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"input_collapsed": false, | |
"collapsed": false, | |
"id": "48D1F71E85F04A63B0F9026622D7E78E" | |
}, | |
"cell_type": "code", | |
"source": "taggedEvents.foreachRDD{events => \n eventBox.append(\"---\")\n eventBox.append(events.collect.map{case (id, payload, globalKey) => s\"$id|$payload: $globalKey\"}.mkString(\",\"))\n val transitions = states.groupByKey.mapValues(eventSeq => eventSeq.toList.sortBy{case (id, ts) => ts}.map{case (id, ts) => id}.mkString (\"<-\"))\n transitionChainBox.append(\"---\")\n transitions.collect.map{case (globalId, eventSeq) => s\"$globalId: $eventSeq\"}.foreach(s => transitionChainBox.append(s))\n \n currentTrans.append(currentState.collect.map(_.toString).mkString(\",\"))\n \n }", | |
"outputs": [ | |
{ | |
"metadata": {}, | |
"data": { | |
"text/html": "" | |
}, | |
"output_type": "execute_result", | |
"execution_count": 19, | |
"time": "Took: 0.924s, at 2017-07-27 16:16" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"input_collapsed": false, | |
"collapsed": false, | |
"id": "C19D7885F7B2437EB9B89D69462D35F9" | |
}, | |
"cell_type": "code", | |
"source": "ssc.start()", | |
"outputs": [ | |
{ | |
"metadata": {}, | |
"data": { | |
"text/html": "" | |
}, | |
"output_type": "execute_result", | |
"execution_count": 20, | |
"time": "Took: 0.627s, at 2017-07-27 16:16" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"input_collapsed": false, | |
"collapsed": false, | |
"id": "054D3411347247539EBC9AB16488F987" | |
}, | |
"cell_type": "code", | |
"source": "ssc.stop(false)", | |
"outputs": [ | |
{ | |
"metadata": {}, | |
"data": { | |
"text/html": "" | |
}, | |
"output_type": "execute_result", | |
"execution_count": 21, | |
"time": "Took: 0.766s, at 2017-07-27 16:17" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"input_collapsed": false, | |
"collapsed": true, | |
"id": "0342E90E4BE949748B7D837F71336974" | |
}, | |
"cell_type": "code", | |
"source": "", | |
"outputs": [] | |
} | |
], | |
"nbformat": 4 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment