I used mainly Python for the whole exercise. I also created sample data to test all the code snippets in pyspark and spark-shell.

# Create Sample Data

```python
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
sqlContext = SQLContext(sc)

# Sample 'part' table: partkey, name, comment, size
part_df = sqlContext.createDataFrame([
    (1, 'bob', '2015-01-13', 14),
    (2, 'alice', '2015-04-23', 10),
    (3, 'john', '2015-04-23', 12)
], ['partkey', 'name', 'comment', 'size'])

# Sample 'partsupp' table: partkey, supplycost
partsupp_df = sqlContext.createDataFrame([
    (1, 100),
    (2, 23),
    (1, 120),
    (2, 28)
], ['partkey', 'supplycost'])
```

# QUESTION #1: Joins in Core Spark

### Join in DataFrames
```python
# Load the TPC-H 'part' and 'partsupp' tables
# (the files must provide the column names, e.g. via a header or an explicit schema)
part_df = spark.read.csv('hdfs://data-sets/tpch/data/part')
partsupp_df = spark.read.csv('hdfs://data-sets/tpch/data/partsupp')

# Equi-join on the shared 'partkey' column
df = part_df.join(partsupp_df, "partkey")
df.show()
```

### Join in RDDs

```python
# Convert the DataFrames to pair RDDs keyed by partkey
part_rdd = part_df.rdd.map(lambda row: (row[0], (row[1], row[2], row[3])))
partsupp_rdd = partsupp_df.rdd.map(lambda row: (row[0], row[1]))

# Join the two pair RDDs on the key
rdd = part_rdd.join(partsupp_rdd)

def printx(my_list):
    for row in my_list:
        print(row)

printx(rdd.take(20))
```
### Join in Datasets
This is in Scala, since Datasets do not exist in Python.

```scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import spark.implicits._  // needed for the .as[...] encoders

val part_df = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(
    Row(1L, "bob", "2015-01-13", 14),
    Row(2L, "alice", "2015-04-23", 10),
    Row(3L, "john", "2015-04-23", 12)
  )),
  StructType(List(
    StructField("partkey", LongType),
    StructField("name", StringType),
    StructField("comment", StringType),
    StructField("size", IntegerType)
  ))
)

val partsupp_df = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(
    Row(1L, 100),
    Row(2L, 23),
    Row(1L, 120),
    Row(2L, 28)
  )),
  StructType(List(
    StructField("partkey", LongType),
    StructField("supplycost", IntegerType)
  ))
)

// Typed Datasets built from the DataFrames
case class Part(partkey: Long, name: String, comment: String, size: Int)
case class PartSupp(partkey: Long, supplycost: Int)
val part_ds = part_df.as[Part]
val partsupp_ds = partsupp_df.as[PartSupp]
part_ds.join(partsupp_ds, Seq("partkey")).show()
```

# QUESTION #2: Joins in Spark SQL

```python
# Register the DataFrames as temporary SQL tables
sqlContext.registerDataFrameAsTable(part_df, "part")
sqlContext.registerDataFrameAsTable(partsupp_df, "partsupp")

res = sqlContext.sql("""
    SELECT part.*, partsupp.supplycost
    FROM part
    JOIN partsupp ON part.partkey = partsupp.partkey
""")
res.show()
```

# QUESTION #3: Alternate Data Formats
A more suitable storage format for frequent scans and reads of the dataset with Spark is Parquet. Parquet is a columnar storage format that is especially efficient when loading only certain columns of the data instead of all of them, which makes it a good fit for Spark SQL in particular.

```python
import time

# Save the part table in Parquet format
part_df.write.save("hdfs:///data-sets/tpch/data/part.parquet", format="parquet")

# Time a single-column scan over the Parquet copy
start = time.time()
df = spark.sql("SELECT name FROM parquet.`hdfs:///data-sets/tpch/data/part.parquet`")
df.count()
print(time.time() - start)

# Time a count over the CSV copy for comparison
start = time.time()
part_df = spark.read.csv('hdfs:///data-sets/tpch/data/part')
part_df.count()
print(time.time() - start)
```

# QUESTION #4: Spark Executors Allocation

We can influence how many executors are used with dynamic allocation by setting a number of Spark configurations:

1. `spark.dynamicAllocation.initialExecutors`: the initial number of executors to run.
1. `spark.dynamicAllocation.minExecutors` and `spark.dynamicAllocation.maxExecutors`: the lower and upper bounds on the number of executors.

Spark also uses heuristics to determine when to request and remove executors. These heuristics are controlled by another set of configurations:

1. `spark.dynamicAllocation.schedulerBacklogTimeout`: if tasks have been backlogged for more than this duration, new executors will be requested.
1. `spark.dynamicAllocation.sustainedSchedulerBacklogTimeout`: the same as the previous one, but for subsequent executor requests.

A Spark application asks for more executors when there have been pending tasks for `spark.dynamicAllocation.schedulerBacklogTimeout` seconds, and the request is triggered again every `spark.dynamicAllocation.sustainedSchedulerBacklogTimeout` seconds thereafter if the queue of pending tasks persists. The number of executors requested in each round increases exponentially from the previous round. A Spark application removes an executor when it has been idle for more than `spark.dynamicAllocation.executorIdleTimeout` seconds.

If the application later tries to access state stored in or written by a removed executor, it has to recompute that state. Spark therefore needs a mechanism to decommission an executor gracefully by preserving its state before removing it. The solution for preserving shuffle files is an external shuffle service that runs on each node of the cluster.

`spark.dynamicAllocation.cachedExecutorIdleTimeout`: if an executor holding cached data blocks has been idle for more than this duration, it will be removed.
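
A minimal sketch of how these settings could be passed when building the session; the numeric values here are illustrative assumptions, not recommendations, and the application name is a placeholder:

```python
from pyspark.sql import SparkSession

# Illustrative dynamic-allocation settings; the actual values depend on the workload.
spark = (
    SparkSession.builder
    .appName("dynamic-allocation-example")  # hypothetical app name
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.shuffle.service.enabled", "true")  # external shuffle service must run on each node
    .config("spark.dynamicAllocation.initialExecutors", "2")
    .config("spark.dynamicAllocation.minExecutors", "1")
    .config("spark.dynamicAllocation.maxExecutors", "50")
    .config("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
    .config("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "1s")
    .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
    .config("spark.dynamicAllocation.cachedExecutorIdleTimeout", "600s")
    .getOrCreate()
)
```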

**Pros:** dynamic allocation helps maximize resource utilization, especially when multiple applications share the resources of the Spark cluster. In this way, the application can finish in reasonable time without hogging the resources.

**Cons:** an external shuffle service must be set up on each worker node in the cluster to keep the shuffle files after executors are removed. Also, executors containing cached data are removed only after they have been idle for `spark.dynamicAllocation.cachedExecutorIdleTimeout` seconds; after that, the application has to recompute the lost data. If this happens regularly, it can affect the execution time.

# QUESTION #5: Create Tables Programmatically

```python
# Define an external, partitioned table over the target location
sqlContext.sql("""
    CREATE EXTERNAL TABLE IF NOT EXISTS my_table (
        name STRING,
        birth_date DATE
    )
    PARTITIONED BY (year INT, month INT)
    STORED AS PARQUET
    LOCATION 'hdfs:///data-sets/tpch/data/my_table'
""")

# Write the data as Snappy-compressed Parquet, partitioned by year and month
# (assuming my_DF carries year and month columns derived from birth_date)
(my_DF.write
    .format("parquet")
    .option("compression", "snappy")
    .partitionBy("year", "month")
    .mode("overwrite")
    .save("hdfs:///data-sets/tpch/data/my_table"))
```
# QUESTION #6: Update Only Affected Partitions

```python
newDF.createOrReplaceTempView("impressions_dataframe")

# Dynamic-partition insert: only the partitions produced by the query are overwritten
spark.sql("""
    INSERT OVERWRITE TABLE my_table
    PARTITION (year, month)
    SELECT ..., year(birth_date), month(birth_date)
    FROM impressions_dataframe
    GROUP BY ad
""")

# OR: overwrite the affected partition directories directly
newDF.filter("month='Jan'").write.mode("overwrite").save("hdfs://data-sets/tpch/data/my_table/year=2018/month=Jan")
newDF.filter("month='Feb'").write.mode("overwrite").save("hdfs://data-sets/tpch/data/my_table/year=2018/month=Feb")
```

# QUESTION #7: ETL Pipeline Automated Testing

# QUESTION #8: Performance Tuning

### Data format:
Make sure to use an efficient binary storage format (I would suggest Parquet, for the reasons mentioned before) instead of plain-text formats such as JSON or CSV.

### Serialization:
Switch object serialization to Kryo, which is faster and more compact than the default Java serializer.
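
A minimal sketch of setting this when creating the session (the application name is just a placeholder):

```python
from pyspark.sql import SparkSession

# Use Kryo for JVM-side object serialization instead of the default Java serializer
spark = (
    SparkSession.builder
    .appName("kryo-example")  # hypothetical app name
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)
```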

### Caching and Persisting:
1. Persist or cache RDDs that will be used more than once.
1. If there is a need to cache large amounts of processed data, change the storage level from `MEMORY_ONLY` to `MEMORY_AND_DISK` (see the sketch after this list). The first storage level may drop partitions and recalculate them later, while the second spills overflowing partitions to disk.
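
A minimal sketch using `part_rdd` from the RDD join above:

```python
from pyspark import StorageLevel

# For RDDs, cache() stores partitions with MEMORY_ONLY:
# partitions that don't fit in memory are dropped and recomputed when needed.
part_rdd.cache()

# MEMORY_AND_DISK spills partitions that don't fit in memory to disk instead.
part_rdd.unpersist()
part_rdd.persist(StorageLevel.MEMORY_AND_DISK)
```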

### Job Optimization:
1. Drop columns that are not used in the aggregations or in the report during loading. This makes loading faster when a columnar storage format is used; the data also consumes less memory and shuffles become faster.
1. Optimize the job by removing unnecessary transformations and picking better-optimized ones. The goal is to minimize the shuffling needed.
1. If a small table like `'NATION'` takes part in the join, we can broadcast the RDD/DataFrame that contains it to all executors before executing the join (see the sketch after this list). This way no shuffle is needed for the large tables (`'SUPPLIER'` and `'CUSTOMER'`) that are joined with the small one.
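
A minimal sketch of the broadcast join, assuming the tables are already loaded as DataFrames; `nation_df`, `supplier_df`, and the `nationkey` join column are assumed names:

```python
from pyspark.sql.functions import broadcast

# Hint Spark to replicate the small NATION table to every executor,
# so the large SUPPLIER table is joined without being shuffled.
# 'nation_df', 'supplier_df', and 'nationkey' are assumed names.
result = supplier_df.join(broadcast(nation_df), "nationkey")
result.show()
```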

### Resource Allocation:
If the execution is on YARN, make sure that the resources requested by Spark fit into the resources available to YARN (`yarn.nodemanager.resource.memory-mb`, `yarn.nodemanager.resource.cpu-vcores`).

I noticed that the number of executors is very large while each one gets tiny resources. There also aren't enough cores for 1,000 executors (12 nodes \* 16 cores = 192 cores). If there aren't enough resources for the requested number of executors, only the ones that fit will be created.
Extra resources in the cluster should be left for Hadoop, YARN, the OS, etc. For memory, the Spark job would consume 1000 \* 1 GB + 30 GB = 1,030 GB, with 112 GB \* 12 = 1,344 GB available, so we are fine on memory but not on cores.

My suggestion is to use 160 cores, 2 per executor, which gives 80 executors, each with 15 GB of memory. A sketch of the corresponding configuration follows.
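
A minimal sketch of that suggestion expressed as session configuration (fixed sizing shown here; the application name is a placeholder):

```python
from pyspark.sql import SparkSession

# Suggested sizing: 80 executors, 2 cores and 15 GB each,
# which fits within 12 nodes of 16 cores / 112 GB as described above.
spark = (
    SparkSession.builder
    .appName("tuned-etl-job")  # hypothetical app name
    .config("spark.executor.instances", "80")
    .config("spark.executor.cores", "2")
    .config("spark.executor.memory", "15g")
    .getOrCreate()
)
```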