:'Problem 1
#Connect to the MySQL database on the itversity labs using sqoop and import all of the data from the orders table into HDFS
Output Requirements
#Place the order files in the HDFS directory
#/user/yourusername/problem1/solution/
#Replace yourusername with your OS user name
#Use a text format with comma as the columnar delimiter
#Load every order record completely'

sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/retail_db \
--username retail_user \
--password itversity \
--table orders \
--target-dir /user/vikasgonti/itversity/problem1/solution/

:'#Problem 2
#Get the customers who have not placed any orders, sorted by customer_lname and then customer_fname
Output Requirements
#Target Columns: customer_lname, customer_fname
#Number of Files: 1
#Place the output file in the HDFS directory
#/user/yourusername/problem2/solution/
#Replace yourusername with your OS user name
#File format should be text
#delimiter is (",")
#Compression: Uncompressed
'
sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/retail_db \
--username retail_user \
--password itversity \
--target-dir /user/vikasgonti/itversity/problem2/solution/ \
-m 1 \
--query "select customer_lname, customer_fname from customers left outer join orders on customer_id = order_customer_id where \$CONDITIONS and order_customer_id is null order by customer_lname, customer_fname"


:'#Problem 3
#Get top 3 crime types based on number of incidents in RESIDENCE area using "Location Description"
Output Requirements
#Output Fields: crime_type, incident_count
#Output File Format: JSON
#Delimiter: N/A
#Compression: No
#Place the output file in the HDFS directory
#/user/yourusername/problem3/solution/
#Replace yourusername with your OS user name'

val crimeData = sc.textFile("/public/crime/csv")
val crimeDataHeader = crimeData.first
val crimeDatawithoutHeader = crimeData.filter(rec => rec!=crimeDataHeader)

crimeDatawithoutHeader.take(10).foreach(println)

val crimeRDD = crimeDatawithoutHeader.map(rec => {
val t = rec.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1)
(t(5),t(7))
})

val crimeDF = crimeRDD.toDF("type","location")
crimeDF.registerTempTable("crime");
val res = sqlContext.sql("select * from (select type as crime_type, count(1) as incident_count from crime "+
"where location = 'RESIDENCE' group by type order by incident_count desc) A limit 3");
res.toJSON.saveAsTextFile("/user/vikasgonti/itversity/problem3/solution/")
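// The comma split above uses a lookahead regex so commas inside double-quoted fields are not treated
// as delimiters. A minimal sketch of its behaviour on a made-up record (hypothetical values, not actual
// crime data):
val sample = "10000092,HY189866,03/18/2015,\"047XX W, OHIO ST\",THEFT,RESIDENCE"
val fields = sample.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1)
fields.foreach(println)
// the quoted field "047XX W, OHIO ST" stays intact as one element, so fields(4) is THEFT and fields(5) is RESIDENCE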

:'#Problem 4
#Convert NYSE data into parquet
Output Requirements
#Column Names: stockticker, transactiondate, openprice, highprice, lowprice, closeprice, volume
#Convert file format to parquet
#Place the output file in the HDFS directory
#/user/yourusername/problem4/solution/
#Replace yourusername with your OS user name'

val nyseRDD = sc.textFile("/user/vikasgonti/data/nyse")
val nyseDF = nyseRDD.map(rec => {
val t = rec.split(",")
(t(0),t(1),t(2),t(3),t(4),t(5),t(6))
}).toDF("stockticker","transactiondate","openprice","highprice","lowprice","closeprice","volume")

nyseDF.write.parquet("/user/vikasgonti/itversity/problem4/solution/")

:'#problem 5
#Get word count for the input data using space as delimiter
Output Requirements
#Output File format: Avro
#Output fields: word, count
#Compression: Uncompressed
#Place the output files in the HDFS directory
#/user/yourusername/problem5/solution/
#Replace yourusername with your OS user name'

spark-shell --master yarn \
--conf spark.ui.port=12456 \
--num-executors 10 \
--executor-memory 3G \
--executor-cores 2 \
--packages com.databricks:spark-avro_2.10:2.0.1

val wordsRDD = sc.textFile("/public/randomtextwriter")
val wordsFlat = wordsRDD.flatMap(rec => rec.split(" "))
val wordsMap = wordsFlat.map(rec => (rec,1))
val wordsCount = wordsMap.reduceByKey((t,v) => t+v,8)
val wordsDF = wordsCount.toDF("word","count")
import com.databricks.spark.avro._
wordsDF.write.avro("/user/vikasgonti/itversity/problem5/solution/")
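// Optional sanity check: the Avro output can be read back with the same spark-avro package
// (assumes the --packages jar above is on the classpath; path as used in the solution).
val wordsCheck = sqlContext.read.avro("/user/vikasgonti/itversity/problem5/solution/")
wordsCheck.show(5)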


:'#problem 6
#Get total number of orders for each customer where the customer_state = 'TX'
Output Requirements
#Output Fields: customer_fname, customer_lname, order_count
#File Format: text
#Delimiter: Tab character (\t)
#Place the result file in the HDFS directory
#/user/yourusername/problem6/solution/
#Replace yourusername with your OS user name'

val orders = sc.textFile("/public/retail_db/orders")
val customers = sc.textFile("/public/retail_db/customers")
val ordersDF = orders.map(rec => {
val t = rec.split(",")
(t(0),t(2))
}).toDF("order_id","order_customer_id")

val customersDF = customers.map(rec => {
val t = rec.split(",")
(t(0),t(1),t(2), t(7))
}).toDF("customer_id","customer_fname","customer_lname","customer_state")

ordersDF.registerTempTable("orders")
customersDF.registerTempTable("customers")
val res = sqlContext.sql("select customer_fname, customer_lname, count(order_id) order_count "+
"from customers, orders where customer_id = order_customer_id and customer_state = 'TX' "+
"group by customer_fname, customer_lname ")

res.map(rec => rec.mkString("\t")).saveAsTextFile("/user/vikasgonti/itversity/problem6/solution/")



:'#problem 7
#List the names of the Top 5 products by revenue ordered on '2013-07-26'. Revenue is considered only for COMPLETE and CLOSED orders.
Output Requirements
#Target Columns: order_date, order_revenue, product_name, product_category_id
#Data has to be sorted in descending order by order_revenue
#File Format: text
#Delimiter: colon (:)
#Place the output file in the HDFS directory
#/user/yourusername/problem7/solution/
#Replace yourusername with your OS user name
'
val orders = sc.textFile("/public/retail_db/orders")
val orderItems = sc.textFile("/public/retail_db/order_items")
val products = sc.textFile("/public/retail_db/products")

val ordersDF = orders.map(rec => {
val t = rec.split(",")
(t(0),t(1).split(" ")(0), t(3))
}).toDF("order_id","order_date","order_status")

val orderItemsDF = orderItems.map(rec => {
val t = rec.split(",")
(t(1),t(2), t(4))
}).toDF("order_item_order_id","order_item_product_id","order_item_subtotal")

val productsDF = products.map(rec => {
val t = rec.split(",")
(t(0),t(1),t(2))
}).toDF("product_id","product_category_id","product_name")

ordersDF.registerTempTable("orders")
orderItemsDF.registerTempTable("orderItems")
productsDF.registerTempTable("products")

val res = sqlContext.sql("select DISTINCT order_date, round(sum(order_item_subtotal) over (partition by product_id), 2) order_revenue, "+
"product_name,product_category_id from orders, products, orderItems where order_id = order_item_order_id "+
"and order_item_product_id = product_id and order_date ='2013-07-26' and order_status IN ('COMPLETE','CLOSED') "+
"order by order_revenue desc limit 5")
res.map(rec => rec.mkString(":")).saveAsTextFile("/user/vikasgonti/itversity/problem7/solution/")
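// The query above computes per-product revenue with a window function and relies on DISTINCT to
// collapse duplicate rows. A plain GROUP BY sketch of the same aggregation over the same temp tables
// (an alternative formulation, not the solution used above):
val resGrouped = sqlContext.sql("select order_date, round(sum(order_item_subtotal), 2) order_revenue, "+
"product_name, product_category_id from orders, products, orderItems where order_id = order_item_order_id "+
"and order_item_product_id = product_id and order_date = '2013-07-26' and order_status IN ('COMPLETE','CLOSED') "+
"group by order_date, product_name, product_category_id order by order_revenue desc limit 5")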

:'#problem 8
#List the order Items where the order_status = PENDING PAYMENT order by order_id
Output Requirements
#Target columns: order_id, order_date, order_customer_id, order_status
#File Format: orc
#Place the output files in the HDFS directory
#/user/yourusername/problem8/solution/
#Replace yourusername with your OS user name'

val ordersRDD = sc.textFile("/public/retail_db/orders")
val ordersDF = ordersRDD.map(rec => {
val t = rec.split(",")
(t(0).toInt,t(1), t(2), t(3))
}).toDF("order_id","order_date","order_customer_id","order_status")
val res = ordersDF.filter("order_status = 'PENDING_PAYMENT'").orderBy("order_id")
res.write.orc("/user/vikasgonti/itversity/problem8/solution/")

:'#problem 9
#Remove header from h1b data
Output Requirements
#Remove the header from the data and save rest of the data as is
#Data should be compressed using snappy algorithm
#Place the H1B data in the HDFS directory
#/user/yourusername/problem9/solution/
#Replace yourusername with your OS user name
'
val h1bdata = sc.textFile("/public/h1b/h1b_data")
val h1bHeader = h1bdata.first
val h1bdatawithoutheader = h1bdata.filter(rec => rec!=h1bHeader)
h1bdatawithoutheader.saveAsTextFile("/user/vikasgonti/itversity/problem9/solution/",classOf[org.apache.hadoop.io.compress.SnappyCodec])

:'#problem 10
#Get number of LCAs filed for each year
Output Requirements
#File Format: text
#Output Fields: YEAR, NUMBER_OF_LCAS
#Delimiter: Ascii null "\0"
#Place the output files in the HDFS directory
#/user/yourusername/problem10/solution/
#Replace yourusername with your OS user name'
# ID CASE_STATUS EMPLOYER_NAME SOC_NAME JOB_TITLE FULL_TIME_POSITION PREVAILING_WAGE YEAR WORKSITE lon lat

val h1bDF = h1bdatawithoutheader.map(rec => {
val t = rec.split("\0")
(t(7))
}).toDF("YEAR")
h1bDF.registerTempTable("h1bdata")
val res = sqlContext.sql("select YEAR, count(1) as NUMBER_OF_LCAS from h1bdata where year != 'NA' group by year")
res.map(rec => rec.mkString("\0")).saveAsTextFile("/user/vikasgonti/itversity/problem10/solution/")

:'#problem 11
#Get number of LCAs by status for the year 2016
Output Requirements
#File Format: json
#Output Field Names: year, status, count
#Place the output files in the HDFS directory
#/user/yourusername/problem11/solution/
#Replace yourusername with your OS user name'

val h1bDF = h1bdatawithoutheader.map(rec => {
val t = rec.split("\0")
(t(1),t(7))
}).toDF("Status", "Year")
h1bDF.registerTempTable("h1bdata")
val res = sqlContext.sql("select Year as year, Status as status, Count(1) as count from h1bdata where year != 'NA' and year = 2016 group by year, status")
res.toJSON.saveAsTextFile("/user/vikasgonti/itversity/problem11/solution/")

:'#problem 12
#Get top 5 employers for year 2016 where the status is WITHDRAWN or CERTIFIED-WITHDRAWN or DENIED
Output Requirements
#File Format: parquet
#Output Fields: employer_name, lca_count
#Data needs to be in descending order by count
#Place the output files in the HDFS directory
#/user/yourusername/problem12/solution/
#Replace yourusername with your OS user name'

val h1bDF = h1bdatawithoutheader.map(rec => {
val t = rec.split("\0")
(t(1),t(2),t(7))
}).toDF("Status","Employer","Year")
h1bDF.registerTempTable("h1bdata")
val res = sqlContext.sql("select Employer employer_name, count(1) lca_count from h1bdata where year != 'NA' and year = 2016 "+
"and status IN ('WITHDRAWN','CERTIFIED-WITHDRAWN','DENIED') group by Employer order by lca_count desc limit 5")
res.write.parquet("/user/vikasgonti/itversity/problem12/solution/")

:'#problem 13
#Copy all h1b data from HDFS to Hive table excluding those where year is NA or prevailing_wage is NA
Output Requirements
#Save it in Hive Database
#Create Database: CREATE DATABASE IF NOT EXISTS yourusername
#Switch Database: USE yourusername
#Save data to hive table h1b_data
#Create table command:

CREATE TABLE h1b_data (
ID INT,
CASE_STATUS STRING,
EMPLOYER_NAME STRING,
SOC_NAME STRING,
JOB_TITLE STRING,
FULL_TIME_POSITION STRING,
PREVAILING_WAGE DOUBLE,
YEAR INT,
WORKSITE STRING,
LONGITUDE STRING,
LATITUDE STRING
)

Replace yourusername with your OS user name'

val h1bdatawithoutheader = sc.textFile("/public/h1b/h1b_data_noheader")
val h1bDF = h1bdatawithoutheader.map(rec => {
val t = rec.split("\0")
(t(0),t(1),t(2),t(3),t(4),t(5),t(6),t(7),t(8),t(9),t(10))
}).toDF("ID","CASE_STATUS","EMPLOYER_NAME","SOC_NAME","JOB_TITLE","FULL_TIME_POSITION","PREVAILING_WAGE","YEAR","WORKSITE","LONGITUDE","LATITUDE")

sqlContext.sql("use vghivedatabase")
sqlContext.sql("show tables").show
h1bDF.registerTempTable("h1bdata")

sqlContext.sql("insert into h1b_data select * from h1bdata where year != 'NA' and PREVAILING_WAGE!='NA'")
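// A DataFrame-API sketch of the same insert, assuming the same session and that the h1b_data table
// already exists in the current database (an alternative to the SQL statement above, not a second step):
h1bDF.filter("YEAR != 'NA' and PREVAILING_WAGE != 'NA'").write.insertInto("h1b_data")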


:'#problem 14
#Export h1b data from hdfs to MySQL Database
Output Requirements
#Export data to MySQL Database
#MySQL database is running on ms.itversity.com
#User: h1b_user
#Password: itversity
Database Name: h1b_export
Table Name: h1b_data_yourusername
Nulls are represented as: NA
After export, nulls should not be stored as NA in the database; they should be represented as database NULL
Create table command:

CREATE TABLE h1b_data_yourusername (
ID INT,
CASE_STATUS VARCHAR(50),
EMPLOYER_NAME VARCHAR(100),
SOC_NAME VARCHAR(100),
JOB_TITLE VARCHAR(100),
FULL_TIME_POSITION VARCHAR(50),
PREVAILING_WAGE FLOAT,
YEAR INT,
WORKSITE VARCHAR(50),
LONGITUDE VARCHAR(50),
LATITUDE VARCHAR(50));

#Replace yourusername with your OS user name
#The above create table command can be run as follows
#Login using mysql -u h1b_user -h ms.itversity.com -p
#When prompted, enter password itversity
#Switch to the database using use h1b_export
#Run the above create table command, replacing yourusername with your OS user name'

sqoop export \
--connect jdbc:mysql://ms.itversity.com:3306/h1b_export \
--username h1b_user \
--password itversity \
--table h1b_data_vg \
--export-dir /public/h1b/h1b_data_to_be_exported \
--input-fields-terminated-by '\001' \
--input-null-string 'NA' \
--input-null-non-string 'NA'

:'#problem 15
#Connect to the MySQL database on the itversity labs using sqoop and import data with case_status as CERTIFIED
Output Requirements
#Place the h1b related data in files in HDFS directory
#/user/yourusername/problem15/solution/
#Replace yourusername with your OS user name
#Use avro file format
#Load only those records which have case_status as CERTIFIED completely
#There are 2615623 such records'

sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/h1b_db \
--username h1b_user \
--password itversity \
--table h1b_data \
--where "CASE_STATUS = 'CERTIFIED'" \
--target-dir /user/vikasgonti/itversity/problem15/solution/ \
--as-avrodatafile

:'#problem 16
#Get NYSE data in ascending order by date and descending order by volume
Output Requirements
#Save data back to HDFS
#Column order: stockticker, transactiondate, openprice, highprice, lowprice, closeprice, volume
#File Format: text
#Delimiter: :
#Place the sorted NYSE data in the HDFS directory
#/user/yourusername/problem16/solution/
#Replace yourusername with your OS user name'

val nyseRDD = sc.textFile("/public/nyse")
val nyseDF = nyseRDD.map(rec => {
val t = rec.split(",")
(t(0).toString,t(1).toInt,t(2).toFloat,t(3).toFloat,t(4).toFloat,t(5).toFloat,t(6).toInt)
}).toDF("stockticker","transactiondate","openprice","highprice","lowprice","closeprice","volume")

import org.apache.spark.sql.functions.col
val res = nyseDF.orderBy(col("transactiondate"),col("volume").desc)
res.map(rec => rec.mkString(":")).saveAsTextFile("/user/vikasgonti/itversity/problem16/solution/")

:'#problem 17
#Get the stock tickers from NYSE data for which full name is missing in NYSE symbols data
Output Requirements
#Get unique stock ticker for which corresponding names are missing in NYSE symbols data
#Save data back to HDFS
#File Format: avro
#Avro dependency details:
#groupId -> com.databricks, artifactId -> spark-avro_2.10, version -> 2.0.1
#Place the sorted NYSE data in the HDFS directory
#/user/yourusername/problem17/solution/
#Replace yourusername with your OS user name'

val nyseRDD = sc.textFile("/public/nyse")
val nyseDF = nyseRDD.map(rec => {
val t = rec.split(",")
(t(0).toString)
}).toDF("stockticker")
nyseDF.registerTempTable("nyse")

val nysesymRDD = sc.textFile("/public/nyse_symbols")
val first = nysesymRDD.first
val symDataDF = nysesymRDD.filter(rec => rec!= first).map(rec => {
val t = rec.split("\t")
(t(0).toString)
}).toDF("stocksymbol")
symDataDF.registerTempTable("symData")

val res = sqlContext.sql("select DISTINCT stockticker as Symbol from nyse n "+
"left outer join symData s on n.stockticker = s.stocksymbol where s.stocksymbol is null")
import com.databricks.spark.avro._
res.write.avro("/user/vikasgonti/itversity/problem17/solution/")
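// The same "ticker missing from symbols" lookup sketched with the DataFrame API: a left outer join
// followed by an isNull filter on the right-hand key (uses the nyseDF and symDataDF defined above).
val missing = nyseDF.join(symDataDF, nyseDF("stockticker") === symDataDF("stocksymbol"), "left_outer").
filter(symDataDF("stocksymbol").isNull).
select(nyseDF("stockticker")).distinct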


:'#problem 18
#Get the name of stocks displayed along with other information
Output Requirements
#Get all NYSE details along with the stock name if it exists; if not, stockname should be empty
#Column Order: stockticker, stockname, transactiondate, openprice, highprice, lowprice, closeprice, volume
#Delimiter: ,
#File Format: text
#Place the data in the HDFS directory
#/user/yourusername/problem18/solution/
#Replace yourusername with your OS user name'

val nyseRDD = sc.textFile("/public/nyse")
val nyseDF = nyseRDD.map(rec => {
val t = rec.split(",")
(t(0).toString,t(1).toInt,t(2).toFloat,t(3).toFloat,t(4).toFloat,t(5).toFloat,t(6).toInt)
}).toDF("stockticker","transactiondate","openprice","highprice","lowprice","closeprice","volume")
nyseDF.registerTempTable("nyse")

val nysesymRDD = sc.textFile("/public/nyse_symbols")
val first = nysesymRDD.first
val symDataDF = nysesymRDD.filter(rec => rec!= first).map(rec => {
val t = rec.split("\t")
(t(0).toString, t(1).toString)
}).toDF("stocksymbol", "stockname")
symDataDF.registerTempTable("symData")


val res = sqlContext.sql("select stockticker, nvl(stockname,'') stockname, transactiondate, openprice, "+
"highprice, lowprice, closeprice, volume "+
"from nyse n left outer join symData s on n.stockticker = s.stocksymbol")

res.map(rec => rec.mkString(",")).saveAsTextFile("/user/vikasgonti/itversity/problem18/solution/")

:'#problem 19
#Get number of companies who filed LCAs for each year
Output Requirements
#File Format: text
#Delimiter: tab character "\t"
#Output Field Order: year, lca_count
#Place the output files in the HDFS directory
#/user/yourusername/problem19/solution/
#Replace yourusername with your OS user name'

val h1bdata = sc.textFile("/public/h1b/h1b_data_noheader")
val h1bDF = h1bdata.map(rec => {
val t = rec.split("\0")
(t(2),t(7))
}).toDF("EMPLOYER_NAME","YEAR")
h1bDF.registerTempTable("h1bdata")
val res = sqlContext.sql("select YEAR, count(EMPLOYER_NAME) as lca_count from h1bdata where year != 'NA' group by year")
res.map(rec => rec.mkString("\t")).saveAsTextFile("/user/vikasgonti/itversity/problem19/solution/")

:'#problem 20
#Using sqoop, import data with employer_name, case_status and count.
#Make sure data is sorted by employer_name in ascending order and by count in descending order
Output Requirements
#Place the h1b related data in files in HDFS directory
#/user/yourusername/problem20/solution/
#Replace yourusername with your OS user name
#Use text file format and tab (\t) as delimiter
#Hint: You can use Spark with JDBC or Sqoop import with query (a Spark JDBC sketch follows the sqoop commands below)
#You might not get such hints in the actual exam
#Output should contain employer name, case status and count
'
sqoop eval \
--connect jdbc:mysql://ms.itversity.com:3306/h1b_db \
--username h1b_user \
--password itversity \
--query "select count(*) from (select EMPLOYER_NAME, CASE_STATUS, count(1) as count from h1b_data group by EMPLOYER_NAME, CASE_STATUS) A"

sqoop import \
-Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect jdbc:mysql://ms.itversity.com:3306/h1b_db \
--username h1b_user \
--password itversity \
--query "select EMPLOYER_NAME, CASE_STATUS, count(1) as count from h1b_data where \$CONDITIONS group by EMPLOYER_NAME, CASE_STATUS order by EMPLOYER_NAME, count desc" \
--target-dir /user/vikasgonti/itversity/problem20/solution/ \
--as-textfile \
--fields-terminated-by '\t' \
--split-by case_status
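// Spark-with-JDBC alternative mentioned in the hint; a sketch only, assuming the MySQL JDBC driver jar
// is available to spark-shell (e.g. via --jars) and writing to the same target directory instead of
// (not in addition to) the sqoop import above.
val jdbcQuery = "(select EMPLOYER_NAME, CASE_STATUS, count(1) as count from h1b_data "+
"group by EMPLOYER_NAME, CASE_STATUS order by EMPLOYER_NAME, count desc) as lca"
val lcaDF = sqlContext.read.format("jdbc").
option("url", "jdbc:mysql://ms.itversity.com:3306/h1b_db").
option("user", "h1b_user").
option("password", "itversity").
option("dbtable", jdbcQuery).
load()
lcaDF.map(rec => rec.mkString("\t")).saveAsTextFile("/user/vikasgonti/itversity/problem20/solution/")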