Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- {
- "cells": [
- {
- "cell_type": "code",
- "execution_count": 1,
- "metadata": {
- "collapsed": false
- },
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "GAO boyuan ran all at 2015-11-29 08:24:44.054634\n"
- ]
- }
- ],
- "source": [
- "student_name = \"GAO boyuan\" # Chris Boesch\n",
- "\n",
- "\"\"\"\n",
- "Update the student_name variable above with your name. \n",
- "Extend this notebook to solve the problems at the bottom. \n",
- "From the menue Cell -> Run All\n",
- "Save your notebook. \n",
- "Download your notebook. \n",
- "Create a Github Gist of your notebook. \n",
- "Submit a link to your gist in ClassMentors. \n",
- "\n",
- "\"\"\"\n",
- "\n",
- "import datetime\n",
- "now = datetime.datetime.now()\n",
- "\n",
- "message = \"{} ran all at {}\".format(student_name, now)\n",
- "print(message)"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 2,
- "metadata": {
- "collapsed": false
- },
- "outputs": [],
- "source": [
- "from pyspark import SparkContext, SparkConf\n",
- "conf = SparkConf()\n",
- "sc = SparkContext(conf=conf)\n"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "\n",
- "RDD's are immutable. You can't change them once they are created. \n",
- "\n",
- "You can transform them and create new RDD's, but you can't change the original.\n",
- "\n",
- "This property enables Spark to partition the data and move partitions around. \n",
- "\n",
- "Programmers can define how many many partitions there should be to an RDD. \n",
- "\n",
- "If you do not provide a value, one is chosen automatically. \n",
- "\n",
- "You transform RDDs to others RDDs. \n"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 3,
- "metadata": {
- "collapsed": false
- },
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "[1, 2, 3, 4, 5]\n",
- "120\n",
- "[2, 4, 6, 8, 10]\n",
- "[2, 4]\n",
- "[1, 1, 1, 2, 4, 8, 3, 9, 27, 4, 16, 64, 5, 25, 125]\n",
- "[6, 5]\n"
- ]
- }
- ],
- "source": [
- "rdd = sc.parallelize([1,2,3,4,5])\n",
- "\n",
- "#reduce(a,b)\n",
- "#take(n)\n",
- "#collect()\n",
- "#takeOrdered(n,func())\n",
- "\n",
- "# Make sure that the resulting list will fit into memory.\n",
- "r0 = rdd.collect()\n",
- "print(r0)\n",
- "\n",
- "# The lambda code gets passed to the workers. \n",
- "#Reduce is a terminal function\n",
- "r1 = rdd.reduce(lambda x, y: x*y)\n",
- "print(r1)\n",
- "\n",
- "\n",
- "r2 = rdd.map(lambda x: x*2).collect()\n",
- "\n",
- "print(r2)\n",
- "\n",
- "r3 = rdd.filter(lambda x: x%2==0).take(2)\n",
- "print(r3)\n",
- "\n",
- "rdd2 = sc.parallelize([2,2,1,1,1,1,2,2,3,3,3])\n",
- "r4 = rdd2.distinct().takeOrdered(2)\n",
- "\n",
- "r5 = rdd.flatMap(lambda x: [x,x*x, x*x*x]).collect()\n",
- "print(r5)\n",
- "\n",
- "rdd3 = sc.parallelize([5,2,3,4,1,6])\n",
- "# takeOrdered() allows you to pass a function to define the sort value.\n",
- "r6 = rdd3.takeOrdered(2, lambda x: -1*x)\n",
- "print(r6)"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 4,
- "metadata": {
- "collapsed": true
- },
- "outputs": [],
- "source": [
- "# We will create a local text file for testing.\n",
- "data_file = open(\"data.txt\", \"w\")\n",
- "data_file.write(\"this is a test\\n\")\n",
- "data_file.write(\"to see what happens\\n\")\n",
- "data_file.write(\"with text data\\n\")\n",
- "data_file.close()\n"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 5,
- "metadata": {
- "collapsed": false
- },
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "3\n",
- "\n",
- "['this is a test', 'to see what happens', 'with text data']\n",
- "\n",
- "[['this', 'is', 'a', 'test'], ['to', 'see', 'what', 'happens'], ['with', 'text', 'data']]\n",
- "\n",
- "['this', 'is', 'a', 'test', 'to', 'see', 'what', 'happens', 'with', 'text', 'data']\n"
- ]
- }
- ],
- "source": [
- "# textFile() will load text data as one string per line of data. \n",
- "data = sc.textFile(\"data.txt\")\n",
- "print(data.count())\n",
- "print(\"\")\n",
- "print(data.collect())\n",
- "print(\"\")\n",
- "\n",
- "# YOu can split the lines by any separator. \n",
- "words = data.map(lambda line: line.split(\" \")).collect()\n",
- "print(words)\n",
- "print(\"\")\n",
- "\n",
- "flat_words = data.flatMap(lambda line: line.split(\" \")) \\\n",
- " .collect()\n",
- " \n",
- "print(flat_words)\n",
- "\n"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 6,
- "metadata": {
- "collapsed": false
- },
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "The line count was 3 and the wordcount was 11.\n"
- ]
- }
- ],
- "source": [
- "# You need to cache if you want to avoid duplicate loading. \n",
- "data = sc.textFile(\"data.txt\")\n",
- "#data.cache()\n",
- "\n",
- "# Load data from file. \n",
- "linecount = data.count()\n",
- "# Load data from file again if not cached. \n",
- "wordcount = data.flatMap(lambda line: line.split(\" \")) \\\n",
- " .count()\n",
- "\n",
- "print(\"The line count was {} and the wordcount was {}.\" \\\n",
- " .format(linecount, wordcount))\n"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 7,
- "metadata": {
- "collapsed": false
- },
- "outputs": [
- {
- "data": {
- "text/plain": [
- "[(1, 10), (1, 30), (3, 20)]"
- ]
- },
- "execution_count": 7,
- "metadata": {},
- "output_type": "execute_result"
- }
- ],
- "source": [
- "\"\"\"\n",
- "reduceByKey(func)\n",
- "sortByKey()\n",
- "groupByKey()\n",
- "\n",
- "\"\"\"\n",
- "rdd = sc.parallelize([(1,10), (3,20),(1,30) ])\n",
- "rdd.sortByKey().collect()"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 8,
- "metadata": {
- "collapsed": false
- },
- "outputs": [
- {
- "data": {
- "text/plain": [
- "[(1, <pyspark.resultiterable.ResultIterable at 0x7f362aa1af98>),\n",
- " (3, <pyspark.resultiterable.ResultIterable at 0x7f362aa1aef0>)]"
- ]
- },
- "execution_count": 8,
- "metadata": {},
- "output_type": "execute_result"
- }
- ],
- "source": [
- "# groupByKey()\n",
- "rdd = sc.parallelize([(1,10), (3,20),(1,30) ])\n",
- "# This can cause problems if all the values for a key are large. \n",
- "rdd.groupByKey().collect()"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 9,
- "metadata": {
- "collapsed": false
- },
- "outputs": [
- {
- "data": {
- "text/plain": [
- "[(1, 40), (3, 20)]"
- ]
- },
- "execution_count": 9,
- "metadata": {},
- "output_type": "execute_result"
- }
- ],
- "source": [
- "# reduceByKey()\n",
- "rdd = sc.parallelize([(1,10), (3,20),(1,30) ])\n",
- "rdd.reduceByKey(lambda x,y: x + y).collect()"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 10,
- "metadata": {
- "collapsed": false
- },
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "There were 2 Clubs.\n",
- "There were 3 Hearts.\n"
- ]
- }
- ],
- "source": [
- "# Count how many of each card type were present. \n",
- "rdd = sc.parallelize([(\"Hearts\",2), \n",
- " (\"Clubs\", 8),\n",
- " (\"Hearts\",3),\n",
- " (\"Hearts\",5),\n",
- " (\"Clubs\", 4) ])\n",
- "result = rdd.map(lambda x: (x[0], 1)) \\\n",
- " .reduceByKey(lambda x,y: x+y) \\\n",
- " .collect()\n",
- " \n",
- "for item in result: \n",
- " message = \"There were {} {}.\".format(item[1], item[0])\n",
- " print(message)"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 11,
- "metadata": {
- "collapsed": false
- },
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "The Clubs found were [8, 4]\n",
- "The Hearts found were [2, 3, 5]\n"
- ]
- }
- ],
- "source": [
- "rdd = sc.parallelize([(\"Hearts\",2), \n",
- " (\"Clubs\", 8),\n",
- " (\"Hearts\",3),\n",
- " (\"Hearts\",5),\n",
- " (\"Clubs\", 4) ])\n",
- "# Put each element in an list so that you can add the lists together. \n",
- "#[1] + [2,3] = [1,2,3]\n",
- "result = rdd.map(lambda x: (x[0], [x[1]])) \\\n",
- " .reduceByKey(lambda x,y: x + y) \\\n",
- " .collect()\n",
- " \n",
- "for item in result: \n",
- " message = \"The {} found were {}\".format(item[0], item[1])\n",
- " print(message)\n",
- " \n"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 12,
- "metadata": {
- "collapsed": false
- },
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "The Clubs found were [8, 4]\n",
- "The Hearts found were [2, 3, 5]\n"
- ]
- }
- ],
- "source": [
- "# Sometime develoers will wrap expression in ( ) rather than use / between line breaks. \n",
- "# Either approach will work depending on which you find more readable. \n",
- "\n",
- "result = (rdd.map(lambda x: (x[0], [x[1]]))\n",
- " .reduceByKey(lambda x,y: x + y) \n",
- " .collect())\n",
- " \n",
- "for item in result: \n",
- " message = \"The {} found were {}\".format(item[0], item[1])\n",
- " print(message)"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 13,
- "metadata": {
- "collapsed": false
- },
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "The rdd has 5 partitions.\n",
- "The rdd has 24 partitions.\n"
- ]
- }
- ],
- "source": [
- "# You can define the number of partitions in an RDD\n",
- "\n",
- "# Reminder: range(4) -> [0,1,2,3]\n",
- "numPartitions = 5\n",
- "rdd = sc.parallelize(range(100), numPartitions )\n",
- "print( \"The rdd has {} partitions.\".format(rdd.getNumPartitions()))\n",
- "\n",
- "# If you don't pass the number of partitions, Spark will pick it for you. \n",
- "rdd = sc.parallelize(range(100))\n",
- "print( \"The rdd has {} partitions.\".format(rdd.getNumPartitions()))\n"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 14,
- "metadata": {
- "collapsed": false
- },
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "4\n",
- "[['1', '2', '3'], ['4', '5', '6'], ['7', '8', '9'], ['4', '11', '12']]\n"
- ]
- }
- ],
- "source": [
- "# Here is a csv data example. \n",
- "data_file = open(\"data2.csv\", \"w\")\n",
- "data_file.write(\"1,2,3\\n\")\n",
- "data_file.write(\"4,5,6\\n\")\n",
- "data_file.write(\"7,8,9\\n\")\n",
- "data_file.write(\"4,11,12\\n\")\n",
- "data_file.close()\n",
- "\n",
- "csvdata = sc.textFile(\"data2.csv\")\n",
- "\n",
- "# Load data from file. \n",
- "linecount = csvdata.count()\n",
- "print(linecount)\n",
- "\n",
- "# Turn the list of strings into a list of lists\n",
- "rows = csvdata.map(lambda line: line.split(\",\"))\n",
- "\n",
- "print(rows.collect())\n",
- "\n"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 15,
- "metadata": {
- "collapsed": false
- },
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "26\n"
- ]
- }
- ],
- "source": [
- "# Once we have the rdd in a list of rows format, we can do additional operations. \n",
- "\n",
- "# Add up the second values in each row after converting them to integers. \n",
- "result = (rows.map(lambda x: x[1]) \n",
- " .map(lambda x: int(x)) \n",
- " .reduce(lambda x, y: x+y) )\n",
- "\n",
- "print(result)"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 16,
- "metadata": {
- "collapsed": false
- },
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "[('1', 5), ('4', 11), ('7', 17), ('4', 23)]\n"
- ]
- }
- ],
- "source": [
- "# Return the sum of the 2nd and 3rd value using the first value as a key\n",
- "result = (rows.map(lambda x: (x[0], int(x[1]) + int(x[2]) ) ) \n",
- " .collect() )\n",
- "print(result)"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 17,
- "metadata": {
- "collapsed": false
- },
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "[('1', 5), ('4', 34), ('7', 17)]\n"
- ]
- }
- ],
- "source": [
- "# Repeat but group by key summing the values for rows with the same 1st value. \n",
- "result = (rows.map(lambda x: (x[0], int(x[1]) + int(x[2]) ) ) \n",
- " .reduceByKey(lambda x, y: x+y) \n",
- " .sortByKey()\n",
- " .collect())\n",
- "print(result)"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 18,
- "metadata": {
- "collapsed": false
- },
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "['Bob,5,2,3', 'Kevin,8,5,9', 'Bob,5,6,7', 'Stuart,4,5,6', 'Kevin,1,5,5']\n"
- ]
- }
- ],
- "source": [
- "\"\"\"\n",
- "Given this csv data, use Spark to answer the following questions. \n",
- "Write your code so that it would scale and work for ten million rows as well as for thew few rows provided. \n",
- "\n",
- "Gru has been assigning his minions to help deliver packages during the holidays. \n",
- "Gru has a csv file containing millions of records of minion deliveries. \n",
- "\n",
- "The csv data is in this format. \n",
- "minion, quantity, kilometers, minutes\n",
- "\n",
- "\n",
- "The first column contains the unique name of the minion. \n",
- "The second column contains the number of packages delivered on a trip. \n",
- "The third column contains the distance traveled on a delivery trip. \n",
- "The fourth column contains the time in minutes needed to complete the trip. \n",
- "\n",
- "\"\"\"\n",
- "rawdata = \"\"\"Bob,5,2,3\n",
- "Kevin,8,5,9\n",
- "Bob,5,6,7\n",
- "Stuart,4,5,6\n",
- "Kevin,1,5,5\n",
- "\"\"\"\n",
- "data_file = open(\"data3.csv\", \"w\")\n",
- "data_file.write(rawdata)\n",
- "data_file.close()\n",
- "\n",
- "# Load data from file. \n",
- "data = sc.textFile(\"data3.csv\")\n",
- "print(data.collect())\n"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 74,
- "metadata": {
- "collapsed": false
- },
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "Q1 = 5\n",
- "Q2 = [('Bob', 8), ('Kevin', 10), ('Stuart', 5)]\n",
- "Q3 = 23\n",
- "Q4 = \n",
- "Q5 = \n"
- ]
- }
- ],
- "source": [
- "\"\"\"\n",
- "Write the spark code to answer the following questions using the data RDD defined above. \n",
- "\n",
- "Q1 - How many minions are there records for?\n",
- "Q2 - How many kilometers did each minon travel in total? \n",
- "Q3 - How many total kilometers were travelled by all minions overall? \n",
- "Q4 - What was the total kilometers traveled and total minutes traveled for each minion? \n",
- "Q5 - How many items were delivered per minute by each minion? \n",
- "\n",
- "\"\"\"\n",
- "\n",
- "q1 = data.count()\n",
- "\n",
- "\n",
- "\n",
- "data1 = data.map(lambda line: line.split(\",\"))\n",
- "result = (data1.map(lambda x: (x[0], int(x[2]) ) ) \n",
- " .reduceByKey(lambda x, y: x+y) \n",
- " .sortByKey()\n",
- " .collect())\n",
- "\n",
- "q2 = result\n",
- "\n",
- "result2 = (data1.map(lambda x: x[2]) .map(lambda x: int(x)) .reduce(lambda x, y: x+y) )\n",
- "\n",
- "q3 = result2\n",
- "\n",
- "\n",
- "\n",
- "\n",
- "q4 = \"\"\n",
- "q5 = \"\"\n",
- "\n",
- "print(\"Q1 = {}\".format(q1))\n",
- "print(\"Q2 = {}\".format(q2))\n",
- "print(\"Q3 = {}\".format(q3))\n",
- "print(\"Q4 = {}\".format(q4))\n",
- "print(\"Q5 = {}\".format(q5))\n"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": []
- }
- ],
- "metadata": {
- "kernelspec": {
- "display_name": "Python 3",
- "language": "python",
- "name": "python3"
- },
- "language_info": {
- "codemirror_mode": {
- "name": "ipython",
- "version": 3
- },
- "file_extension": ".py",
- "mimetype": "text/x-python",
- "name": "python",
- "nbconvert_exporter": "python",
- "pygments_lexer": "ipython3",
- "version": "3.4.3"
- }
- },
- "nbformat": 4,
- "nbformat_minor": 0
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement