- Master: the machine in the cluster that manages splitting up the data and the computations.
- Workers: the other machines in the cluster; they run the computations the master sends them and return the results.
- Spark's core data structure is the Resilient Distributed Dataset (RDD). This is a low level object that lets Spark work its magic by splitting data across multiple nodes in the cluster. However, RDDs are hard to work with directly, so in this course you'll be using the Spark DataFrame abstraction built on top of RDDs.
- You can think of the SparkContext as your connection to the cluster and the SparkSession as your interface with that connection.
- We've already created a SparkSession for you called spark, but what if you're not sure there already is one? Creating multiple SparkSessions and SparkContexts can cause issues, so it's best practice to use the SparkSession.builder.getOrCreate() method. This returns an existing SparkSession if there's already one in the environment, or creates a new one if necessary!
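- A minimal sketch of that pattern (assuming a local pyspark install; in local mode no separate cluster is needed):
- from pyspark.sql import SparkSession
- # Returns the active SparkSession if one exists, otherwise creates a new one
- spark = SparkSession.builder.getOrCreate()
- # Verify the connection by printing some metadata
- print(spark.version)
- print(spark.catalog.listTables())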
- # Filter flights by passing a string
- long_flights1 = flights.filter("distance > 1000")
- # Filter flights by passing a column of boolean values
- long_flights2 = flights.filter(flights.distance > 1000)
- # Print the data to check they're equal
- long_flights1.show()
- long_flights2.show()
- The difference between .select() and .withColumn() methods is that .select() returns only the columns you specify, while .withColumn() returns all the columns of the DataFrame in addition to the one you defined.
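- A quick illustration of the difference (a sketch, assuming the flights DataFrame with a distance column in miles; distance_km is a made-up column name):
- # .withColumn() returns every existing column plus the new one
- flights_km = flights.withColumn("distance_km", flights.distance * 1.60934)
- # .select() returns only the columns you ask for
- distances_only = flights.select("tailnum", (flights.distance * 1.60934).alias("distance_km"))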
- # Select the first set of columns
- selected1 = flights.select("tailnum", "origin", "dest")
- # Select the second set of columns
- temp = flights.select(flights.origin, flights.dest, flights.carrier)
- # Define first filter
- filterA = flights.origin == "SEA"
- # Define second filter
- filterB = flights.dest == "PDX"
- # Filter the data, first by filterA then by filterB
- selected2 = temp.filter(filterA).filter(filterB)
- # Define avg_speed
- avg_speed = (flights.distance/(flights.air_time/60)).alias("avg_speed")
- # Select the correct columns
- speed1 = flights.select("origin", "dest", "tailnum", avg_speed)
- # Create the same table using a SQL expression
- speed2 = flights.selectExpr("origin", "dest", "tailnum", "distance/(air_time/60) as avg_speed")
- # Find the shortest flight from PDX in terms of distance
- flights.filter(flights.origin == 'PDX').groupBy().min('distance').show()
- # Find the longest flight from SEA in terms of air time
- flights.filter(flights.origin == 'SEA').groupBy().max('air_time').show()
- # Group by tailnum
- by_plane = flights.groupBy("tailnum")
- # Number of flights each plane made
- by_plane.count().show()
- # Group by origin
- by_origin = flights.groupBy("origin")
- # Average duration of flights from PDX and SEA
- by_origin.avg("air_time").show()
- # Import pyspark.sql.functions as F
- import pyspark.sql.functions as F
- # Group by month and dest
- by_month_dest = flights.groupBy("month", "dest")
- # Average departure delay by month and destination
- by_month_dest.avg("dep_delay").show()
- # Standard deviation of departure delay
- by_month_dest.agg(F.stddev("dep_delay")).show()
- # Examine the data
- airports.show()
- # Rename the faa column
- airports = airports.withColumnRenamed("faa", "dest")
- # Join the DataFrames
- flights_with_airports = flights.join(airports, on="dest", how="leftouter")
- # Examine the new DataFrame
- flights_with_airports.show()
- # Rename year column
- planes = planes.withColumnRenamed("year", "plane_year")
- # Join the DataFrames
- model_data = flights.join(planes, on="tailnum", how="leftouter")
- Good work! Before you get started modeling, it's important to know that Spark only handles numeric data. That means all of the columns in your DataFrame must be either integers or decimals (called 'doubles' in Spark).
- When we imported our data, we let Spark guess what kind of information each column held. Unfortunately, Spark doesn't always guess right and you can see that some of the columns in our DataFrame are strings containing numbers as opposed to actual numeric values.
- To remedy this, you can use the .cast() method in combination with the .withColumn() method. It's important to note that .cast() works on columns, while .withColumn() works on DataFrames.
- # Cast the columns to integers
- model_data = model_data.withColumn("arr_delay", model_data.arr_delay.cast("integer"))
- model_data = model_data.withColumn("air_time", model_data.air_time.cast("integer"))
- model_data = model_data.withColumn("month", model_data.month.cast("integer"))
- model_data = model_data.withColumn("plane_year", model_data.plane_year.cast("integer"))
- # Create the column plane_age
- model_data = model_data.withColumn("plane_age", model_data.year - model_data.plane_year)
- # Create is_late
- model_data = model_data.withColumn("is_late", model_data.arr_delay > 0)
- # Convert to an integer
- model_data = model_data.withColumn("label", model_data.is_late.cast("integer"))
- # Remove missing values
- model_data = model_data.filter("arr_delay is not NULL and dep_delay is not NULL and air_time is not NULL and plane_year is not NULL")
- # Import the feature transformers used below
- from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
- # Create a StringIndexer
- carr_indexer = StringIndexer(inputCol="carrier", outputCol="carrier_index")
- # Create a OneHotEncoder
- carr_encoder = OneHotEncoder(inputCol="carrier_index", outputCol="carrier_fact")
- # Create a StringIndexer
- dest_indexer = StringIndexer(inputCol="dest", outputCol="dest_index")
- # Create a OneHotEncoder
- dest_encoder = OneHotEncoder(inputCol="dest_index", outputCol="dest_fact")
- # Make a VectorAssembler
- vec_assembler = VectorAssembler(inputCols=["month", "air_time", "carrier_fact", "dest_fact", "plane_age"], outputCol="features")
- # Import Pipeline
- from pyspark.ml import Pipeline
- # Make the pipeline
- flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler])
- # Fit and transform the data
- piped_data = flights_pipe.fit(model_data).transform(model_data)
- # Split the data into training and test sets
- training, test = piped_data.randomSplit([.6, .4])
- You'll tune this model by testing different values for several hyperparameters. A hyperparameter is just a value in the model that's not estimated from the data, but rather is supplied by the user to maximize performance. For this course it's not necessary to understand the mathematics behind all of these values - what's important is that you'll try out a few different choices and pick the best one.
- # Import LogisticRegression
- from pyspark.ml.classification import LogisticRegression
- # Create a LogisticRegression Estimator
- lr = LogisticRegression()
- # Import the evaluation submodule
- import pyspark.ml.evaluation as evals
- # Create a BinaryClassificationEvaluator
- evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")
- # Import the tuning submodule and numpy (used to build the grid of regParam values below)
- import numpy as np
- import pyspark.ml.tuning as tune
- # Create the parameter grid
- grid = tune.ParamGridBuilder()
- # Add the hyperparameters to tune
- grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
- grid = grid.addGrid(lr.elasticNetParam, [0, 1])
- # Build the grid
- grid = grid.build()
- # Create the CrossValidator
- cv = tune.CrossValidator(estimator=lr,
-                          estimatorParamMaps=grid,
-                          evaluator=evaluator)
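- Actually running the grid search would look like the sketch below (standard pyspark.ml.tuning usage; cv_models and best_from_grid are illustrative names). The next lines instead fit lr directly, which amounts to training with the default hyperparameters.
- # Fit a model for every parameter combination in every fold, then refit the best combination on all the training data
- cv_models = cv.fit(training)
- best_from_grid = cv_models.bestModel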
- # Call lr.fit()
- best_lr = lr.fit(training)
- # Print best_lr
- print(best_lr)
- # Use the model to predict the test set
- test_results = best_lr.transform(test)
- # Evaluate the predictions
- print(evaluator.evaluate(test_results))
- # Import the pyspark.sql.types library
- from pyspark.sql.types import *
- # Define a new schema using StructType
- people_schema = StructType([
-     # Define a StructField for each field
-     StructField('name', StringType(), False),
-     StructField('age', IntegerType(), False),
-     StructField('city', StringType(), False)
- ])
- # Load the CSV file
- aa_dfw_df = spark.read.format('csv').options(header=True).load('AA_DFW_2018.csv.gz')
- # Add the airport column using the F.lower() method
- aa_dfw_df = aa_dfw_df.withColumn('airport', F.lower(aa_dfw_df['Destination Airport']))
- # Drop the Destination Airport column
- aa_dfw_df = aa_dfw_df.drop(aa_dfw_df['Destination Airport'])
- # Show the DataFrame
- aa_dfw_df.show()
- # View the row count of df1 and df2
- print("df1 Count: %d" % df1.count())
- print("df2 Count: %d" % df2.count())
- # Combine the DataFrames into one
- df3 = df1.union(df2)
- # Save the df3 DataFrame in Parquet format
- df3.write.parquet('AA_DFW_ALL.parquet', mode='overwrite')
- # Read the Parquet file into a new DataFrame and run a count
- print(spark.read.parquet('AA_DFW_ALL.parquet').count())
- # Read the Parquet file into flights_df
- flights_df = spark.read.parquet('AA_DFW_ALL.parquet')
- # Register the temp table
- flights_df.createOrReplaceTempView('flights')
- # Run a SQL query of the average flight duration
- avg_duration = spark.sql('SELECT avg(flight_duration) from flights').collect()[0]
- print('The average flight time is: %d' % avg_duration)
- # Show the distinct VOTER_NAME entries
- voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)
- # Filter voter_df where the VOTER_NAME is 1-20 characters in length
- voter_df = voter_df.filter('length(VOTER_NAME) > 0 and length(VOTER_NAME) < 20')
- # Filter out voter_df where the VOTER_NAME contains an underscore
- voter_df = voter_df.filter(~ F.col('VOTER_NAME').contains('_'))
- # Show the distinct VOTER_NAME entries again
- voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)
- # Add a new column called splits separated on whitespace
- voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, r'\s+'))
- # Create a new column called first_name based on the first item in splits
- voter_df = voter_df.withColumn('first_name', voter_df.splits.getItem(0))
- # Get the last entry of the splits list and create a column called last_name
- voter_df = voter_df.withColumn('last_name', voter_df.splits.getItem(F.size('splits') - 1))
- # Drop the splits column
- voter_df = voter_df.drop('splits')
- # Show the voter_df DataFrame
- voter_df.show()
- # Add a column to voter_df for any voter with the title **Councilmember**
- voter_df = voter_df.withColumn('random_val',
-                                F.when(voter_df.TITLE == 'Councilmember', F.rand()))
- # Show some of the DataFrame rows, noting whether the when clause worked
- voter_df.show()
- # Add a column to voter_df for a voter based on their position
- voter_df = voter_df.withColumn('random_val',
-                                F.when(voter_df.TITLE == 'Councilmember', F.rand())
-                                 .when(voter_df.TITLE == 'Mayor', 2)
-                                 .otherwise(0))
- # Show some of the DataFrame rows
- voter_df.show()
- # Use the .filter() clause with random_val
- voter_df.filter(voter_df.random_val == 0).show()
- def getFirstAndMiddle(names):
-     # Return a space-separated string of every name except the last
-     return ' '.join(names[:-1])
- # Define the method as a UDF
- udfFirstAndMiddle = F.udf(getFirstAndMiddle, StringType())
- # Create a new column using your UDF
- voter_df = voter_df.withColumn('first_and_middle_name', udfFirstAndMiddle(voter_df.splits))
- # Drop the unnecessary columns then show the DataFrame
- voter_df = voter_df.drop('first_name')
- voter_df = voter_df.drop('splits')
- voter_df.show()
- # Select all the unique council voters
- voter_df = df.select(df["VOTER NAME"]).distinct()
- # Count the rows in voter_df
- print("\nThere are %d rows in the voter_df DataFrame.\n" % voter_df.count())
- # Add a ROW_ID
- voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())
- # Show the rows with 10 highest IDs in the set
- voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)
- # Print the number of partitions in each DataFrame
- print("\nThere are %d partitions in the voter_df DataFrame.\n" % voter_df.rdd.getNumPartitions())
- print("\nThere are %d partitions in the voter_df_single DataFrame.\n" % voter_df_single.rdd.getNumPartitions())
- # Add a ROW_ID field to each DataFrame
- voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())
- voter_df_single = voter_df_single.withColumn('ROW_ID', F.monotonically_increasing_id())
- # Show the top 10 IDs in each DataFrame
- voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)
- voter_df_single.orderBy(voter_df_single.ROW_ID.desc()).show(10)
- # Determine the highest ROW_ID and save it in previous_max_ID
- previous_max_ID = voter_df_march.select('ROW_ID').rdd.max()[0]
- # Add a ROW_ID column to voter_df_april starting at the desired value
- voter_df_april = voter_df_april.withColumn('ROW_ID', F.monotonically_increasing_id() + previous_max_ID)
- # Show the ROW_ID from both DataFrames and compare
- voter_df_march.select('ROW_ID').show()
- voter_df_april.select('ROW_ID').show()
- import time
- start_time = time.time()
- # Add caching to the unique rows in departures_df
- departures_df = departures_df.distinct().cache()
- # Count the unique rows in departures_df, noting how long the operation takes
- print("Counting %d rows took %f seconds" % (departures_df.count(), time.time() - start_time))
- # Count the rows again, noting the variance in time of a cached DataFrame
- start_time = time.time()
- print("Counting %d rows again took %f seconds" % (departures_df.count(), time.time() - start_time))
- # Determine if departures_df is in the cache
- print("Is departures_df cached?: %s" % departures_df.is_cached)
- print("Removing departures_df from cache")
- # Remove departures_df from the cache
- departures_df.unpersist()
- # Check the cache status again
- print("Is departures_df cached?: %s" % departures_df.is_cached)
- # Import the full and split files into DataFrames
- full_df = spark.read.csv('departures_full.txt.gz')
- split_df = spark.read.csv('departures_*.txt.gz')
- # Print the count and run time for each DataFrame
- start_time_a = time.time()
- print("Total rows in full DataFrame:\t%d" % full_df.count())
- print("Time to run: %f" % (time.time() - start_time_a))
- start_time_b = time.time()
- print("Total rows in split DataFrame:\t%d" % split_df.count())
- print("Time to run: %f" % (time.time() - start_time_b))
- # Name of the Spark application instance
- app_name = spark.conf.get('spark.app.name')
- # Driver TCP port
- driver_tcp_port = spark.conf.get('spark.driver.port')
- # Number of join partitions
- num_partitions = spark.conf.get('spark.sql.shuffle.partitions')
- # Show the results
- print("Name: %s" % app_name)
- print("Driver TCP port: %s" % driver_tcp_port)
- print("Number of partitions: %s" % num_partitions)
- # Store the number of partitions in variable
- before = departures_df.rdd.getNumPartitions()
- # Configure Spark to use 500 partitions
- spark.conf.set('spark.sql.shuffle.partitions', 500)
- # Recreate the DataFrame using the departures data file
- departures_df = spark.read.csv('departures.txt.gz').distinct()
- # Print the number of partitions for each instance
- print("Partition count before change: %d" % before)
- print("Partition count after change: %d" % departures_df.rdd.getNumPartitions())
- # Join the flights_df and airports_df DataFrames
- normal_df = flights_df.join(airports_df,
-                             flights_df["Destination Airport"] == airports_df["IATA"])
- # Show the query plan
- normal_df.explain()
- # Import the broadcast method from pyspark.sql.functions
- from pyspark.sql.functions import broadcast
- # Join the flights_df and airports_df DataFrames using broadcasting
- broadcast_df = flights_df.join(broadcast(airports_df),
-                                flights_df["Destination Airport"] == airports_df["IATA"])
- # Show the query plan and compare against the original
- broadcast_df.explain()
- start_time = time.time()
- # Count the number of rows in the normal DataFrame
- normal_count = normal_df.count()
- normal_duration = time.time() - start_time
- start_time = time.time()
- # Count the number of rows in the broadcast DataFrame
- broadcast_count = broadcast_df.count()
- broadcast_duration = time.time() - start_time
- # Print the counts and the duration of the tests
- print("Normal count:\t\t%d\tduration: %f" % (normal_count, normal_duration))
- print("Broadcast count:\t%d\tduration: %f" % (broadcast_count, broadcast_duration))
- #############################################################
- # Import the data to a DataFrame
- departures_df = spark.read.csv('2015-departures.csv.gz', header=True)
- # Remove any duration of 0
- departures_df = departures_df.filter(departures_df[3] != '0')
- # Add an ID column
- departures_df = departures_df.withColumn('id', F.monotonically_increasing_id())
- # Write the file out to JSON format
- departures_df.write.json('output.json')
- ###########################################################
- # Import the file to a DataFrame and perform a row count
- annotations_df = spark.read.csv('annotations.csv.gz', sep='|')
- full_count = annotations_df.count()
- # Count the number of rows beginning with '#'
- comment_count = annotations_df.filter(F.col('_c0').startswith('#')).count()
- # Import the file to a new DataFrame, without commented rows
- no_comments_df = spark.read.csv('annotations.csv.gz', sep='|', comment='#')
- # Count the new DataFrame and verify the difference is as expected
- no_comments_count = no_comments_df.count()
- print("Full count: %d\nComment count: %d\nRemaining count: %d" % (full_count, comment_count, no_comments_count))
- # Split _c0 on the tab character and store the list in a variable
- tmp_fields = F.split(annotations_df['_c0'], '\t')
- # Create the colcount column on the DataFrame
- annotations_df = annotations_df.withColumn('colcount', F.size(tmp_fields))
- # Remove any rows containing fewer than 5 fields
- annotations_df_filtered = annotations_df.filter(~ (annotations_df['colcount'] < 5))
- # Count the number of rows
- final_count = annotations_df_filtered.count()
- print("Initial count: %d\nFinal count: %d" % (initial_count, final_count))
- # Split the content of _c0 on the tab character (aka, '\t')
- split_cols = F.split(annotations_df['_c0'], '\t')
- # Add the columns folder, filename, width, and height
- split_df = annotations_df.withColumn('folder', split_cols.getItem(0))
- split_df = split_df.withColumn('filename', split_cols.getItem(1))
- split_df = split_df.withColumn('width', split_cols.getItem(2))
- split_df = split_df.withColumn('height', split_cols.getItem(3))
- # Add split_cols as a column
- split_df = split_df.withColumn('split_cols', split_cols)
- def retriever(cols, colcount):
-     # Return a list of dog data (fields 5 through colcount)
-     return cols[4:colcount]
- # Define the method as a UDF
- udfRetriever = F.udf(retriever, ArrayType(StringType()))
- # Create a new column using your UDF
- split_df = split_df.withColumn('dog_list', udfRetriever(split_df.split_cols, split_df.colcount))
- # Remove the original column, split_cols, and the colcount
- split_df = split_df.drop('_c0').drop('split_cols').drop('colcount')
- # Rename the column in valid_folders_df
- valid_folders_df = valid_folders_df.withColumnRenamed('_c0', 'folder')
- # Count the number of rows in split_df
- split_count = split_df.count()
- # Join the DataFrames
- joined_df = split_df.join(F.broadcast(valid_folders_df), "folder")
- # Compare the number of rows remaining
- joined_count = joined_df.count()
- print("Before: %d\nAfter: %d" % (split_count, joined_count))
- # Determine the row counts for each DataFrame
- split_count = split_df.count()
- joined_count = joined_df.count()
- # Create a DataFrame containing the invalid rows
- invalid_df = split_df.join(F.broadcast(joined_df), 'folder', 'left_anti')
- # Validate the count of the new DataFrame is as expected
- invalid_count = invalid_df.count()
- print(" split_df:\t%d\n joined_df:\t%d\n invalid_df: \t%d" % (split_count, joined_count, invalid_count))
- # Determine the number of distinct folder rows removed
- invalid_folder_count = invalid_df.select('folder').distinct().count()
- print("%d distinct invalid folders found" % invalid_folder_count)
- # Create a function to return the number and type of dogs as a tuple
- def dogParse(doglist):
-     dogs = []
-     for dog in doglist:
-         (breed, start_x, start_y, end_x, end_y) = dog.split(',')
-         dogs.append((breed, int(start_x), int(start_y), int(end_x), int(end_y)))
-     return dogs
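- DogType isn't defined anywhere in these notes; judging by the tuples built above, a matching schema would look something like this (an assumed definition, relying on the earlier pyspark.sql.types import):
- DogType = StructType([
-     StructField("breed", StringType(), False),
-     StructField("start_x", IntegerType(), False),
-     StructField("start_y", IntegerType(), False),
-     StructField("end_x", IntegerType(), False),
-     StructField("end_y", IntegerType(), False)
- ])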
- # Create a UDF
- udfDogParse = F.udf(dogParse, ArrayType(DogType))
- # Use the UDF to create the list of dogs and drop the old column
- joined_df = joined_df.withColumn('dogs', udfDogParse('dog_list')).drop('dog_list')
- # Show the number of dogs in the first 10 rows
- joined_df.select(F.size('dogs')).show(10)
- # Define a function to determine the number of dog pixels per image
- def dogPixelCount(doglist):
-     totalpixels = 0
-     for dog in doglist:
-         totalpixels += (dog[3] - dog[1]) * (dog[4] - dog[2])
-     return totalpixels
- # Define a UDF for the pixel count
- udfDogPixelCount = F.udf(dogPixelCount, IntegerType())
- joined_df = joined_df.withColumn('dog_pixels', udfDogPixelCount(joined_df.dogs))
- # Create a column representing the percentage of pixels
- joined_df = joined_df.withColumn('dog_percent', (joined_df.dog_pixels / (joined_df.width.cast("integer") * joined_df.height.cast("integer"))) * 100)
- # Show the first 10 annotations with more than 60% dog
- joined_df.filter(joined_df.dog_percent > 60).show(10)