Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
# Report basic properties of the active SparkContext (`sc` is supplied by
# the PySpark shell): Spark version, Python version, and master URL.
for label, value in (
    ("The version of Spark Context in the PySpark shell is", sc.version),
    ("The Python version of Spark Context in the PySpark shell is", sc.pythonVer),
    ("The master of Spark Context in the PySpark shell is", sc.master),
):
    print(label, value)
# Create a Python list of the numbers 1 to 100.
# BUG FIX: the original used range(1, 100), which stops at 99 and so
# contradicts its own comment; the stop value must be 101 to include 100.
numb = range(1, 101)

# Distribute the local list across the cluster as an RDD.
spark_data = sc.parallelize(numb)

# Load a local text file into the PySpark shell as an RDD of lines
# (`file_path` is provided by the exercise environment).
lines = sc.textFile(file_path)

# NOTE(review): a stray bare expression `squared_list_lambda` stood here in
# the original; it referenced an undefined name and would raise NameError,
# so it was removed.
# Show how many partitions the earlier fileRDD was split into.
print("Number of partitions in fileRDD is", fileRDD.getNumPartitions())

# Re-read the same file, asking Spark for at least 5 partitions this time,
# then confirm the partition count of the new RDD.
fileRDD_part = sc.textFile(file_path, minPartitions=5)
print("Number of partitions in fileRDD_part is", fileRDD_part.getNumPartitions())
# Apply a map() transformation that cubes each element of numbRDD
# (numbRDD is provided by the exercise environment).
cubedRDD = numbRDD.map(lambda value: value ** 3)

# Bring the cubed values back to the driver...
numbers_all = cubedRDD.collect()

# ...and print them one per line.
for numb in numbers_all:
    print(numb)

'''
Brilliant! collect() should only be used to retrieve results for small datasets. It shouldn’t be used on large datasets.
'''
# Keep only the lines of fileRDD that mention the keyword 'Spark'.
fileRDD_filter = fileRDD.filter(lambda line: 'Spark' in line)

# Count the matching lines.
print("The total number of lines with the keyword Spark is", fileRDD_filter.count())

# Show the first four matches.
for line in fileRDD_filter.take(4):
    print(line)
# Build a pair RDD of (key, value) tuples.
Rdd = sc.parallelize([(1, 2), (3, 4), (3, 6), (4, 5)])

# Sum the values that share a key.
Rdd_Reduced = Rdd.reduceByKey(lambda left, right: left + right)

# Print each key with its summed count.
for num in Rdd_Reduced.collect():
    print("Key {} has {} Counts".format(num[0], num[1]))

# Re-order the reduced pairs by key, largest key first.
Rdd_Reduced_Sort = Rdd_Reduced.sortByKey(ascending=False)

# Print the sorted result in the same format.
for num in Rdd_Reduced_Sort.collect():
    print("Key {} has {} Counts".format(num[0], num[1]))
# Count how many pairs carry each key; countByKey() is an action and
# returns a plain driver-side mapping, not an RDD.
total = Rdd.countByKey()

# Confirm the driver-side result type.
print("The type of total is", type(total))

# Print every key alongside its pair count.
for k, v in total.items():
    print("key", k, "has", v, "counts")
# Word-count pipeline: read the file, tokenize, drop stop words, and
# tally occurrences per word (`stop_words` comes from the environment).
baseRDD = sc.textFile(file_path)

# One word per RDD element, splitting each line on whitespace.
splitRDD = baseRDD.flatMap(lambda line: line.split())

# Report the raw token count.
print("Total number of words in splitRDD:", splitRDD.count())

# Discard tokens whose lowercase form appears in stop_words.
splitRDD_no_stop = splitRDD.filter(lambda token: token.lower() not in stop_words)

# Pair each surviving word with an initial count of 1...
splitRDD_no_stop_words = splitRDD_no_stop.map(lambda w: (w, 1))

# ...and sum the 1s per word to get frequencies.
resultRDD = splitRDD_no_stop_words.reduceByKey(lambda a, b: a + b)

# Peek at the first 10 (word, frequency) pairs.
for word in resultRDD.take(10):
    print(word)
# Flip each (word, frequency) pair to (frequency, word) so we can sort
# by frequency via sortByKey().
resultRDD_swap = resultRDD.map(lambda pair: (pair[1], pair[0]))

# Highest frequencies first.
resultRDD_swap_sort = resultRDD_swap.sortByKey(ascending=False)

# Report the 10 most frequent words with their counts.
for word in resultRDD_swap_sort.take(10):
    print("{} has {} counts".format(word[1], word[0]))
# Seed data: (name, age) tuples.
sample_list = [('Mona', 20), ('Jennifer', 34), ('John', 20), ('Jim', 26)]

# Parallelize the tuples into an RDD...
rdd = sc.parallelize(sample_list)

# ...then promote the RDD to a DataFrame with named columns.
names_df = spark.createDataFrame(rdd, schema=['Name', 'Age'])

# Confirm the resulting object type.
print("The type of names_df is", type(names_df))
# Read the CSV at file_path into a DataFrame, using the first row as the
# header and letting Spark infer column types.
people_df = spark.read.csv(file_path, header=True, inferSchema=True)

# Confirm the resulting object type.
print("The type of people_df is", type(people_df))

# Register the DataFrame so it can be queried with SQL.
people_df.createOrReplaceTempView("people")

# SQL text selecting just the name column.
query = '''SELECT name FROM people'''

# Run the query; the result is itself a DataFrame.
people_df_names = spark.sql(query)

# Display the first 10 names.
people_df_names.show(10)
# Split the registered "people" view into two DataFrames by the sex column.
people_female_df = spark.sql('SELECT * FROM people WHERE sex=="female"')
people_male_df = spark.sql('SELECT * FROM people WHERE sex=="male"')

# Report the row count of each subset.
print("There are {} rows in the people_female_df and {} rows in the people_male_df DataFrames".format(people_female_df.count(), people_male_df.count()))
# List the column names of names_df.
print("The column names of names_df are", names_df.columns)

# Materialize the Spark DataFrame as a pandas DataFrame on the driver.
df_pandas = names_df.toPandas()

# Horizontal bar chart of Age per Name (`plt` is matplotlib.pyplot,
# provided by the environment).
df_pandas.plot(kind='barh', x='Name', y='Age', colormap='winter_r')
plt.show()
# Load the FIFA CSV into a DataFrame with header row and inferred types.
fifa_df = spark.read.csv(file_path, header=True, inferSchema=True)

# Inspect the inferred column schema.
fifa_df.printSchema()

# Preview the first 10 rows.
fifa_df.show(10)

# Report the total row count.
print("There are {} rows in the fifa_df DataFrame".format(fifa_df.count()))

# Expose the DataFrame to SQL under a view name.
fifa_df.createOrReplaceTempView('fifa_df_table')
# SQL text selecting the ages of German players.
query = '''SELECT Age FROM fifa_df_table WHERE Nationality == "Germany"'''

# Run the query against the registered view.
fifa_df_germany_age = spark.sql(query)

# Summary statistics (count, mean, stddev, min, max) for the Age column.
fifa_df_germany_age.describe().show()

# Pull the result to the driver as a pandas DataFrame...
fifa_df_germany_age_pandas = fifa_df_germany_age.toPandas()

# ...and plot the age distribution as a density curve.
fifa_df_germany_age_pandas.plot(kind='density')
plt.show()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement