  1. # 1 : Instructions
  2.  
  3. # Connect to the MySQL database on the itversity labs using sqoop and import all of the data from the orders table into HDFS
  4.  
  5. # Data Description
  6.  
  7. # A MySQL instance is running on the remote node ms.itversity.com. In that instance you will find a table that contains 68883 rows of orders data
  8.  
  9. # MySQL database information:
  10.  
  11. # Installation on the node ms.itversity.com
  12. # Database name is retail_db
  13. # Username: retail_user
  14. # Password: itversity
  15. # Table name orders
  16. # Output Requirements
  17.  
  18. # Place the order files in the HDFS directory
  19. # /user/`whoami`/problem1/solution/
  20. # Replace `whoami` with your OS user name
  21. # Use text format with comma as the column delimiter
  22. # Load every order record completely
  23. # End of Problem
  24.  
  25. sqoop import \
  26. --connect jdbc:mysql://ms.itversity.com/retail_db \
  27. --username retail_user \
  28. --password itversity \
  29. --table orders \
  30. --target-dir /user/codersham/problem1/solution \
  31. --as-textfile \
  32. --fields-terminated-by ','
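
# Optional sanity check -- a sketch, not part of the required solution: read the imported
# files back from a pyspark shell; the count should match the 68883 rows mentioned above.
sc.textFile('/user/codersham/problem1/solution').count()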
  33.  
  34. #2 : Instructions
  35.  
  36. # Get the customers who have not placed any orders, sorted by customer_lname and then customer_fname
  37.  
  38. # Data Description
  39.  
  40. # Data is available in local file system /data/retail_db
  41.  
  42. # retail_db information:
  43.  
  44. # Source directories: /data/retail_db/orders and /data/retail_db/customers
  45. # Source delimiter: comma(",")
  46. # Source Columns - orders - order_id, order_date, order_customer_id, order_status
  47. # Source Columns - customers - customer_id, customer_fname, customer_lname and many more
  48. # Output Requirements
  49.  
  50. # Target Columns: customer_lname, customer_fname
  51. # Number of Files: 1
  52. # Place the output file in the HDFS directory
  53. # /user/`whoami`/problem2/solution/
  54. # Replace `whoami` with your OS user name
  55. # File format should be text
  56. # delimiter is (",")
  57. # Compression: Uncompressed
  58. # End of Problem
  59.  
  60. sc.setLogLevel('ERROR')
  61.  
  62. from pyspark.sql import Row
  63.  
  64. orders_rdd = sc.textFile('/public/retail_db/orders'). \
  65. map(lambda rec: Row(
  66. order_id = int(rec.split(',')[0]),
  67. order_customer_id =int(rec.split(',')[2])
  68. ))
  69.  
  70. orders_DF = sqlContext.createDataFrame(orders_rdd)
  71.  
  72. orders_DF.registerTempTable('orders')
  73.  
  74. customers_rdd = sc.textFile('/public/retail_db/customers'). \
  75. map(lambda rec: Row(
  76. customer_id = int(rec.split(',')[0]),
  77. customer_fname = rec.split(',')[1],
  78. customer_lname = rec.split(',')[2]))
  79.  
  80. customers_DF = sqlContext.createDataFrame(customers_rdd)
  81.  
  82. customers_DF.registerTempTable('customers')
  83.  
  84. result = sqlContext.sql("select distinct customer_lname, customer_fname from (select c.customer_lname, c.customer_fname, o.order_id from customers c left outer join orders o on c.customer_id = o.order_customer_id)q where q.order_id is NULL order by customer_lname, customer_fname")
  85.  
  86. sqlContext.setConf('spark.sql.shuffle.partitions','1')
  87.  
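# The left outer join, distinct and order by above all shuffle the data, so with
# spark.sql.shuffle.partitions set to '1' the write below produces the single output file required.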
  88. result. \
  89. selectExpr("concat(customer_lname,',',customer_fname)"). \
  90. write. \
  91. mode('overwrite'). \
  92. text('/user/codersham/problem2/solution')
  93.  
  94. #3 : Instructions
  95.  
  96. # Get top 3 crime types based on number of incidents in RESIDENCE area using "Location Description"
  97.  
  98. # Data Description
  99.  
  100. # Data is available in HDFS under /public/crime/csv
  101.  
  102. # crime data information:
  103.  
  104. # Structure of data: (ID, Case Number, Date, Block, IUCR, Primary Type, Description, Location Description, Arrest, Domestic, Beat, District, Ward, Community Area, FBI Code, X Coordinate, Y Coordinate, Year, Updated on, Latitude, Longitude, Location)
  105. # File format - text file
  106. # Delimiter - "," (use the regex split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1) while splitting, as some fields contain commas and are enclosed in double quotes).
  107. # Output Requirements
  108.  
  109. # Output Fields: crime_type, incident_count
  110. # Output File Format: JSON
  111. # Delimiter: N/A
  112. # Compression: No
  113. # Place the output file in the HDFS directory
  114. # /user/`whoami`/problem3/solution/
  115. # Replace `whoami` with your OS user name
  116. # End of Problem
  117.  
  118. sc.setLogLevel('ERROR')
  119.  
  120. crime_data_with_header = sc.textFile('/public/crime/csv')
  121.  
  122. crime_data_only_header = sc.parallelize([crime_data_with_header.first()])
  123.  
  124. from pyspark.sql import Row
  125.  
  126. import re  # lookahead-based split, per the delimiter note above: keeps quoted fields containing commas intact
  127. crime_data_rdd = crime_data_with_header.subtract(crime_data_only_header). \
  128. filter(lambda rec: re.split(',(?=(?:[^"]*"[^"]*")*[^"]*$)', rec)[7] == 'RESIDENCE'). \
  129. map(lambda rec: Row(
  130. id = int(re.split(',(?=(?:[^"]*"[^"]*")*[^"]*$)', rec)[0]),
  131. crime_type = re.split(',(?=(?:[^"]*"[^"]*")*[^"]*$)', rec)[5]))
  132.  
  133. crime_data_DF = sqlContext.createDataFrame(crime_data_rdd)
  134.  
  135. crime_data_DF.registerTempTable('crime_data')
  136.  
  137. result = sqlContext.sql("select crime_type, incident_count from (select crime_type, incident_count, dense_rank() over(order by incident_count desc) rnk from(select crime_type, count(id) incident_count from crime_data group by crime_type)q)m where m.rnk < 4")
  138.  
  139. result.write.json('/user/codersham/problem3/solution/')
  140.  
  141. #5 : Instructions
  142.  
  143. # Get word count for the input data using space as delimiter (for each word, we need to get how many times it is repeated in the entire input data set)
  144.  
  145. # Data Description
  146.  
  147. # Data is available in HDFS /public/randomtextwriter
  148.  
  149. # word count data information:
  150.  
  151. # Number of executors should be 10
  152. # executor memory should be 3 GB
  153. # Executor cores should be 20 in total (2 per executor)
  154. # Number of output files should be 8
  155. # Avro dependency details: groupId -> com.databricks, artifactId -> spark-avro_2.10, version -> 2.0.1
  156. # Output Requirements
  157.  
  158. # Output File format: Avro
  159. # Output fields: word, count
  160. # Compression: Uncompressed
  161. # Place the output files in the HDFS directory
  162. # /user/`whoami`/problem5/solution/
  163. # Replace `whoami` with your OS user name
  164. # End of Problem
  165.  
  166. pyspark \
  167. --master yarn \
  168. --conf spark.ui.port=12340 \
  169. --num-executors 10 \
  170. --executor-memory 3g \
  171. --executor-cores 2 \
  172. --packages com.databricks:spark-avro_2.10:2.0.1
  173.  
  174. sc.setLogLevel('ERROR')
  175.  
  176. word_rdd = sc.textFile('/public/randomtextwriter'). \
  177. flatMap(lambda rec: rec.split(' ')). \
  178. map(lambda rec:(rec,1))
  179.  
  180. word_count = word_rdd.reduceByKey(lambda x,y: x+y)
  181.  
  182. from pyspark.sql import Row
  183.  
  184. word = word_count. \
  185. map(lambda rec: Row(
  186. word = rec[0],
  187. count = int(rec[1])
  188. ))
  189.  
  190. word_DF = sqlContext.createDataFrame(word)
  191.  
  192. # spark.sql.shuffle.partitions has no effect here: the write below involves no shuffle, so the
  193. # number of output files follows the RDD partitioning; coalesce(8) gives the required 8 files.
  194. word_DF. \
  195. coalesce(8). \
  196. write. \
  197. format('com.databricks.spark.avro').save('/user/codersham/problem5/solution/')
  198.  
  199. #6 : Instructions
  200.  
  201. # Get total number of orders for each customer where the customer_state = 'TX'
  202.  
  203. # Data Description
  204.  
  205. # retail_db data is available in HDFS at /public/retail_db
  206.  
  207. # retail_db data information:
  208.  
  209. # Source directories: /public/retail_db/orders and /public/retail_db/customers
  210. # Source Columns - orders - order_id, order_date, order_customer_id, order_status
  211. # Source Columns - customers - customer_id, customer_fname, customer_lname, customer_state (8th column) and many more
  212. # delimiter: (",")
  213. # Output Requirements
  214.  
  215. # Output Fields: customer_fname, customer_lname, order_count
  216. # File Format: text
  217. # Delimiter: Tab character (\t)
  218. # Place the result file in the HDFS directory
  219. # /user/`whoami`/problem6/solution/
  220. # Replace `whoami` with your OS user name
  221. # End of Problem
  222.  
  223. sc.setLogLevel('ERROR')
  224.  
  225. from pyspark.sql import Row
  226.  
  227. orders_rdd = sc.textFile('/public/retail_db/orders'). \
  228. map(lambda rec: Row(
  229. order_id = int(rec.split(',')[0]),
  230. order_customer_id = int(rec.split(',')[2])
  231. ))
  232.  
  233. orders_DF = sqlContext.createDataFrame(orders_rdd)
  234.  
  235. orders_DF.registerTempTable('orders')
  236.  
  237. customers_rdd = sc.textFile('/public/retail_db/customers'). \
  238. map(lambda rec: Row(
  239. customer_id = int(rec.split(',')[0]),
  240. customer_fname = rec.split(',')[1],
  241. customer_lname = rec.split(',')[2],
  242. customer_state = rec.split(',')[7]
  243. ))
  244.  
  245. customers_DF = sqlContext.createDataFrame(customers_rdd)
  246.  
  247. customers_DF.registerTempTable('customers')
  248.  
  249. result = sqlContext.sql("select c.customer_fname, c.customer_lname, count(o.order_id) order_count from customers c join orders o on c.customer_id = o.order_customer_id where c.customer_state = 'TX' group by c.customer_fname, c.customer_lname")
  250.  
  251. sqlContext.setConf('spark.sql.shuffle.partitions','1')
  252.  
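# Note: Python expands '\t' below into a literal tab inside the SQL concat, which gives the
# tab-delimited output the requirements ask for.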
  253. result.selectExpr("concat(customer_fname,'\t',customer_lname,'\t',order_count)"). \
  254. write. \
  255. mode('overwrite'). \
  256. text('/user/codersham/problem6/solution/')
  257.  
  258. #7 : Instructions
  259.  
  260. # List the names of the Top 5 products by revenue ordered on '2013-07-26'. Revenue is considered only for COMPLETE and CLOSED orders.
  261.  
  262. # Data Description
  263.  
  264. # retail_db data is available in HDFS at /public/retail_db
  265.  
  266. # retail_db data information:
  267.  
  268. # Source directories:
  269. # /public/retail_db/orders
  270. # /public/retail_db/order_items
  271. # /public/retail_db/products
  272. # Source delimiter: comma(",")
  273. # Source Columns - orders - order_id, order_date, order_customer_id, order_status
  274. # Source Columns - order_items - order_item_id, order_item_order_id, order_item_product_id, order_item_quantity, order_item_subtotal, order_item_product_price
  275. # Source Columns - products - product_id, product_category_id, product_name, product_description, product_price, product_image
  276. # Output Requirements
  277.  
  278. # Target Columns: order_date, order_revenue, product_name, product_category_id
  279. # Data has to be sorted in descending order by order_revenue
  280. # File Format: text
  281. # Delimiter: colon (:)
  282. # Place the output file in the HDFS directory
  283. # /user/`whoami`/problem7/solution/
  284. # Replace `whoami` with your OS user name
  285. # End of Problem
  286.  
  287. sc.setLogLevel('ERROR')
  288.  
  289. from pyspark.sql import Row
  290.  
  291. orders_rdd = sc.textFile('/public/retail_db/orders'). \
  292. map(lambda rec: Row(
  293. order_id = int(rec.split(',')[0]),
  294. order_date = rec.split(',')[1],
  295. order_status = rec.split(',')[3]))
  296.  
  297. orders_DF = sqlContext.createDataFrame(orders_rdd)
  298.  
  299. orders_DF.registerTempTable('orders')
  300.  
  301. orderItems_rdd = sc.textFile('/public/retail_db/order_items'). \
  302. map(lambda rec: Row(
  303. order_item_order_id = int(rec.split(',')[1]),
  304. order_item_product_id = int(rec.split(',')[2]),
  305. order_item_subtotal = float(rec.split(',')[4])
  306. ))
  307.  
  308. orderItems_DF = sqlContext.createDataFrame(orderItems_rdd)
  309.  
  310. orderItems_DF.registerTempTable('order_items')
  311.  
  312. products_rdd = sc.textFile('/public/retail_db/products'). \
  313. map(lambda rec: Row(
  314. product_id = int(rec.split(',')[0]),
  315. product_category_id = int(rec.split(',')[1]),
  316. product_name = rec.split(',')[2]
  317. ))
  318.  
  319. products_DF = sqlContext.createDataFrame(products_rdd)
  320.  
  321. products_DF.registerTempTable('products')
  322.  
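# dense_rank() ordered by order_revenue with rnk < 6 keeps the top 5 revenue figures for
# COMPLETE/CLOSED orders on 2013-07-26; products tied on the same revenue are all retained.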
  323. result = sqlContext.sql("select order_date, order_revenue, product_name, product_category_id from (select order_date, order_revenue, product_name, product_category_id, dense_rank() over(order by order_revenue desc) rnk from(select o.order_date, round(sum(oi.order_item_subtotal),2) order_revenue, p.product_name, p.product_category_id from products p join order_items oi on p.product_id = oi.order_item_product_id join orders o on oi.order_item_order_id = o.order_id where to_date(o.order_date) = '2013-07-26' and o.order_status in ('COMPLETE','CLOSED') group by p.product_name, p.product_category_id, o.order_date)q)m where m.rnk < 6 order by order_revenue desc")
  324.  
  325. sqlContext.setConf('spark.sql.shuffle.partitions','1')
  326.  
  327. result. \
  328. selectExpr("concat(order_date,':',order_revenue,':',product_name,':',product_category_id)"). \
  329. write. \
  330. text('/user/codersham/problem7/solution/')
  331.  
  332. #8 : Instructions
  333.  
  334. # List the orders where the order_status = 'PENDING PAYMENT', sorted by order_id
  335.  
  336. # Data Description
  337.  
  338. # Data is available in HDFS at /public/retail_db/orders
  339.  
  340. # retail_db data information:
  341.  
  342. # Source directories: /public/retail_db/orders
  343. # Source delimiter: comma(",")
  344. # Source Columns - orders - order_id, order_date, order_customer_id, order_status
  345. # Output Requirements
  346.  
  347. # Target columns: order_id, order_date, order_customer_id, order_status
  348. # File Format: orc
  349. # Place the output files in the HDFS directory
  350. # /user/`whoami`/problem8/solution/
  351. # Replace `whoami` with your OS user name
  352. # End of Problem
  353.  
  354. sc.setLogLevel('ERROR')
  355.  
  356. from pyspark.sql import Row
  357.  
  358. orders_rdd = sc.textFile('/public/retail_db/orders'). \
  359. map(lambda rec: Row(
  360. order_id = int(rec.split(',')[0]),
  361. order_date = rec.split(',')[1],
  362. order_customer_id = int(rec.split(',')[2]),
  363. order_status = rec.split(',')[3])
  364. )
  365.  
  366. orders_DF = sqlContext.createDataFrame(orders_rdd)
  367.  
  368. orders_DF.registerTempTable('orders')
  369.  
  370. result = sqlContext.sql("select * from orders where order_status = 'PENDING PAYMENT' order by order_id")
  371.  
  372. result. \
  373. write. \
  374. orc('/user/codersham/problem8/solution/')
  375.  
  376. #9 : Instructions
  377.  
  378. # Remove header from h1b data
  379.  
  380. # Data Description
  381.  
  382. # h1b data with ascii null "\0" as delimiter is available in HDFS
  383.  
  384. # h1b data information:
  385.  
  386. # HDFS location: /public/h1b/h1b_data
  387. # First record is the header for the data
  388. # Output Requirements
  389.  
  390. # Remove the header from the data and save rest of the data as is
  391. # Data should be compressed using snappy algorithm
  392. # Place the H1B data in the HDFS directory
  393. # /user/`whoami`/problem9/solution/
  394. # Replace `whoami` with your OS user name
  395. # End of Problem
  396.  
  397. sc.setLogLevel('ERROR')
  398.  
  399. h1b_data_with_header = sc.textFile('/public/h1b/h1b_data')
  400.  
  401. h1b_data_only_header = sc.parallelize([h1b_data_with_header.first()])
  402.  
  403. h1b_data = h1b_data_with_header.subtract(h1b_data_only_header)
  404.  
  405. h1b_data.saveAsTextFile('/user/codersham/problem9/solution/','org.apache.hadoop.io.compress.SnappyCodec')
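
# Alternative sketch (not the approach used above): drop the header without the shuffle that
# subtract() triggers, by skipping the first record of the first partition only.
from itertools import islice
h1b_data_alt = h1b_data_with_header. \
mapPartitionsWithIndex(lambda idx, it: islice(it, 1, None) if idx == 0 else it)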
  406.  
  407. #10 : Instructions
  408.  
  409. # Get number of LCAs filed for each year
  410.  
  411. # Data Description
  412.  
  413. # h1b data with ascii null "\0" as delimiter is available in HDFS
  414.  
  415. # h1b data information:
  416.  
  417. # HDFS Location: /public/h1b/h1b_data
  418. # Ignore first record which is header of the data
  419. # YEAR is 8th field in the data
  420. # There are some LCAs for which YEAR is NA, ignore those records
  421. # Output Requirements
  422.  
  423. # File Format: text
  424. # Output Fields: YEAR, NUMBER_OF_LCAS
  425. # Delimiter: Ascii null "\0"
  426. # Place the output files in the HDFS directory
  427. # /user/`whoami`/problem10/solution/
  428. # Replace `whoami` with your OS user name
  429. # End of Problem
  430.  
  431. sc.setLogLevel('ERROR')
  432.  
  433. h1b_data_with_header = sc.textFile('/public/h1b/h1b_data'). \
  434. filter(lambda rec: rec.split('\0')[7] != 'NA')
  435.  
  436. h1b_data_only_header = sc.parallelize([h1b_data_with_header.first()])
  437.  
  438. h1b_data = h1b_data_with_header.subtract(h1b_data_only_header)
  439.  
  440. h1b_data_map = h1b_data.map(lambda rec: (rec.split('\0')[7],1))
  441.  
  442. h1b_data_count = h1b_data_map.reduceByKey(lambda x,y:x+y)
  443.  
  444. result = h1b_data_count.map(lambda rec: rec[0]+'\0'+str(rec[1]))
  445.  
  446. result. \
  447. coalesce(1). \
  448. saveAsTextFile('/user/codersham/problem10/solution/')
  449.  
  450.  
  451. #11 : Instructions
  452.  
  453. # Get number of LCAs by status for the year 2016
  454.  
  455. # Data Description
  456.  
  457. # h1b data with ascii null "\0" as delimiter is available in HDFS
  458.  
  459. # h1b data information:
  460.  
  461. # HDFS Location: /public/h1b/h1b_data
  462. # Ignore first record which is header of the data
  463. # YEAR is 8th field in the data
  464. # STATUS is 2nd field in the data
  465. # There are some LCAs for which YEAR is NA, ignore those records
  466. # Output Requirements
  467.  
  468. # File Format: json
  469. # Output Field Names: year, status, count
  470. # Place the output files in the HDFS directory
  471. # /user/`whoami`/problem11/solution/
  472. # Replace `whoami` with your OS user name
  473. # End of Problem
  474.  
  475. sc.setLogLevel('ERROR')
  476.  
  477. h1b_data_with_header = sc.textFile('/public/h1b/h1b_data'). \
  478. filter(lambda rec: rec.split('\0')[7] != 'NA')
  479.  
  480. h1b_data_only_header = sc.parallelize([h1b_data_with_header.first()])
  481.  
  482. from pyspark.sql import Row
  483.  
  484. h1b_data = h1b_data_with_header.subtract(h1b_data_only_header). \
  485. map(lambda rec: Row(
  486. year = rec.split('\0')[7],
  487. status = rec.split('\0')[1]
  488. ))
  489.  
  490. h1b_data_DF = sqlContext.createDataFrame(h1b_data)
  491.  
  492. h1b_data_DF.registerTempTable('h1b_data')
  493.  
  494. result = sqlContext.sql("select year, status, count(*) count from h1b_data where year = '2016' group by year, status")
  495.  
  496. result. \
  497. write. \
  498. json('/user/codersham/problem11/solution', mode = 'overwrite')
  499.  
  500.  
  501. #12 : Instructions
  502.  
  503. # Get top 5 employers for year 2016 where the status is WITHDRAWN or CERTIFIED-WITHDRAWN or DENIED
  504.  
  505. # Data Description
  506.  
  507. # h1b data with ascii null "\0" as delimiter is available in HDFS
  508.  
  509. # h1b data information:
  510.  
  511. # HDFS Location: /public/h1b/h1b_data
  512. # Ignore first record which is header of the data
  513. # YEAR is 8th field in the data
  514. # STATUS is 2nd field in the data
  515. # EMPLOYER is 3rd field in the data
  516. # There are some LCAs for which YEAR is NA, ignore those records
  517. # Output Requirements
  518.  
  519. # File Format: parquet
  520. # Output Fields: employer_name, lca_count
  521. # Data needs to be in descending order by count
  522. # Place the output files in the HDFS directory
  523. # /user/`whoami`/problem12/solution/
  524. # Replace `whoami` with your OS user name
  525. # End of Problem
  526.  
  527. sc.setLogLevel('ERROR')
  528.  
  529. h1b_data_with_header = sc.textFile('/public/h1b/h1b_data'). \
  530. filter(lambda rec: rec.split('\0')[7] != 'NA')
  531.  
  532. h1b_data_only_header = sc.parallelize([h1b_data_with_header.first()])
  533.  
  534. from pyspark.sql import Row
  535.  
  536. h1b_data = h1b_data_with_header.subtract(h1b_data_only_header). \
  537. map(lambda rec: Row(
  538. year = rec.split('\0')[7],
  539. employer_name = rec.split('\0')[2],
  540. status = rec.split('\0')[1]
  541. ))
  542.  
  543. h1b_data_DF = sqlContext.createDataFrame(h1b_data)
  544.  
  545. h1b_data_DF.registerTempTable('h1b_data')
  546.  
  547. result = sqlContext.sql("select employer_name, lca_count from(select employer_name, lca_count, dense_rank() over(order by lca_count desc) rnk from (select employer_name, count(*) lca_count from h1b_data where year = '2016' and status in ('WITHDRAWN','CERTIFIED-WITHDRAWN','DENIED') group by employer_name)q)m where m.rnk < 6 order by lca_count desc")
  548.  
  549. sqlContext.setConf('spark.sql.shuffle.partitions','1')
  550.  
  551. result.write.parquet('/user/codersham/problem12/solution/')
  552.  
  553.  
  554. #13 : Instructions
  555.  
  556. # Copy all h1b data from HDFS to Hive table excluding those where year is NA or prevailing_wage is NA
  557.  
  558. # Data Description
  559.  
  560. # h1b data with ascii null "\0" as delimiter is available in HDFS
  561.  
  562. # h1b data information:
  563.  
  564. # HDFS Location: /public/h1b/h1b_data_noheader
  565. # Fields:
  566. # ID, CASE_STATUS, EMPLOYER_NAME, SOC_NAME, JOB_TITLE, FULL_TIME_POSITION, PREVAILING_WAGE, YEAR, WORKSITE, LONGITUDE, LATITUDE
  567. # Ignore data where PREVAILING_WAGE is NA or YEAR is NA
  568. # PREVAILING_WAGE is 7th field
  569. # YEAR is 8th field
  570. # Number of records matching criteria: 3002373
  571. # Output Requirements
  572.  
  573. # Save it in Hive Database
  574. # Create Database: CREATE DATABASE IF NOT EXISTS `whoami`
  575. # Switch Database: USE `whoami`
  576. # Save data to hive table h1b_data
  577. # Create table command:
  578.  
  579. # CREATE TABLE h1b_data (
  580. # ID INT,
  581. # CASE_STATUS STRING,
  582. # EMPLOYER_NAME STRING,
  583. # SOC_NAME STRING,
  584. # JOB_TITLE STRING,
  585. # FULL_TIME_POSITION STRING,
  586. # PREVAILING_WAGE DOUBLE,
  587. # YEAR INT,
  588. # WORKSITE STRING,
  589. # LONGITUDE STRING,
  590. # LATITUDE STRING
  591. # )
  592.  
  593. # Replace `whoami` with your OS user name
  594. # End of Problem
  595.  
  596. from pyspark.sql import Row
  597.  
  598. h1b_data_rdd = sc.textFile('/public/h1b/h1b_data_noheader'). \
  599. filter(lambda rec: rec.split('\0')[7] != "NA"). \
  600. filter(lambda rec: rec.split('\0')[6] != "NA"). \
  601. map(lambda rec: Row(
  602. ID = int(rec.split('\0')[0]),
  603. CASE_STATUS = rec.split('\0')[1],
  604. EMPLOYER_NAME = rec.split('\0')[2],
  605. SOC_NAME = rec.split('\0')[3],
  606. JOB_TITLE = rec.split('\0')[4],
  607. FULL_TIME_POSITION = rec.split('\0')[5],
  608. PREVAILING_WAGE = float(rec.split('\0')[6]),
  609. YEAR = int(rec.split('\0')[7]),
  610. WORKSITE = rec.split('\0')[8],
  611. LONGITUDE = rec.split('\0')[9],
  612. LATITUDE = rec.split('\0')[10]
  613. ))
  614.  
  615. h1b_data_DF = sqlContext.createDataFrame(h1b_data_rdd)
  616.  
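# The target database has to exist before the saveAsTable below. A minimal sketch, assuming
# sqlContext is a HiveContext and codersham is the OS user name referred to as `whoami` above:
sqlContext.sql('CREATE DATABASE IF NOT EXISTS codersham')
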
  617. h1b_data_DF. \
  618. write. \
  619. saveAsTable('codersham.h1b_data')
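
# Optional check (sketch): the table row count should match the 3002373 matching records noted above.
sqlContext.sql('select count(1) from codersham.h1b_data').show()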
  620.  
  621. #14 : Instructions
  622.  
  623. # Export h1b data from hdfs to MySQL Database
  624.  
  625. # Data Description
  626.  
  627. # h1b data with ascii character "\001" as delimiter is available in HDFS
  628.  
  629. # h1b data information:
  630.  
  631. # HDFS Location: /public/h1b/h1b_data_to_be_exported
  632. # Fields:
  633. # ID, CASE_STATUS, EMPLOYER_NAME, SOC_NAME, JOB_TITLE, FULL_TIME_POSITION, PREVAILING_WAGE, YEAR, WORKSITE, LONGITUDE, LATITUDE
  634. # Number of records: 3002373
  635. # Output Requirements
  636.  
  637. # Export data to MySQL Database
  638. # MySQL database is running on ms.itversity.com
  639. # User: h1b_user
  640. # Password: itversity
  641. # Database Name: h1b_export
  642. # Table Name: h1b_data_`whoami`
  643. # Nulls are represented as: NA
  644. # After export nulls should not be stored as NA in database. It should be represented as database null
  645. # Create table command:
  646.  
  647. # CREATE TABLE h1b_data_`whoami` (
  648. # ID INT,
  649. # CASE_STATUS VARCHAR(50),
  650. # EMPLOYER_NAME VARCHAR(100),
  651. # SOC_NAME VARCHAR(100),
  652. # JOB_TITLE VARCHAR(100),
  653. # FULL_TIME_POSITION VARCHAR(50),
  654. # PREVAILING_WAGE FLOAT,
  655. # YEAR INT,
  656. # WORKSITE VARCHAR(50),
  657. # LONGITUDE VARCHAR(50),
  658. # LATITUDE VARCHAR(50));
  659.  
  660. # Replace `whoami` with your OS user name
  661. # Above create table command can be run using
  662. # Login using mysql -u h1b_user -h ms.itversity.com -p
  663. # When prompted enter password itversity
  664. # Switch to database using use h1b_export
  665. # Run above create table command by replacing `whoami` with your OS user name
  666. # End of Problem
  667.  
  668. sqoop export \
  669. --connect jdbc:mysql://ms.itversity.com/h1b_export \
  670. --username h1b_user \
  671. --password itversity \
  672. --table h1b_data_codersham \
  673. --export-dir /public/h1b/h1b_data_to_be_exported \
  674. --input-null-string 'NA' \
  675. --input-null-non-string 'NA' \
  676. --input-fields-terminated-by "\001"
  676.  
  677. #15 : Instructions
  678.  
  679. # Connect to the MySQL database on the itversity labs using sqoop and import data with case_status as CERTIFIED
  680.  
  681. # Data Description
  682.  
  683. # A MySQL instance is running on the remote node ms.itversity.com. In that instance you will find a table that contains 3002373 rows of h1b data
  684.  
  685. # MySQL database information:
  686.  
  687. # Installation on the node ms.itversity.com
  688. # Database name is h1b_db
  689. # Username: h1b_user
  690. # Password: itversity
  691. # Table name h1b_data
  692. # Output Requirements
  693.  
  694. # Place the h1b data files in the HDFS directory
  695. # /user/`whoami`/problem15/solution/
  696. # Replace `whoami` with your OS user name
  697. # Use avro file format
  698. # Load only those records which have case_status as CERTIFIED completely
  699. # There are 2615623 such records
  700. # End of Problem
  701.  
  702. sqoop import \
  703. --connect jdbc:mysql://ms.itversity.com/h1b_db \
  704. --username h1b_user \
  705. --password itversity \
  706. --table h1b_data \
  707. --where "case_status = 'CERTIFIED'" \
  708. --target-dir /user/codersham/problem15/solution \
  709. --as-avrodatafile
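
# Optional check -- a sketch, assuming the spark-avro package from problem 5 is available:
# the count should match the 2615623 CERTIFIED records mentioned above.
sqlContext.read.format('com.databricks.spark.avro').load('/user/codersham/problem15/solution').count()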
  710.  
  711. #16 : Instructions
  712.  
  713. # Get NYSE data in ascending order by date and descending order by volume
  714.  
  715. # Data Description
  716.  
  717. # NYSE data with "," as delimiter is available in HDFS
  718.  
  719. # NYSE data information:
  720.  
  721. # HDFS location: /public/nyse
  722. # There is no header in the data
  723. # Output Requirements
  724.  
  725. # Save data back to HDFS
  726. # Column order: stockticker, transactiondate, openprice, highprice, lowprice, closeprice, volume
  727. # File Format: text
  728. # Delimiter: :
  729. # Place the sorted NYSE data in the HDFS directory
  730. # /user/`whoami`/problem16/solution/
  731. # Replace `whoami` with your OS user name
  732. # End of Problem
  733.  
  734. sc.setLogLevel('ERROR')
  735.  
  736. from pyspark.sql import Row
  737.  
  738. nyse_data_rdd = sc.textFile('/public/nyse'). \
  739. map(lambda rec: Row(
  740. stockticker = rec.split(',')[0],
  741. transactiondate = rec.split(',')[1],
  742. openprice = float(rec.split(',')[2]),
  743. highprice = float(rec.split(',')[3]),
  744. lowprice = float(rec.split(',')[4]),
  745. closeprice = float(rec.split(',')[5]),
  746. volume = long(rec.split(',')[6])
  747. ))
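# Note: long() exists only in Python 2 (as used here); replace it with int() if running under Python 3.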
  748.  
  749. nyse_data_DF = sqlContext.createDataFrame(nyse_data_rdd)
  750.  
  751. from pyspark.sql.functions import col
  752.  
  753. result = nyse_data_DF. \
  754. orderBy(col('transactiondate').asc(),col('volume').desc())
  755.  
  756. sqlContext.setConf('spark.sql.shuffle.partitions','1')
  757.  
  758. result. \
  759. selectExpr("concat(stockticker,':',transactiondate,':',openprice,':',highprice,':',lowprice,':',closeprice,':',volume)"). \
  760. write. \
  761. text("/user/codersham/problem16/solution")
  762.  
  763. #17 : Instructions
  764.  
  765. # Get the stock tickers from NYSE data for which full name is missing in NYSE symbols data
  766.  
  767. # Data Description
  768.  
  769. # NYSE data with "," as delimiter is available in HDFS
  770.  
  771. # NYSE data information:
  772.  
  773. # HDFS location: /public/nyse
  774. # There is no header in the data
  775. # NYSE Symbols data with "\t" as delimiter is available in HDFS
  776.  
  777. # NYSE Symbols data information:
  778.  
  779. # HDFS location: /public/nyse_symbols
  780. # First line is header and it should be included
  781. # Output Requirements
  782.  
  783. # Get unique stock tickers for which the corresponding names are missing in the NYSE symbols data
  784. # Save data back to HDFS
  785. # File Format: avro
  786. # Avro dependency details:
  787. # groupId -> com.databricks, artifactId -> spark-avro_2.10, version -> 2.0.1
  788. # Place the sorted NYSE data in the HDFS directory
  789. # /user/`whoami`/problem17/solution/
  790. # Replace `whoami` with your OS user name
  791. # End of Problem
  792.  
  793. sc.setLogLevel('ERROR')
  794.  
  795. from pyspark.sql import Row
  796.  
  797. nyse_data_rdd = sc.textFile('/public/nyse'). \
  798. map(lambda rec: Row(
  799. stockticker = rec.split(',')[0],
  800. transactiondate = rec.split(',')[1],
  801. openprice = float(rec.split(',')[2]),
  802. highprice = float(rec.split(',')[3]),
  803. lowprice = float(rec.split(',')[4]),
  804. closeprice = float(rec.split(',')[5]),
  805. volume = long(rec.split(',')[6])
  806. ))
  807.  
  808. nyse_data_DF = sqlContext.createDataFrame(nyse_data_rdd)
  809.  
  810. nyse_data_DF.registerTempTable('nyse_data')
  811.  
  812. nyse_symbols_with_header = sc.textFile("/public/nyse_symbols")
  813.  
  814. nyse_symbols_only_header = sc.parallelize([nyse_symbols_with_header.first()])
  815.  
  816. nyse_symbols_rdd = nyse_symbols_with_header.subtract(nyse_symbols_only_header). \
  817. map(lambda rec: Row(
  818. symbol = rec.split('\t')[0],
  819. description = rec.split('\t')[1]
  820. ))
  821.  
  822. nyse_symbols_DF = sqlContext.createDataFrame(nyse_symbols_rdd)
  823.  
  824. nyse_symbols_DF.registerTempTable('nyse_symbols')
  825.  
  826. result = sqlContext.sql("select distinct stockticker from (select nyd.stockticker, nys.description from nyse_data nyd left outer join nyse_symbols nys on nyd.stockticker = nys.symbol)q where q.description is null")
  827.  
  828. sqlContext.setConf('spark.sql.shuffle.partitions','1')
  829.  
  830. result. \
  831. write. \
  832. format('com.databricks.spark.avro'). \
  833. save('/user/codersham/problem17/solution/')
  834.  
  835.  
  836. #18 : Instructions
  837.  
  838. # Get the name of stocks displayed along with other information
  839.  
  840. # Data Description
  841.  
  842. # NYSE data with "," as delimiter is available in HDFS
  843.  
  844. # NYSE data information:
  845.  
  846. # HDFS location: /public/nyse
  847. # There is no header in the data
  848. # NYSE Symbols data with tab character (\t) as delimiter is available in HDFS
  849.  
  850. # NYSE Symbols data information:
  851.  
  852. # HDFS location: /public/nyse_symbols
  853. # First line is header and it should be included
  854. # Output Requirements
  855.  
  856. # Get all NYSE details along with the stock name if it exists; if not, stockname should be empty
  857. # Column Order: stockticker, stockname, transactiondate, openprice, highprice, lowprice, closeprice, volume
  858. # Delimiter: ,
  859. # File Format: text
  860. # Place the data in the HDFS directory
  861. # /user/`whoami`/problem18/solution/
  862. # Replace `whoami` with your OS user name
  863. # End of Problem
  864.  
  865. sc.setLogLevel('ERROR')
  866.  
  867. from pyspark.sql import Row
  868.  
  869. nyse_data_rdd = sc.textFile('/public/nyse'). \
  870. map(lambda rec: Row(
  871. stockticker = rec.split(',')[0],
  872. transactiondate = rec.split(',')[1],
  873. openprice = float(rec.split(',')[2]),
  874. highprice = float(rec.split(',')[3]),
  875. lowprice = float(rec.split(',')[4]),
  876. closeprice = float(rec.split(',')[5]),
  877. volume = long(rec.split(',')[6])
  878. ))
  879.  
  880. nyse_data_DF = sqlContext.createDataFrame(nyse_data_rdd)
  881.  
  882. nyse_data_DF.registerTempTable('nyse_data')
  883.  
  884. nyse_symbols_with_header = sc.textFile("/public/nyse_symbols")
  885.  
  886. nyse_symbols_only_header = sc.parallelize([nyse_symbols_with_header.first()])
  887.  
  888. nyse_symbols_rdd = nyse_symbols_with_header.subtract(nyse_symbols_only_header). \
  889. map(lambda rec: Row(
  890. symbol = rec.split('\t')[0],
  891. description = rec.split('\t')[1]
  892. ))
  893.  
  894. nyse_symbols_DF = sqlContext.createDataFrame(nyse_symbols_rdd)
  895.  
  896. nyse_symbols_DF.registerTempTable('nyse_symbols')
  897.  
  898. result = sqlContext.sql("select nyd.stockticker, nvl(nys.description,'') stockname, nyd.transactiondate, nyd.openprice, nyd.highprice, nyd.lowprice, nyd.closeprice, nyd.volume from nyse_data nyd left outer join nyse_symbols nys on nyd.stockticker = nys.symbol")
  899.  
  900. sqlContext.setConf('spark.sql.shuffle.partitions','1')
  901.  
  902. result. \
  903. selectExpr("concat(stockticker,',',stockname,',',transactiondate,',',openprice,',',highprice,',',lowprice,',',closeprice,',',volume)"). \
  904. write. \
  905. text('/user/codersham/problem18/solution/')
  906.  
  907.  
  908. #19 : Instructions
  909.  
  910. # Get number of companies who filed LCAs for each year
  911.  
  912. # Data Description
  913.  
  914. # h1b data with ascii null "\0" as delimiter is available in HDFS
  915.  
  916. # h1b data information:
  917.  
  918. # HDFS Location: /public/h1b/h1b_data_noheader
  919. # Fields:
  920. # ID, CASE_STATUS, EMPLOYER_NAME, SOC_NAME, JOB_TITLE, FULL_TIME_POSITION, PREVAILING_WAGE, YEAR, WORKSITE, LONGITUDE, LATITUDE
  921. # Use EMPLOYER_NAME to identify a company when counting the number of companies
  922. # YEAR is 8th field
  923. # There are some LCAs for which YEAR is NA, ignore those records
  924. # Output Requirements
  925.  
  926. # File Format: text
  927. # Delimiter: tab character "\t"
  928. # Output Field Order: year, lca_count
  929. # Place the output files in the HDFS directory
  930. # /user/`whoami`/problem19/solution/
  931. # Replace `whoami` with your OS user name
  932. # End of Problem
  933.  
  934. sc.setLogLevel('ERROR')
  935.  
  936. from pyspark.sql import Row
  937.  
  938. h1b_data_rdd = sc.textFile('/public/h1b/h1b_data_noheader'). \
  939. filter(lambda rec: rec.split('\0')[7] != 'NA'). \
  940. map(lambda rec: Row(year = rec.split('\0')[7],employer_name = rec.split('\0')[2]))
  941.  
  942. h1b_data_DF = sqlContext.createDataFrame(h1b_data_rdd)
  943.  
  944. h1b_data_DF.registerTempTable('h1b_data')
  945.  
  946. result = sqlContext.sql("select year, count(distinct employer_name) lca_count from h1b_data group by year")
  947.  
  948. sqlContext.setConf('spark.sql.shuffle.partitions','1')
  949.  
  950. result. \
  951. selectExpr("concat(year,'\t',lca_count)"). \
  952. write. \
  953. text('/user/codersham/problem19/solution/')
  954.  
  955. #20 : Instructions
  956.  
  957. # Connect to the MySQL database on the itversity labs using sqoop and import data with employer_name, case_status and count. Make sure data is sorted by employer_name in ascending order and by count in descending order
  958.  
  959. # Data Description
  960.  
  961. # A MySQL instance is running on the remote node ms.itversity.com. In that instance you will find a table that contains 3002373 rows of h1b data
  962.  
  963. # MySQL database information:
  964.  
  965. # Installation on the node ms.itversity.com
  966. # Database name is h1b_db
  967. # Username: h1b_user
  968. # Password: itversity
  969. # Table name h1b_data
  970. # Output Requirements
  971.  
  972. # Place the h1b data files in the HDFS directory
  973. # /user/`whoami`/problem20/solution/
  974. # Replace `whoami` with your OS user name
  975. # Use text file format and tab (\t) as delimiter
  976. # Hint: You can use Spark with JDBC or Sqoop import with query
  977. # You might not get such hints in actual exam
  978. # Output should contain employer name, case status and count
  979. # End of Problem
  980.  
  981. sqoop import \
  982. --connect jdbc:mysql://ms.itversity.com/h1b_db \
  983. --username h1b_user \
  984. --password itversity \
  985. --query "select employer_name, case_status, count(id) count from h1b_data where \$CONDITIONS group by employer_name, case_status order by employer_name asc, count desc" \
  986. --target-dir /user/codersham/problem20/solution \
  987. --fields-terminated-by '\t' \
  988. --num-mappers 1
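
# The hint above also allows Spark with JDBC. A minimal sketch of that route -- an alternative
# to the sqoop command, not in addition to it, since both target the same directory -- assuming
# the MySQL JDBC driver jar is available to the pyspark shell (e.g. via --jars):
h1b_jdbc_DF = sqlContext.read. \
format('jdbc'). \
option('url', 'jdbc:mysql://ms.itversity.com/h1b_db'). \
option('driver', 'com.mysql.jdbc.Driver'). \
option('dbtable', 'h1b_data'). \
option('user', 'h1b_user'). \
option('password', 'itversity'). \
load()

h1b_jdbc_DF.registerTempTable('h1b_data_jdbc')

jdbc_result = sqlContext.sql("select employer_name, case_status, count(id) cnt from h1b_data_jdbc group by employer_name, case_status order by employer_name asc, cnt desc")

jdbc_result. \
selectExpr("concat(employer_name,'\t',case_status,'\t',cnt)"). \
write. \
text('/user/codersham/problem20/solution/')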