{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"GAO boyuan ran all at 2015-11-29 08:24:44.054634\n"
]
}
],
"source": [
  19. "student_name = \"GAO boyuan\" # Chris Boesch\n",
  20. "\n",
  21. "\"\"\"\n",
  22. "Update the student_name variable above with your name. \n",
  23. "Extend this notebook to solve the problems at the bottom. \n",
  24. "From the menue Cell -> Run All\n",
  25. "Save your notebook. \n",
  26. "Download your notebook. \n",
  27. "Create a Github Gist of your notebook. \n",
  28. "Submit a link to your gist in ClassMentors. \n",
  29. "\n",
  30. "\"\"\"\n",
  31. "\n",
  32. "import datetime\n",
  33. "now = datetime.datetime.now()\n",
  34. "\n",
  35. "message = \"{} ran all at {}\".format(student_name, now)\n",
  36. "print(message)"
  37. ]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from pyspark import SparkContext, SparkConf\n",
"conf = SparkConf()\n",
"sc = SparkContext(conf=conf)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"RDDs are immutable. You can't change them once they are created. \n",
"\n",
"You can transform them to create new RDDs, but you can't change the original.\n",
"\n",
"This property enables Spark to partition the data and move partitions around. \n",
"\n",
"Programmers can define how many partitions an RDD should have. \n",
"\n",
"If you do not provide a value, one is chosen automatically. \n",
"\n",
"You transform RDDs into other RDDs. \n"
]
},
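{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# A minimal sketch of the point above: map() returns a brand-new RDD\n",
"# and the original RDD is left unchanged.\n",
"original = sc.parallelize([1, 2, 3])\n",
"doubled = original.map(lambda x: x * 2)\n",
"\n",
"print(original.collect()) # still [1, 2, 3]\n",
"print(doubled.collect()) # [2, 4, 6]"
]
},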
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1, 2, 3, 4, 5]\n",
"120\n",
"[2, 4, 6, 8, 10]\n",
"[2, 4]\n",
"[1, 1, 1, 2, 4, 8, 3, 9, 27, 4, 16, 64, 5, 25, 125]\n",
"[6, 5]\n"
]
}
],
"source": [
  91. "rdd = sc.parallelize([1,2,3,4,5])\n",
  92. "\n",
  93. "#reduce(a,b)\n",
  94. "#take(n)\n",
  95. "#collect()\n",
  96. "#takeOrdered(n,func())\n",
  97. "\n",
  98. "# Make sure that the resulting list will fit into memory.\n",
  99. "r0 = rdd.collect()\n",
  100. "print(r0)\n",
  101. "\n",
  102. "# The lambda code gets passed to the workers. \n",
  103. "#Reduce is a terminal function\n",
  104. "r1 = rdd.reduce(lambda x, y: x*y)\n",
  105. "print(r1)\n",
  106. "\n",
  107. "\n",
  108. "r2 = rdd.map(lambda x: x*2).collect()\n",
  109. "\n",
  110. "print(r2)\n",
  111. "\n",
  112. "r3 = rdd.filter(lambda x: x%2==0).take(2)\n",
  113. "print(r3)\n",
  114. "\n",
  115. "rdd2 = sc.parallelize([2,2,1,1,1,1,2,2,3,3,3])\n",
  116. "r4 = rdd2.distinct().takeOrdered(2)\n",
  117. "\n",
  118. "r5 = rdd.flatMap(lambda x: [x,x*x, x*x*x]).collect()\n",
  119. "print(r5)\n",
  120. "\n",
  121. "rdd3 = sc.parallelize([5,2,3,4,1,6])\n",
  122. "# takeOrdered() allows you to pass a function to define the sort value.\n",
  123. "r6 = rdd3.takeOrdered(2, lambda x: -1*x)\n",
  124. "print(r6)"
  125. ]
  126. },
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# We will create a local text file for testing.\n",
"data_file = open(\"data.txt\", \"w\")\n",
"data_file.write(\"this is a test\\n\")\n",
"data_file.write(\"to see what happens\\n\")\n",
"data_file.write(\"with text data\\n\")\n",
"data_file.close()\n"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"3\n",
"\n",
"['this is a test', 'to see what happens', 'with text data']\n",
"\n",
"[['this', 'is', 'a', 'test'], ['to', 'see', 'what', 'happens'], ['with', 'text', 'data']]\n",
"\n",
"['this', 'is', 'a', 'test', 'to', 'see', 'what', 'happens', 'with', 'text', 'data']\n"
]
}
],
"source": [
  165. "# textFile() will load text data as one string per line of data. \n",
  166. "data = sc.textFile(\"data.txt\")\n",
  167. "print(data.count())\n",
  168. "print(\"\")\n",
  169. "print(data.collect())\n",
  170. "print(\"\")\n",
  171. "\n",
  172. "# YOu can split the lines by any separator. \n",
  173. "words = data.map(lambda line: line.split(\" \")).collect()\n",
  174. "print(words)\n",
  175. "print(\"\")\n",
  176. "\n",
  177. "flat_words = data.flatMap(lambda line: line.split(\" \")) \\\n",
  178. " .collect()\n",
  179. " \n",
  180. "print(flat_words)\n",
  181. "\n"
  182. ]
  183. },
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"The line count was 3 and the wordcount was 11.\n"
]
}
],
"source": [
  199. "source": [
  200. "# You need to cache if you want to avoid duplicate loading. \n",
  201. "data = sc.textFile(\"data.txt\")\n",
  202. "#data.cache()\n",
  203. "\n",
  204. "# Load data from file. \n",
  205. "linecount = data.count()\n",
  206. "# Load data from file again if not cached. \n",
  207. "wordcount = data.flatMap(lambda line: line.split(\" \")) \\\n",
  208. " .count()\n",
  209. "\n",
  210. "print(\"The line count was {} and the wordcount was {}.\" \\\n",
  211. " .format(linecount, wordcount))\n"
  212. ]
  213. },
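{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# A small sketch of the caching point above: cache() marks the RDD to be\n",
"# kept in memory after the first action, so later actions reuse it\n",
"# instead of re-reading data.txt from disk.\n",
"cached = sc.textFile(\"data.txt\").cache()\n",
"\n",
"# The first action loads the file and populates the cache.\n",
"print(cached.count())\n",
"# Subsequent actions read the cached partitions.\n",
"print(cached.flatMap(lambda line: line.split(\" \")).count())"
]
},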
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[(1, 10), (1, 30), (3, 20)]"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"\"\"\"\n",
"reduceByKey(func)\n",
"sortByKey()\n",
"groupByKey()\n",
"\n",
"\"\"\"\n",
"rdd = sc.parallelize([(1,10), (3,20), (1,30)])\n",
"rdd.sortByKey().collect()"
]
},
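{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# sortByKey() also takes an ascending flag; a quick sketch of sorting the\n",
"# same pairs in descending key order:\n",
"rdd = sc.parallelize([(1,10), (3,20), (1,30)])\n",
"print(rdd.sortByKey(ascending=False).collect()) # pairs with key 3 come first"
]
},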
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[(1, <pyspark.resultiterable.ResultIterable at 0x7f362aa1af98>),\n",
" (3, <pyspark.resultiterable.ResultIterable at 0x7f362aa1aef0>)]"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# groupByKey()\n",
"rdd = sc.parallelize([(1,10), (3,20), (1,30)])\n",
"# This can cause problems if the set of values for a single key is very large. \n",
"rdd.groupByKey().collect()"
]
},
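{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# The ResultIterable objects above are lazy wrappers. A sketch of turning\n",
"# them into plain lists so the grouped values are readable:\n",
"rdd = sc.parallelize([(1,10), (3,20), (1,30)])\n",
"print(rdd.groupByKey().mapValues(list).collect()) # e.g. [(1, [10, 30]), (3, [20])]"
]
},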
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[(1, 40), (3, 20)]"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# reduceByKey()\n",
"rdd = sc.parallelize([(1,10), (3,20), (1,30)])\n",
"rdd.reduceByKey(lambda x,y: x + y).collect()"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"There were 2 Clubs.\n",
"There were 3 Hearts.\n"
]
}
],
"source": [
"# Count how many of each card type were present. \n",
"rdd = sc.parallelize([(\"Hearts\",2), \n",
" (\"Clubs\", 8),\n",
" (\"Hearts\",3),\n",
" (\"Hearts\",5),\n",
" (\"Clubs\", 4) ])\n",
"result = rdd.map(lambda x: (x[0], 1)) \\\n",
" .reduceByKey(lambda x,y: x+y) \\\n",
" .collect()\n",
" \n",
"for item in result: \n",
" message = \"There were {} {}.\".format(item[1], item[0])\n",
" print(message)"
]
},
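{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# When the result set is small, the same per-suit tally can be sketched\n",
"# with the countByKey() action, which returns a {key: count} dict to the driver.\n",
"rdd = sc.parallelize([(\"Hearts\",2), (\"Clubs\", 8), (\"Hearts\",3), (\"Hearts\",5), (\"Clubs\", 4)])\n",
"print(rdd.countByKey()) # e.g. {'Hearts': 3, 'Clubs': 2}"
]
},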
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"The Clubs found were [8, 4]\n",
"The Hearts found were [2, 3, 5]\n"
]
}
],
"source": [
"rdd = sc.parallelize([(\"Hearts\",2), \n",
" (\"Clubs\", 8),\n",
" (\"Hearts\",3),\n",
" (\"Hearts\",5),\n",
" (\"Clubs\", 4) ])\n",
"# Put each element in a list so that you can add the lists together. \n",
"# [1] + [2,3] = [1,2,3]\n",
"result = rdd.map(lambda x: (x[0], [x[1]])) \\\n",
" .reduceByKey(lambda x,y: x + y) \\\n",
" .collect()\n",
" \n",
"for item in result: \n",
" message = \"The {} found were {}\".format(item[0], item[1])\n",
" print(message)\n",
" \n"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"The Clubs found were [8, 4]\n",
"The Hearts found were [2, 3, 5]\n"
]
}
],
"source": [
"# Sometimes developers will wrap an expression in ( ) rather than use \\\\ before line breaks. \n",
"# Either approach works; use whichever you find more readable. \n",
"\n",
"result = (rdd.map(lambda x: (x[0], [x[1]]))\n",
" .reduceByKey(lambda x,y: x + y) \n",
" .collect())\n",
" \n",
"for item in result: \n",
" message = \"The {} found were {}\".format(item[0], item[1])\n",
" print(message)"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"The rdd has 5 partitions.\n",
"The rdd has 24 partitions.\n"
]
}
],
"source": [
"# You can define the number of partitions in an RDD.\n",
"\n",
"# Reminder: range(4) -> [0,1,2,3]\n",
"numPartitions = 5\n",
"rdd = sc.parallelize(range(100), numPartitions)\n",
"print(\"The rdd has {} partitions.\".format(rdd.getNumPartitions()))\n",
"\n",
"# If you don't pass the number of partitions, Spark will pick it for you. \n",
"rdd = sc.parallelize(range(100))\n",
"print(\"The rdd has {} partitions.\".format(rdd.getNumPartitions()))\n"
]
},
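{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# A quick way to see how the data was split: glom() turns each partition\n",
"# into a list, so collect() returns one list per partition. A sketch:\n",
"rdd = sc.parallelize(range(10), 3)\n",
"print(rdd.glom().collect()) # e.g. [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]"
]
},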
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"4\n",
"[['1', '2', '3'], ['4', '5', '6'], ['7', '8', '9'], ['4', '11', '12']]\n"
]
}
],
"source": [
"# Here is a csv data example. \n",
"data_file = open(\"data2.csv\", \"w\")\n",
"data_file.write(\"1,2,3\\n\")\n",
"data_file.write(\"4,5,6\\n\")\n",
"data_file.write(\"7,8,9\\n\")\n",
"data_file.write(\"4,11,12\\n\")\n",
"data_file.close()\n",
"\n",
"csvdata = sc.textFile(\"data2.csv\")\n",
"\n",
"# Load data from file. \n",
"linecount = csvdata.count()\n",
"print(linecount)\n",
"\n",
"# Turn the list of strings into a list of lists.\n",
"rows = csvdata.map(lambda line: line.split(\",\"))\n",
"\n",
"print(rows.collect())\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"26\n"
]
}
],
"source": [
"# Once we have the RDD in a list-of-rows format, we can do additional operations. \n",
"\n",
"# Add up the second value in each row after converting it to an integer. \n",
"result = (rows.map(lambda x: x[1]) \n",
" .map(lambda x: int(x)) \n",
" .reduce(lambda x, y: x+y))\n",
"\n",
"print(result)"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('1', 5), ('4', 11), ('7', 17), ('4', 23)]\n"
]
}
],
"source": [
"# Return the sum of the 2nd and 3rd values, using the first value as a key. \n",
"result = (rows.map(lambda x: (x[0], int(x[1]) + int(x[2])))\n",
" .collect())\n",
"print(result)"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('1', 5), ('4', 34), ('7', 17)]\n"
]
}
],
"source": [
"# Repeat, but reduce by key, summing the values for rows with the same first value. \n",
"result = (rows.map(lambda x: (x[0], int(x[1]) + int(x[2])))\n",
" .reduceByKey(lambda x, y: x+y) \n",
" .sortByKey()\n",
" .collect())\n",
"print(result)"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['Bob,5,2,3', 'Kevin,8,5,9', 'Bob,5,6,7', 'Stuart,4,5,6', 'Kevin,1,5,5']\n"
]
}
],
"source": [
"\"\"\"\n",
"Given this csv data, use Spark to answer the following questions. \n",
"Write your code so that it would scale and work for ten million rows as well as for the few rows provided. \n",
"\n",
"Gru has been assigning his minions to help deliver packages during the holidays. \n",
"Gru has a csv file containing millions of records of minion deliveries. \n",
"\n",
"The csv data is in this format: \n",
"minion, quantity, kilometers, minutes\n",
"\n",
"The first column contains the unique name of the minion. \n",
"The second column contains the number of packages delivered on a trip. \n",
"The third column contains the distance traveled on a delivery trip. \n",
"The fourth column contains the time in minutes needed to complete the trip. \n",
"\n",
"\"\"\"\n",
"rawdata = \"\"\"Bob,5,2,3\n",
"Kevin,8,5,9\n",
"Bob,5,6,7\n",
"Stuart,4,5,6\n",
"Kevin,1,5,5\n",
"\"\"\"\n",
"data_file = open(\"data3.csv\", \"w\")\n",
"data_file.write(rawdata)\n",
"data_file.close()\n",
"\n",
"# Load data from file. \n",
"data = sc.textFile(\"data3.csv\")\n",
"print(data.collect())\n"
]
},
{
"cell_type": "code",
"execution_count": 74,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Q1 = 5\n",
"Q2 = [('Bob', 8), ('Kevin', 10), ('Stuart', 5)]\n",
"Q3 = 23\n",
"Q4 = \n",
"Q5 = \n"
]
}
],
"source": [
"\"\"\"\n",
"Write the Spark code to answer the following questions using the data RDD defined above. \n",
"\n",
  598. "Q1 - How many minions are there records for?\n",
  599. "Q2 - How many kilometers did each minon travel in total? \n",
  600. "Q3 - How many total kilometers were travelled by all minions overall? \n",
  601. "Q4 - What was the total kilometers traveled and total minutes traveled for each minion? \n",
  602. "Q5 - How many items were delivered per minute by each minion? \n",
  603. "\n",
  604. "\"\"\"\n",
  605. "\n",
  606. "q1 = data.count()\n",
  607. "\n",
  608. "\n",
  609. "\n",
  610. "data1 = data.map(lambda line: line.split(\",\"))\n",
  611. "result = (data1.map(lambda x: (x[0], int(x[2]) ) ) \n",
  612. " .reduceByKey(lambda x, y: x+y) \n",
  613. " .sortByKey()\n",
  614. " .collect())\n",
  615. "\n",
  616. "q2 = result\n",
  617. "\n",
  618. "result2 = (data1.map(lambda x: x[2]) .map(lambda x: int(x)) .reduce(lambda x, y: x+y) )\n",
  619. "\n",
  620. "q3 = result2\n",
  621. "\n",
  622. "\n",
  623. "\n",
  624. "\n",
  625. "q4 = \"\"\n",
  626. "q5 = \"\"\n",
  627. "\n",
  628. "print(\"Q1 = {}\".format(q1))\n",
  629. "print(\"Q2 = {}\".format(q2))\n",
  630. "print(\"Q3 = {}\".format(q3))\n",
  631. "print(\"Q4 = {}\".format(q4))\n",
  632. "print(\"Q5 = {}\".format(q5))\n"
  633. ]
  634. },
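{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# A possible sketch for the unanswered Q4 and Q5 above, assuming the\n",
"# (minion, quantity, kilometers, minutes) column layout described earlier.\n",
"\n",
"# Q4 - key each row by minion and add the (km, minutes) pairs element-wise.\n",
"q4 = (data1.map(lambda x: (x[0], (int(x[2]), int(x[3]))))\n",
" .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))\n",
" .sortByKey()\n",
" .collect())\n",
"print(\"Q4 = {}\".format(q4))\n",
"\n",
"# Q5 - total quantity divided by total minutes for each minion.\n",
"q5 = (data1.map(lambda x: (x[0], (int(x[1]), int(x[3]))))\n",
" .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))\n",
" .mapValues(lambda t: t[0] / t[1])\n",
" .sortByKey()\n",
" .collect())\n",
"print(\"Q5 = {}\".format(q5))"
]
},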
{
"cell_type": "markdown",
"metadata": {},
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.4.3"
}
},
"nbformat": 4,
"nbformat_minor": 0
}