Advertisement
Guest User

Untitled

a guest
Jul 28th, 2017
66
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 20.59 KB | None | 0 0
  1. {
  2. "metadata": {
  3. "id": "295a615e-43da-4ebb-9aac-776a28141f84",
  4. "name": "GlobalUniqueState.snb",
  5. "user_save_timestamp": "2017-07-27T16:18:09.707Z",
  6. "auto_save_timestamp": "1970-01-01T01:00:00.000Z",
  7. "language_info": {
  8. "name": "scala",
  9. "file_extension": "scala",
  10. "codemirror_mode": "text/x-scala"
  11. },
  12. "trusted": true,
  13. "sparkNotebook": null,
  14. "customLocalRepo": null,
  15. "customRepos": null,
  16. "customDeps": null,
  17. "customImports": null,
  18. "customArgs": null,
  19. "customSparkConf": null,
  20. "customVars": null
  21. },
  22. "cells": [
  23. {
  24. "metadata": {
  25. "trusted": true,
  26. "input_collapsed": false,
  27. "collapsed": false,
  28. "id": "798981D2E2BA4F8B9EE088E3BE243145"
  29. },
  30. "cell_type": "code",
  31. "source": "import org.apache.spark.streaming._\nimport org.apache.spark.streaming.dstream.QueueInputDStream\n",
  32. "outputs": [
  33. {
  34. "name": "stdout",
  35. "output_type": "stream",
  36. "text": "import org.apache.spark.streaming._\nimport org.apache.spark.streaming.dstream.QueueInputDStream\n"
  37. },
  38. {
  39. "metadata": {},
  40. "data": {
  41. "text/html": ""
  42. },
  43. "output_type": "execute_result",
  44. "execution_count": 1,
  45. "time": "Took: 1.085s, at 2017-07-27 16:16"
  46. }
  47. ]
  48. },
  49. {
  50. "metadata": {
  51. "trusted": true,
  52. "input_collapsed": false,
  53. "collapsed": false,
  54. "id": "6D72421639A845418934F7C62011627A"
  55. },
  56. "cell_type": "code",
  57. "source": "val data = Seq( \"1|a,1|b,3|c\", \"2|d,2|e,2|f\", \"3|g,3|h,3|i,4|j\", \"5|k\", \"4|f,1|g\", \"6|h\")\nval inputRDDs = data.map(str => str.split(\",\")).map(arr => sparkContext.parallelize(arr))\n/** expected output\nZZ => 1a, 1b, 1c, 2d, 2e, 2f, 3g, 3h, 4f\nXX => 3c, 4j, 5k, 6h\nYY => 1g\n**/\n",
  58. "outputs": [
  59. {
  60. "name": "stdout",
  61. "output_type": "stream",
  62. "text": "data: Seq[String] = List(1|a,1|b,3|c, 2|d,2|e,2|f, 3|g,3|h,3|i,4|j, 5|k, 4|f,1|g, 6|h)\ninputRDDs: Seq[org.apache.spark.rdd.RDD[String]] = List(ParallelCollectionRDD[0] at parallelize at <console>:72, ParallelCollectionRDD[1] at parallelize at <console>:72, ParallelCollectionRDD[2] at parallelize at <console>:72, ParallelCollectionRDD[3] at parallelize at <console>:72, ParallelCollectionRDD[4] at parallelize at <console>:72, ParallelCollectionRDD[5] at parallelize at <console>:72)\n"
  63. },
  64. {
  65. "metadata": {},
  66. "data": {
  67. "text/html": ""
  68. },
  69. "output_type": "execute_result",
  70. "execution_count": 2,
  71. "time": "Took: 1.242s, at 2017-07-27 16:16"
  72. }
  73. ]
  74. },
  75. {
  76. "metadata": {
  77. "trusted": true,
  78. "input_collapsed": false,
  79. "collapsed": false,
  80. "id": "4FDAFE2A3E7B458086D5C235B3226697"
  81. },
  82. "cell_type": "code",
  83. "source": "case class Event(id: Int, payload: String)",
  84. "outputs": [
  85. {
  86. "name": "stdout",
  87. "output_type": "stream",
  88. "text": "defined class Event\n"
  89. },
  90. {
  91. "metadata": {},
  92. "data": {
  93. "text/html": ""
  94. },
  95. "output_type": "execute_result",
  96. "execution_count": 3,
  97. "time": "Took: 0.894s, at 2017-07-27 16:16"
  98. }
  99. ]
  100. },
  101. {
  102. "metadata": {
  103. "trusted": true,
  104. "input_collapsed": false,
  105. "collapsed": false,
  106. "id": "3DBA2B7EB1124740A16DADA6755B69B6"
  107. },
  108. "cell_type": "code",
  109. "source": "@transient val ssc = new StreamingContext(sparkContext, Seconds(5))",
  110. "outputs": [
  111. {
  112. "name": "stdout",
  113. "output_type": "stream",
  114. "text": "ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@5db87859\n"
  115. },
  116. {
  117. "metadata": {},
  118. "data": {
  119. "text/html": ""
  120. },
  121. "output_type": "execute_result",
  122. "execution_count": 4,
  123. "time": "Took: 0.733s, at 2017-07-27 16:16"
  124. }
  125. ]
  126. },
  127. {
  128. "metadata": {
  129. "trusted": true,
  130. "input_collapsed": false,
  131. "collapsed": false,
  132. "id": "59152C091B524251A6741AECC244CCA5"
  133. },
  134. "cell_type": "code",
  135. "source": "val queue = scala.collection.mutable.Queue(inputRDDs:_*)",
  136. "outputs": [
  137. {
  138. "name": "stdout",
  139. "output_type": "stream",
  140. "text": "queue: scala.collection.mutable.Queue[org.apache.spark.rdd.RDD[String]] = Queue(ParallelCollectionRDD[0] at parallelize at <console>:72, ParallelCollectionRDD[1] at parallelize at <console>:72, ParallelCollectionRDD[2] at parallelize at <console>:72, ParallelCollectionRDD[3] at parallelize at <console>:72, ParallelCollectionRDD[4] at parallelize at <console>:72, ParallelCollectionRDD[5] at parallelize at <console>:72)\n"
  141. },
  142. {
  143. "metadata": {},
  144. "data": {
  145. "text/html": ""
  146. },
  147. "output_type": "execute_result",
  148. "execution_count": 5,
  149. "time": "Took: 0.711s, at 2017-07-27 16:16"
  150. }
  151. ]
  152. },
  153. {
  154. "metadata": {
  155. "trusted": true,
  156. "input_collapsed": false,
  157. "collapsed": false,
  158. "id": "C51FDEBB5F9043D8A0D045F2B0BE2A9E"
  159. },
  160. "cell_type": "code",
  161. "source": "// This is for test purposes only. Replace with actual stream source.\n@transient val queueDStream = ssc.queueStream(queue, oneAtATime = true)",
  162. "outputs": [
  163. {
  164. "name": "stdout",
  165. "output_type": "stream",
  166. "text": "queueDStream: org.apache.spark.streaming.dstream.InputDStream[String] = org.apache.spark.streaming.dstream.QueueInputDStream@6b7cdd97\n"
  167. },
  168. {
  169. "metadata": {},
  170. "data": {
  171. "text/html": ""
  172. },
  173. "output_type": "execute_result",
  174. "execution_count": 6,
  175. "time": "Took: 0.716s, at 2017-07-27 16:16"
  176. }
  177. ]
  178. },
  179. {
  180. "metadata": {
  181. "trusted": true,
  182. "input_collapsed": false,
  183. "collapsed": false,
  184. "id": "147F4ABC9B834C768197D79C303F26C0"
  185. },
  186. "cell_type": "code",
  187. "source": "// this is our similarity function. Replace with something appropriate.\n// We're using the function notation instead of a def, b/c it's cleaner for the serialization.\nval isSimilar: Int => Int => Boolean = event1 => event2 => Math.abs(event2 - event1).toInt == 1 \n\n// Global Id Generator. Should generate unique ids each time - replace accordingly\nval genGlobalId: () => String = () => \"gen-\" + scala.util.Random.nextInt(10000)",
  188. "outputs": [
  189. {
  190. "name": "stdout",
  191. "output_type": "stream",
  192. "text": "isSimilar: Int => (Int => Boolean) = <function1>\ngenGlobalId: () => String = <function0>\n"
  193. },
  194. {
  195. "metadata": {},
  196. "data": {
  197. "text/html": ""
  198. },
  199. "output_type": "execute_result",
  200. "execution_count": 7,
  201. "time": "Took: 0.855s, at 2017-07-27 16:16"
  202. }
  203. ]
  204. },
  205. {
  206. "metadata": {
  207. "trusted": true,
  208. "input_collapsed": false,
  209. "collapsed": false,
  210. "id": "BD368F7AC4C645568923DB4AD92372F9"
  211. },
  212. "cell_type": "code",
  213. "source": "// Here we have our initial Event stream\n@transient val eventStream = queueDStream.map{entry => \n val Array(id, payload) = entry.split(\"\\\\|\")\n Event(id.toInt, payload)\n}",
  214. "outputs": [
  215. {
  216. "name": "stdout",
  217. "output_type": "stream",
  218. "text": "eventStream: org.apache.spark.streaming.dstream.DStream[Event] = org.apache.spark.streaming.dstream.MappedDStream@56ab87b6\n"
  219. },
  220. {
  221. "metadata": {},
  222. "data": {
  223. "text/html": ""
  224. },
  225. "output_type": "execute_result",
  226. "execution_count": 8,
  227. "time": "Took: 0.868s, at 2017-07-27 16:16"
  228. }
  229. ]
  230. },
  231. {
  232. "metadata": {
  233. "trusted": true,
  234. "input_collapsed": false,
  235. "collapsed": false,
  236. "id": "3C96904AD563474CA8B4185F6B5B8A49"
  237. },
  238. "cell_type": "code",
  239. "source": "@transient var states: RDD[(String, (Int, Long))] = sparkContext.emptyRDD",
  240. "outputs": [
  241. {
  242. "name": "stdout",
  243. "output_type": "stream",
  244. "text": "states: org.apache.spark.rdd.RDD[(String, (Int, Long))] = EmptyRDD[7] at emptyRDD at <console>:71\n"
  245. },
  246. {
  247. "metadata": {},
  248. "data": {
  249. "text/html": ""
  250. },
  251. "output_type": "execute_result",
  252. "execution_count": 9,
  253. "time": "Took: 0.690s, at 2017-07-27 16:16"
  254. }
  255. ]
  256. },
  257. {
  258. "metadata": {
  259. "trusted": true,
  260. "input_collapsed": false,
  261. "collapsed": false,
  262. "id": "5BF0201296CF47538F052BED61EF0509"
  263. },
  264. "cell_type": "code",
  265. "source": "@transient var currentState: RDD[(String, (Int, Long))] = sparkContext.emptyRDD",
  266. "outputs": [
  267. {
  268. "name": "stdout",
  269. "output_type": "stream",
  270. "text": "currentState: org.apache.spark.rdd.RDD[(String, (Int, Long))] = EmptyRDD[8] at emptyRDD at <console>:71\n"
  271. },
  272. {
  273. "metadata": {},
  274. "data": {
  275. "text/html": ""
  276. },
  277. "output_type": "execute_result",
  278. "execution_count": 10,
  279. "time": "Took: 0.675s, at 2017-07-27 16:16"
  280. }
  281. ]
  282. },
  283. {
  284. "metadata": {
  285. "trusted": true,
  286. "input_collapsed": false,
  287. "collapsed": false,
  288. "id": "5EC3D2A2AC2D45C2831C88AA68992DA7"
  289. },
  290. "cell_type": "code",
  291. "source": "@transient val eventsById = eventStream.map(event => (event.id, event))\n@transient val groupedEvents = eventsById.groupByKey()",
  292. "outputs": [
  293. {
  294. "name": "stdout",
  295. "output_type": "stream",
  296. "text": "eventsById: org.apache.spark.streaming.dstream.DStream[(Int, Event)] = org.apache.spark.streaming.dstream.MappedDStream@5ab031f5\ngroupedEvents: org.apache.spark.streaming.dstream.DStream[(Int, Iterable[Event])] = org.apache.spark.streaming.dstream.ShuffledDStream@2367d0e2\n"
  297. },
  298. {
  299. "metadata": {},
  300. "data": {
  301. "text/html": ""
  302. },
  303. "output_type": "execute_result",
  304. "execution_count": 11,
  305. "time": "Took: 0.751s, at 2017-07-27 16:16"
  306. }
  307. ]
  308. },
  309. {
  310. "metadata": {
  311. "trusted": true,
  312. "input_collapsed": false,
  313. "collapsed": false,
  314. "id": "95C4DDBD66A6419C8E6C7EFE7334B056"
  315. },
  316. "cell_type": "code",
  317. "source": "@transient val taggedEvents = groupedEvents.transform{ (events, currentTime) => \n val currentTransitions = states.reduceByKey{case (event1, event2) => Seq(event1, event2).maxBy{case (id, ts) => ts}} \n val currentMappings = currentTransitions.map{case (globalId, (currentId, maxTx)) => (currentId, globalId)}\n \n val newEventIds = events.keys // let's extract the ids of the incoming (grouped) events\n val similarityJoinMap = newEventIds.cartesian(currentMappings)\n .collect{case (eventId, (currentId, globalId)) if (isSimilar(currentId)(eventId)) => (eventId, globalId)}\n .collectAsMap\n //val similarityBC = sparkContext.broadcast(similarityJoinMap) \n val newGlobalKeys = newEventIds.map(id => (id, similarityJoinMap.getOrElse(id, genGlobalId())))\n newGlobalKeys.cache() //avoid lazy evaluation to generate multiple global ids\n \n val newTaggedEvents = events.join(newGlobalKeys).flatMap{case (eventId, (events, globalKey)) => \n events.map(event => (event.id,event.payload, globalKey))\n }\n val newStates = newGlobalKeys.map{case (eventId, globalKey) => (globalKey, (eventId, currentTime.milliseconds))}\n currentState = newStates \n states.unpersist(false) \n states = newStates.union(states)\n states.cache() \n newTaggedEvents\n }",
  318. "outputs": [
  319. {
  320. "name": "stdout",
  321. "output_type": "stream",
  322. "text": "taggedEvents: org.apache.spark.streaming.dstream.DStream[(Int, String, String)] = org.apache.spark.streaming.dstream.TransformedDStream@6a596725\n"
  323. },
  324. {
  325. "metadata": {},
  326. "data": {
  327. "text/html": ""
  328. },
  329. "output_type": "execute_result",
  330. "execution_count": 12,
  331. "time": "Took: 0.971s, at 2017-07-27 16:16"
  332. }
  333. ]
  334. },
  335. {
  336. "metadata": {
  337. "trusted": true,
  338. "input_collapsed": false,
  339. "collapsed": false,
  340. "id": "EE22776EA26647BFA2DF78B80B444A10"
  341. },
  342. "cell_type": "code",
  343. "source": "@transient val rawEventBox = ul(20)\nrawEventBox",
  344. "outputs": [
  345. {
  346. "name": "stdout",
  347. "output_type": "stream",
  348. "text": "rawEventBox: notebook.front.widgets.HtmlList = <HtmlList widget>\nres15: notebook.front.widgets.HtmlList = <HtmlList widget>\n"
  349. },
  350. {
  351. "metadata": {},
  352. "data": {
  353. "text/html": "<ul data-bind=\"foreach: value\"><li data-bind=\"html: $data\"></li><script data-this=\"{"valueId":"anon66a44b169710c8728ef4ba67ac6a09c7"}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId)\n },\n this\n );\n});\n /*]]>*/</script></ul>"
  354. },
  355. "output_type": "execute_result",
  356. "execution_count": 14,
  357. "time": "Took: 0.863s, at 2017-07-27 16:16"
  358. }
  359. ]
  360. },
  361. {
  362. "metadata": {
  363. "trusted": true,
  364. "input_collapsed": false,
  365. "collapsed": false,
  366. "id": "3381C07E0EAE430E90362A9733923446"
  367. },
  368. "cell_type": "code",
  369. "source": "eventStream.foreachRDD(e => rawEventBox.append(e.collect.mkString(\", \")))",
  370. "outputs": [
  371. {
  372. "metadata": {},
  373. "data": {
  374. "text/html": ""
  375. },
  376. "output_type": "execute_result",
  377. "execution_count": 15,
  378. "time": "Took: 0.785s, at 2017-07-27 16:16"
  379. }
  380. ]
  381. },
  382. {
  383. "metadata": {
  384. "trusted": true,
  385. "input_collapsed": false,
  386. "collapsed": false,
  387. "id": "FE764B4A05EC481C8EA40A88ED6D473B"
  388. },
  389. "cell_type": "code",
  390. "source": "@transient val currentTrans = ul(20)\ncurrentTrans",
  391. "outputs": [
  392. {
  393. "name": "stdout",
  394. "output_type": "stream",
  395. "text": "currentTrans: notebook.front.widgets.HtmlList = <HtmlList widget>\nres19: notebook.front.widgets.HtmlList = <HtmlList widget>\n"
  396. },
  397. {
  398. "metadata": {},
  399. "data": {
  400. "text/html": "<ul data-bind=\"foreach: value\"><li data-bind=\"html: $data\"></li><script data-this=\"{"valueId":"anonfc521a3a25c9a0a5958acd946894c1a9"}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId)\n },\n this\n );\n});\n /*]]>*/</script></ul>"
  401. },
  402. "output_type": "execute_result",
  403. "execution_count": 16,
  404. "time": "Took: 0.725s, at 2017-07-27 16:16"
  405. }
  406. ]
  407. },
  408. {
  409. "metadata": {
  410. "trusted": true,
  411. "input_collapsed": false,
  412. "collapsed": false,
  413. "id": "80A8C316BEBB440E899157A51627AE1F"
  414. },
  415. "cell_type": "code",
  416. "source": "@transient val eventBox = ul(20)\neventBox",
  417. "outputs": [
  418. {
  419. "name": "stdout",
  420. "output_type": "stream",
  421. "text": "eventBox: notebook.front.widgets.HtmlList = <HtmlList widget>\nres21: notebook.front.widgets.HtmlList = <HtmlList widget>\n"
  422. },
  423. {
  424. "metadata": {},
  425. "data": {
  426. "text/html": "<ul data-bind=\"foreach: value\"><li data-bind=\"html: $data\"></li><script data-this=\"{"valueId":"anona38d37904c4f09c626ff390359d2c8af"}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId)\n },\n this\n );\n});\n /*]]>*/</script></ul>"
  427. },
  428. "output_type": "execute_result",
  429. "execution_count": 17,
  430. "time": "Took: 0.650s, at 2017-07-27 16:16"
  431. }
  432. ]
  433. },
  434. {
  435. "metadata": {
  436. "trusted": true,
  437. "input_collapsed": false,
  438. "collapsed": false,
  439. "id": "6056967780B14BFC8BE5EBC3635362E5"
  440. },
  441. "cell_type": "code",
  442. "source": "@transient val transitionChainBox = ul(20)\ntransitionChainBox",
  443. "outputs": [
  444. {
  445. "name": "stdout",
  446. "output_type": "stream",
  447. "text": "transitionChainBox: notebook.front.widgets.HtmlList = <HtmlList widget>\nres23: notebook.front.widgets.HtmlList = <HtmlList widget>\n"
  448. },
  449. {
  450. "metadata": {},
  451. "data": {
  452. "text/html": "<ul data-bind=\"foreach: value\"><li data-bind=\"html: $data\"></li><script data-this=\"{"valueId":"anon1022644358033f118f633706870a441a"}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId)\n },\n this\n );\n});\n /*]]>*/</script></ul>"
  453. },
  454. "output_type": "execute_result",
  455. "execution_count": 18,
  456. "time": "Took: 0.667s, at 2017-07-27 16:16"
  457. }
  458. ]
  459. },
  460. {
  461. "metadata": {
  462. "trusted": true,
  463. "input_collapsed": false,
  464. "collapsed": false,
  465. "id": "48D1F71E85F04A63B0F9026622D7E78E"
  466. },
  467. "cell_type": "code",
  468. "source": "taggedEvents.foreachRDD{events => \n eventBox.append(\"---\")\n eventBox.append(events.collect.map{case (id, payload, globalKey) => s\"$id|$payload: $globalKey\"}.mkString(\",\"))\n val transitions = states.groupByKey.mapValues(eventSeq => eventSeq.toList.sortBy{case (id, ts) => ts}.map{case (id, ts) => id}.mkString (\"<-\"))\n transitionChainBox.append(\"---\")\n transitions.collect.map{case (globalId, eventSeq) => s\"$globalId: $eventSeq\"}.foreach(s => transitionChainBox.append(s))\n \n currentTrans.append(currentState.collect.map(_.toString).mkString(\",\"))\n \n }",
  469. "outputs": [
  470. {
  471. "metadata": {},
  472. "data": {
  473. "text/html": ""
  474. },
  475. "output_type": "execute_result",
  476. "execution_count": 19,
  477. "time": "Took: 0.924s, at 2017-07-27 16:16"
  478. }
  479. ]
  480. },
  481. {
  482. "metadata": {
  483. "trusted": true,
  484. "input_collapsed": false,
  485. "collapsed": false,
  486. "id": "C19D7885F7B2437EB9B89D69462D35F9"
  487. },
  488. "cell_type": "code",
  489. "source": "ssc.start()",
  490. "outputs": [
  491. {
  492. "metadata": {},
  493. "data": {
  494. "text/html": ""
  495. },
  496. "output_type": "execute_result",
  497. "execution_count": 20,
  498. "time": "Took: 0.627s, at 2017-07-27 16:16"
  499. }
  500. ]
  501. },
  502. {
  503. "metadata": {
  504. "trusted": true,
  505. "input_collapsed": false,
  506. "collapsed": false,
  507. "id": "054D3411347247539EBC9AB16488F987"
  508. },
  509. "cell_type": "code",
  510. "source": "ssc.stop(false)",
  511. "outputs": [
  512. {
  513. "metadata": {},
  514. "data": {
  515. "text/html": ""
  516. },
  517. "output_type": "execute_result",
  518. "execution_count": 21,
  519. "time": "Took: 0.766s, at 2017-07-27 16:17"
  520. }
  521. ]
  522. },
  523. {
  524. "metadata": {
  525. "trusted": true,
  526. "input_collapsed": false,
  527. "collapsed": true,
  528. "id": "0342E90E4BE949748B7D837F71336974"
  529. },
  530. "cell_type": "code",
  531. "source": "",
  532. "outputs": []
  533. }
  534. ],
  535. "nbformat": 4
  536. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement