ArcheontPB

BigDataFundamentals

Mar 4th, 2020
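'''
These snippets assume the PySpark shell environment used throughout the paste:
sc (SparkContext), spark (SparkSession), plt (matplotlib.pyplot), and the
variables file_path and stop_words are expected to be defined before the
relevant snippet runs.
'''
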
# Print the version of SparkContext
print("The version of Spark Context in the PySpark shell is", sc.version)

# Print the Python version of SparkContext
print("The Python version of Spark Context in the PySpark shell is", sc.pythonVer)

# Print the master of SparkContext
print("The master of Spark Context in the PySpark shell is", sc.master)

# Create a Python list of numbers from 1 to 100
numb = range(1, 101)

# Load the list into PySpark as an RDD (named numbRDD, which later snippets use)
numbRDD = sc.parallelize(numb)

# Load a local file into the PySpark shell as an RDD (named fileRDD, used below)
fileRDD = sc.textFile(file_path)

# Square the numbers with a lambda (reconstruction of a bare "squared_list_lambda"
# stub line; the original right-hand side is assumed)
squared_list_lambda = list(map(lambda x: x ** 2, numb))

# Check the number of partitions in fileRDD
print("Number of partitions in fileRDD is", fileRDD.getNumPartitions())

# Create fileRDD_part from file_path with 5 partitions
fileRDD_part = sc.textFile(file_path, minPartitions=5)

# Check the number of partitions in fileRDD_part
print("Number of partitions in fileRDD_part is", fileRDD_part.getNumPartitions())

# Create a map() transformation to cube the numbers
cubedRDD = numbRDD.map(lambda x: x ** 3)

# Collect the results
numbers_all = cubedRDD.collect()

# Print the numbers from numbers_all
for numb in numbers_all:
    print(numb)

'''
Note: collect() should only be used to retrieve results for small datasets;
it shouldn't be used on large datasets.
'''
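
'''
A minimal sketch of previewing results without collect(): take(n) returns only
the first n elements to the driver, which is the safer pattern when an RDD is
large (reusing cubedRDD from above).
'''
# Preview the first 10 cubed numbers instead of collecting everything
for n in cubedRDD.take(10):
    print(n)
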
# Filter fileRDD to select lines containing the keyword "Spark"
fileRDD_filter = fileRDD.filter(lambda line: 'Spark' in line)

# How many lines contain the keyword?
print("The total number of lines with the keyword Spark is", fileRDD_filter.count())

# Print the first four matching lines
for line in fileRDD_filter.take(4):
    print(line)

# Create a pair RDD named Rdd with key-value pairs
Rdd = sc.parallelize([(1, 2), (3, 4), (3, 6), (4, 5)])

# Apply the reduceByKey() operation on Rdd
Rdd_Reduced = Rdd.reduceByKey(lambda x, y: x + y)

# Iterate over the result and print the output
for num in Rdd_Reduced.collect():
    print("Key {} has {} counts".format(num[0], num[1]))

# Sort the reduced RDD by key in descending order
Rdd_Reduced_Sort = Rdd_Reduced.sortByKey(ascending=False)

# Iterate over the result and print the output
for num in Rdd_Reduced_Sort.collect():
    print("Key {} has {} counts".format(num[0], num[1]))

# Apply countByKey() to Rdd (an action that returns a dictionary of counts per key)
total = Rdd.countByKey()

# What is the type of total?
print("The type of total is", type(total))

# Iterate over total and print the output
for k, v in total.items():
    print("key", k, "has", v, "counts")

# Create a baseRDD from the file path
baseRDD = sc.textFile(file_path)

# Split the lines of baseRDD into words
splitRDD = baseRDD.flatMap(lambda x: x.split())

# Count the total number of words
print("Total number of words in splitRDD:", splitRDD.count())

# Remove stop words (comparing each word in lowercase against stop_words)
splitRDD_no_stop = splitRDD.filter(lambda x: x.lower() not in stop_words)

# Create a tuple of each word and 1
splitRDD_no_stop_words = splitRDD_no_stop.map(lambda w: (w, 1))

# Count the number of occurrences of each word
resultRDD = splitRDD_no_stop_words.reduceByKey(lambda x, y: x + y)

# Display the first 10 words and their frequencies
for word in resultRDD.take(10):
    print(word)

# Swap the keys and values
resultRDD_swap = resultRDD.map(lambda x: (x[1], x[0]))

# Sort the keys in descending order
resultRDD_swap_sort = resultRDD_swap.sortByKey(ascending=False)

# Show the top 10 most frequent words and their frequencies
for word in resultRDD_swap_sort.take(10):
    print("{} has {} counts".format(word[1], word[0]))


# Create a list of tuples
sample_list = [('Mona', 20), ('Jennifer', 34), ('John', 20), ('Jim', 26)]

# Create an RDD from the list
rdd = sc.parallelize(sample_list)

# Create a PySpark DataFrame
names_df = spark.createDataFrame(rdd, schema=['Name', 'Age'])

# Check the type of names_df
print("The type of names_df is", type(names_df))

# Create a DataFrame from file_path
people_df = spark.read.csv(file_path, header=True, inferSchema=True)

# Check the type of people_df
print("The type of people_df is", type(people_df))

# Create a temporary view "people"
people_df.createOrReplaceTempView("people")

# Construct a query to select the names of the people from the temporary view "people"
query = '''SELECT name FROM people'''

# Assign the result of Spark's query to people_df_names
people_df_names = spark.sql(query)

# Print the top 10 names of the people
people_df_names.show(10)

# Filter the people view to select rows where sex is female
people_female_df = spark.sql('SELECT * FROM people WHERE sex=="female"')

# Filter the people view to select rows where sex is male
people_male_df = spark.sql('SELECT * FROM people WHERE sex=="male"')

# Count the number of rows in both DataFrames
print("There are {} rows in the people_female_df and {} rows in the people_male_df DataFrames".format(people_female_df.count(), people_male_df.count()))

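'''
For reference, a minimal sketch of the same female/male filtering done with the
DataFrame API instead of SQL (assumes the people_df DataFrame loaded above has
a "sex" column, as implied by the queries).
'''
# Equivalent filters using DataFrame operations
people_female_df_api = people_df.filter(people_df.sex == "female")
people_male_df_api = people_df.filter(people_df.sex == "male")
print("DataFrame API counts:", people_female_df_api.count(), people_male_df_api.count())
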
# Check the column names of names_df
print("The column names of names_df are", names_df.columns)

# Convert to Pandas DataFrame
df_pandas = names_df.toPandas()

# Create a horizontal bar plot
df_pandas.plot(kind='barh', x='Name', y='Age', colormap='winter_r')
plt.show()

# Load the DataFrame
fifa_df = spark.read.csv(file_path, header=True, inferSchema=True)

# Check the schema of the columns
fifa_df.printSchema()

# Show the first 10 observations
fifa_df.show(10)

# Print the total number of rows
print("There are {} rows in the fifa_df DataFrame".format(fifa_df.count()))

# Create a temporary view of fifa_df
fifa_df.createOrReplaceTempView('fifa_df_table')

# Construct the query
query = '''SELECT Age FROM fifa_df_table WHERE Nationality == "Germany"'''

# Apply the SQL query
fifa_df_germany_age = spark.sql(query)

# Generate basic statistics
fifa_df_germany_age.describe().show()

# Convert fifa_df_germany_age to a pandas DataFrame
fifa_df_germany_age_pandas = fifa_df_germany_age.toPandas()

# Plot the 'Age' density of the Germany players
fifa_df_germany_age_pandas.plot(kind='density')
plt.show()