Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- {
- "metadata": {
- "id": "295a615e-43da-4ebb-9aac-776a28141f84",
- "name": "GlobalUniqueState.snb",
- "user_save_timestamp": "2017-07-27T16:18:09.707Z",
- "auto_save_timestamp": "1970-01-01T01:00:00.000Z",
- "language_info": {
- "name": "scala",
- "file_extension": "scala",
- "codemirror_mode": "text/x-scala"
- },
- "trusted": true,
- "sparkNotebook": null,
- "customLocalRepo": null,
- "customRepos": null,
- "customDeps": null,
- "customImports": null,
- "customArgs": null,
- "customSparkConf": null,
- "customVars": null
- },
- "cells": [
- {
- "metadata": {
- "trusted": true,
- "input_collapsed": false,
- "collapsed": false,
- "id": "798981D2E2BA4F8B9EE088E3BE243145"
- },
- "cell_type": "code",
- "source": "import org.apache.spark.streaming._\nimport org.apache.spark.streaming.dstream.QueueInputDStream\n",
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": "import org.apache.spark.streaming._\nimport org.apache.spark.streaming.dstream.QueueInputDStream\n"
- },
- {
- "metadata": {},
- "data": {
- "text/html": ""
- },
- "output_type": "execute_result",
- "execution_count": 1,
- "time": "Took: 1.085s, at 2017-07-27 16:16"
- }
- ]
- },
- {
- "metadata": {
- "trusted": true,
- "input_collapsed": false,
- "collapsed": false,
- "id": "6D72421639A845418934F7C62011627A"
- },
- "cell_type": "code",
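- "source": "val data = Seq( \"1|a,1|b,3|c\", \"2|d,2|e,2|f\", \"3|g,3|h,3|i,4|j\", \"5|k\", \"4|f,1|g\", \"6|h\")\nval inputRDDs = data.map(str => str.split(\",\")).map(arr => sparkContext.parallelize(arr))\n/** expected output (ZZ, XX and YY stand for the randomly generated global ids)\nZZ => 1a, 1b, 2d, 2e, 2f, 3g, 3h, 3i, 4f\nXX => 3c, 4j, 5k, 6h\nYY => 1g\nNote: id 2 (batch 2) and id 4 (batch 5) are similar to both the ZZ and XX chains, so which chain they join depends on which match collectAsMap keeps.\n**/\n",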
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": "data: Seq[String] = List(1|a,1|b,3|c, 2|d,2|e,2|f, 3|g,3|h,3|i,4|j, 5|k, 4|f,1|g, 6|h)\ninputRDDs: Seq[org.apache.spark.rdd.RDD[String]] = List(ParallelCollectionRDD[0] at parallelize at <console>:72, ParallelCollectionRDD[1] at parallelize at <console>:72, ParallelCollectionRDD[2] at parallelize at <console>:72, ParallelCollectionRDD[3] at parallelize at <console>:72, ParallelCollectionRDD[4] at parallelize at <console>:72, ParallelCollectionRDD[5] at parallelize at <console>:72)\n"
- },
- {
- "metadata": {},
- "data": {
- "text/html": ""
- },
- "output_type": "execute_result",
- "execution_count": 2,
- "time": "Took: 1.242s, at 2017-07-27 16:16"
- }
- ]
- },
- {
- "metadata": {
- "trusted": true,
- "input_collapsed": false,
- "collapsed": false,
- "id": "4FDAFE2A3E7B458086D5C235B3226697"
- },
- "cell_type": "code",
- "source": "case class Event(id: Int, payload: String)",
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": "defined class Event\n"
- },
- {
- "metadata": {},
- "data": {
- "text/html": ""
- },
- "output_type": "execute_result",
- "execution_count": 3,
- "time": "Took: 0.894s, at 2017-07-27 16:16"
- }
- ]
- },
- {
- "metadata": {
- "trusted": true,
- "input_collapsed": false,
- "collapsed": false,
- "id": "3DBA2B7EB1124740A16DADA6755B69B6"
- },
- "cell_type": "code",
- "source": "@transient val ssc = new StreamingContext(sparkContext, Seconds(5))",
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": "ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@5db87859\n"
- },
- {
- "metadata": {},
- "data": {
- "text/html": ""
- },
- "output_type": "execute_result",
- "execution_count": 4,
- "time": "Took: 0.733s, at 2017-07-27 16:16"
- }
- ]
- },
- {
- "metadata": {
- "trusted": true,
- "input_collapsed": false,
- "collapsed": false,
- "id": "59152C091B524251A6741AECC244CCA5"
- },
- "cell_type": "code",
- "source": "val queue = scala.collection.mutable.Queue(inputRDDs:_*)",
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": "queue: scala.collection.mutable.Queue[org.apache.spark.rdd.RDD[String]] = Queue(ParallelCollectionRDD[0] at parallelize at <console>:72, ParallelCollectionRDD[1] at parallelize at <console>:72, ParallelCollectionRDD[2] at parallelize at <console>:72, ParallelCollectionRDD[3] at parallelize at <console>:72, ParallelCollectionRDD[4] at parallelize at <console>:72, ParallelCollectionRDD[5] at parallelize at <console>:72)\n"
- },
- {
- "metadata": {},
- "data": {
- "text/html": ""
- },
- "output_type": "execute_result",
- "execution_count": 5,
- "time": "Took: 0.711s, at 2017-07-27 16:16"
- }
- ]
- },
- {
- "metadata": {
- "trusted": true,
- "input_collapsed": false,
- "collapsed": false,
- "id": "C51FDEBB5F9043D8A0D045F2B0BE2A9E"
- },
- "cell_type": "code",
- "source": "// This is for test purposes only. Replace with actual stream source.\n@transient val queueDStream = ssc.queueStream(queue, oneAtATime = true)",
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": "queueDStream: org.apache.spark.streaming.dstream.InputDStream[String] = org.apache.spark.streaming.dstream.QueueInputDStream@6b7cdd97\n"
- },
- {
- "metadata": {},
- "data": {
- "text/html": ""
- },
- "output_type": "execute_result",
- "execution_count": 6,
- "time": "Took: 0.716s, at 2017-07-27 16:16"
- }
- ]
- },
- {
- "metadata": {
- "trusted": true,
- "input_collapsed": false,
- "collapsed": false,
- "id": "147F4ABC9B834C768197D79C303F26C0"
- },
- "cell_type": "code",
- "source": "// This is our similarity function: two event ids are similar when they differ by exactly 1. Replace with something appropriate.\n// We use a function value instead of a def because it serializes more cleanly when captured in Spark closures.\nval isSimilar: Int => Int => Boolean = event1 => event2 => Math.abs(event2 - event1).toInt == 1 \n\n// Global id generator. It must return a unique id on every call; Random.nextInt(10000) can repeat an id, which would merge unrelated chains, so replace it (e.g. with java.util.UUID) for real use.\nval genGlobalId: () => String = () => \"gen-\" + scala.util.Random.nextInt(10000)",
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": "isSimilar: Int => (Int => Boolean) = <function1>\ngenGlobalId: () => String = <function0>\n"
- },
- {
- "metadata": {},
- "data": {
- "text/html": ""
- },
- "output_type": "execute_result",
- "execution_count": 7,
- "time": "Took: 0.855s, at 2017-07-27 16:16"
- }
- ]
- },
- {
- "metadata": {
- "trusted": true,
- "input_collapsed": false,
- "collapsed": false,
- "id": "BD368F7AC4C645568923DB4AD92372F9"
- },
- "cell_type": "code",
- "source": "// Here we have our initial Event stream\n@transient val eventStream = queueDStream.map{entry => \n val Array(id, payload) = entry.split(\"\\\\|\")\n Event(id.toInt, payload)\n}",
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": "eventStream: org.apache.spark.streaming.dstream.DStream[Event] = org.apache.spark.streaming.dstream.MappedDStream@56ab87b6\n"
- },
- {
- "metadata": {},
- "data": {
- "text/html": ""
- },
- "output_type": "execute_result",
- "execution_count": 8,
- "time": "Took: 0.868s, at 2017-07-27 16:16"
- }
- ]
- },
- {
- "metadata": {
- "trusted": true,
- "input_collapsed": false,
- "collapsed": false,
- "id": "3C96904AD563474CA8B4185F6B5B8A49"
- },
- "cell_type": "code",
- "source": "@transient var states: RDD[(String, (Int, Long))] = sparkContext.emptyRDD",
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": "states: org.apache.spark.rdd.RDD[(String, (Int, Long))] = EmptyRDD[7] at emptyRDD at <console>:71\n"
- },
- {
- "metadata": {},
- "data": {
- "text/html": ""
- },
- "output_type": "execute_result",
- "execution_count": 9,
- "time": "Took: 0.690s, at 2017-07-27 16:16"
- }
- ]
- },
- {
- "metadata": {
- "trusted": true,
- "input_collapsed": false,
- "collapsed": false,
- "id": "5BF0201296CF47538F052BED61EF0509"
- },
- "cell_type": "code",
- "source": "@transient var currentState: RDD[(String, (Int, Long))] = sparkContext.emptyRDD",
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": "currentState: org.apache.spark.rdd.RDD[(String, (Int, Long))] = EmptyRDD[8] at emptyRDD at <console>:71\n"
- },
- {
- "metadata": {},
- "data": {
- "text/html": ""
- },
- "output_type": "execute_result",
- "execution_count": 10,
- "time": "Took: 0.675s, at 2017-07-27 16:16"
- }
- ]
- },
- {
- "metadata": {
- "trusted": true,
- "input_collapsed": false,
- "collapsed": false,
- "id": "5EC3D2A2AC2D45C2831C88AA68992DA7"
- },
- "cell_type": "code",
- "source": "@transient val eventsById = eventStream.map(event => (event.id, event))\n@transient val groupedEvents = eventsById.groupByKey()",
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": "eventsById: org.apache.spark.streaming.dstream.DStream[(Int, Event)] = org.apache.spark.streaming.dstream.MappedDStream@5ab031f5\ngroupedEvents: org.apache.spark.streaming.dstream.DStream[(Int, Iterable[Event])] = org.apache.spark.streaming.dstream.ShuffledDStream@2367d0e2\n"
- },
- {
- "metadata": {},
- "data": {
- "text/html": ""
- },
- "output_type": "execute_result",
- "execution_count": 11,
- "time": "Took: 0.751s, at 2017-07-27 16:16"
- }
- ]
- },
- {
- "metadata": {
- "trusted": true,
- "input_collapsed": false,
- "collapsed": false,
- "id": "95C4DDBD66A6419C8E6C7EFE7334B056"
- },
- "cell_type": "code",
- "source": "@transient val taggedEvents = groupedEvents.transform{ (events, currentTime) => \n val currentTransitions = states.reduceByKey{case (event1, event2) => Seq(event1, event2).maxBy{case (id, ts) => ts}} \n val currentMappings = currentTransitions.map{case (globalId, (currentId, maxTx)) => (currentId, globalId)}\n \n val newEventIds = events.keys // extract the ids of the incoming (grouped) events\n val similarityJoinMap = newEventIds.cartesian(currentMappings)\n .collect{case (eventId, (currentId, globalId)) if (isSimilar(currentId)(eventId)) => (eventId, globalId)}\n .collectAsMap\n //val similarityBC = sparkContext.broadcast(similarityJoinMap) \n val newGlobalKeys = newEventIds.map(id => (id, similarityJoinMap.getOrElse(id, genGlobalId())))\n newGlobalKeys.cache() // cache so the random ids are drawn once and reused by the join and newStates below; cache() is lazy and best-effort, so evicted partitions would be recomputed with fresh ids\n \n val newTaggedEvents = events.join(newGlobalKeys).flatMap{case (eventId, (events, globalKey)) => \n events.map(event => (event.id,event.payload, globalKey))\n }\n val newStates = newGlobalKeys.map{case (eventId, globalKey) => (globalKey, (eventId, currentTime.milliseconds))}\n currentState = newStates \n states.unpersist(false) \n // NOTE(review): states' lineage grows with every batch via this union; consider periodic checkpointing\n states = newStates.union(states)\n states.cache() \n newTaggedEvents\n }",
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": "taggedEvents: org.apache.spark.streaming.dstream.DStream[(Int, String, String)] = org.apache.spark.streaming.dstream.TransformedDStream@6a596725\n"
- },
- {
- "metadata": {},
- "data": {
- "text/html": ""
- },
- "output_type": "execute_result",
- "execution_count": 12,
- "time": "Took: 0.971s, at 2017-07-27 16:16"
- }
- ]
- },
- {
- "metadata": {
- "trusted": true,
- "input_collapsed": false,
- "collapsed": false,
- "id": "EE22776EA26647BFA2DF78B80B444A10"
- },
- "cell_type": "code",
- "source": "@transient val rawEventBox = ul(20)\nrawEventBox",
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": "rawEventBox: notebook.front.widgets.HtmlList = <HtmlList widget>\nres15: notebook.front.widgets.HtmlList = <HtmlList widget>\n"
- },
- {
- "metadata": {},
- "data": {
- "text/html": "<ul data-bind=\"foreach: value\"><li data-bind=\"html: $data\"></li><script data-this=\"{"valueId":"anon66a44b169710c8728ef4ba67ac6a09c7"}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId)\n },\n this\n );\n});\n /*]]>*/</script></ul>"
- },
- "output_type": "execute_result",
- "execution_count": 14,
- "time": "Took: 0.863s, at 2017-07-27 16:16"
- }
- ]
- },
- {
- "metadata": {
- "trusted": true,
- "input_collapsed": false,
- "collapsed": false,
- "id": "3381C07E0EAE430E90362A9733923446"
- },
- "cell_type": "code",
- "source": "eventStream.foreachRDD(e => rawEventBox.append(e.collect.mkString(\", \")))",
- "outputs": [
- {
- "metadata": {},
- "data": {
- "text/html": ""
- },
- "output_type": "execute_result",
- "execution_count": 15,
- "time": "Took: 0.785s, at 2017-07-27 16:16"
- }
- ]
- },
- {
- "metadata": {
- "trusted": true,
- "input_collapsed": false,
- "collapsed": false,
- "id": "FE764B4A05EC481C8EA40A88ED6D473B"
- },
- "cell_type": "code",
- "source": "@transient val currentTrans = ul(20)\ncurrentTrans",
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": "currentTrans: notebook.front.widgets.HtmlList = <HtmlList widget>\nres19: notebook.front.widgets.HtmlList = <HtmlList widget>\n"
- },
- {
- "metadata": {},
- "data": {
- "text/html": "<ul data-bind=\"foreach: value\"><li data-bind=\"html: $data\"></li><script data-this=\"{"valueId":"anonfc521a3a25c9a0a5958acd946894c1a9"}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId)\n },\n this\n );\n});\n /*]]>*/</script></ul>"
- },
- "output_type": "execute_result",
- "execution_count": 16,
- "time": "Took: 0.725s, at 2017-07-27 16:16"
- }
- ]
- },
- {
- "metadata": {
- "trusted": true,
- "input_collapsed": false,
- "collapsed": false,
- "id": "80A8C316BEBB440E899157A51627AE1F"
- },
- "cell_type": "code",
- "source": "@transient val eventBox = ul(20)\neventBox",
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": "eventBox: notebook.front.widgets.HtmlList = <HtmlList widget>\nres21: notebook.front.widgets.HtmlList = <HtmlList widget>\n"
- },
- {
- "metadata": {},
- "data": {
- "text/html": "<ul data-bind=\"foreach: value\"><li data-bind=\"html: $data\"></li><script data-this=\"{"valueId":"anona38d37904c4f09c626ff390359d2c8af"}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId)\n },\n this\n );\n});\n /*]]>*/</script></ul>"
- },
- "output_type": "execute_result",
- "execution_count": 17,
- "time": "Took: 0.650s, at 2017-07-27 16:16"
- }
- ]
- },
- {
- "metadata": {
- "trusted": true,
- "input_collapsed": false,
- "collapsed": false,
- "id": "6056967780B14BFC8BE5EBC3635362E5"
- },
- "cell_type": "code",
- "source": "@transient val transitionChainBox = ul(20)\ntransitionChainBox",
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": "transitionChainBox: notebook.front.widgets.HtmlList = <HtmlList widget>\nres23: notebook.front.widgets.HtmlList = <HtmlList widget>\n"
- },
- {
- "metadata": {},
- "data": {
- "text/html": "<ul data-bind=\"foreach: value\"><li data-bind=\"html: $data\"></li><script data-this=\"{"valueId":"anon1022644358033f118f633706870a441a"}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId)\n },\n this\n );\n});\n /*]]>*/</script></ul>"
- },
- "output_type": "execute_result",
- "execution_count": 18,
- "time": "Took: 0.667s, at 2017-07-27 16:16"
- }
- ]
- },
- {
- "metadata": {
- "trusted": true,
- "input_collapsed": false,
- "collapsed": false,
- "id": "48D1F71E85F04A63B0F9026622D7E78E"
- },
- "cell_type": "code",
- "source": "taggedEvents.foreachRDD{events => \n eventBox.append(\"---\")\n eventBox.append(events.collect.map{case (id, payload, globalKey) => s\"$id|$payload: $globalKey\"}.mkString(\",\"))\n val transitions = states.groupByKey.mapValues(eventSeq => eventSeq.toList.sortBy{case (id, ts) => ts}.map{case (id, ts) => id}.mkString (\"<-\"))\n transitionChainBox.append(\"---\")\n transitions.collect.map{case (globalId, eventSeq) => s\"$globalId: $eventSeq\"}.foreach(s => transitionChainBox.append(s))\n \n currentTrans.append(currentState.collect.map(_.toString).mkString(\",\"))\n \n }",
- "outputs": [
- {
- "metadata": {},
- "data": {
- "text/html": ""
- },
- "output_type": "execute_result",
- "execution_count": 19,
- "time": "Took: 0.924s, at 2017-07-27 16:16"
- }
- ]
- },
- {
- "metadata": {
- "trusted": true,
- "input_collapsed": false,
- "collapsed": false,
- "id": "C19D7885F7B2437EB9B89D69462D35F9"
- },
- "cell_type": "code",
- "source": "ssc.start()",
- "outputs": [
- {
- "metadata": {},
- "data": {
- "text/html": ""
- },
- "output_type": "execute_result",
- "execution_count": 20,
- "time": "Took: 0.627s, at 2017-07-27 16:16"
- }
- ]
- },
- {
- "metadata": {
- "trusted": true,
- "input_collapsed": false,
- "collapsed": false,
- "id": "054D3411347247539EBC9AB16488F987"
- },
- "cell_type": "code",
- "source": "ssc.stop(false)",
- "outputs": [
- {
- "metadata": {},
- "data": {
- "text/html": ""
- },
- "output_type": "execute_result",
- "execution_count": 21,
- "time": "Took: 0.766s, at 2017-07-27 16:17"
- }
- ]
- },
- {
- "metadata": {
- "trusted": true,
- "input_collapsed": false,
- "collapsed": true,
- "id": "0342E90E4BE949748B7D837F71336974"
- },
- "cell_type": "code",
- "source": "",
- "outputs": []
- }
- ],
- "nbformat": 4
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement