:'#Problem 1
#Connect to the MySQL database on the itversity labs using sqoop and import all of the data from the orders table into HDFS
Output Requirements
#Place the order files in the HDFS directory
#/user/yourusername/problem1/solution/
#Replace yourusername with your OS user name
#Use text format with comma as the column delimiter
#Load every order record completely'
sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/retail_db \
--username retail_user \
--password itversity \
--table orders \
--target-dir /user/vikasgonti/itversity/problem1/solution/
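// Optional sanity check (run from spark-shell, not part of the graded output):
// read the imported files back and confirm the comma delimiter and row count.
val p1Check = sc.textFile("/user/vikasgonti/itversity/problem1/solution")
p1Check.take(5).foreach(println)   // each line should be one comma-delimited order record
p1Check.count                      // should match the row count of the orders table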
:'#Problem 2
#Get the customers who have not placed any orders, sorted by customer_lname and then customer_fname
Output Requirements
#Target Columns: customer_lname, customer_fname
#Number of Files: 1
#Place the output file in the HDFS directory
#/user/yourusername/problem2/solution/
#Replace yourusername with your OS user name
#File format should be text
#Delimiter is (",")
#Compression: Uncompressed
'
sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/retail_db \
--username retail_user \
--password itversity \
--target-dir /user/vikasgonti/itversity/problem2/solution/ \
-m 1 \
--query "select customer_lname, customer_fname from customers left outer join orders on customer_id = order_customer_id where \$CONDITIONS and order_customer_id is null order by customer_lname, customer_fname"
:'#Problem 3
#Get top 3 crime types based on number of incidents in RESIDENCE area using "Location Description"
Output Requirements
#Output Fields: crime_type, incident_count
#Output File Format: JSON
#Delimiter: N/A
#Compression: No
#Place the output file in the HDFS directory
#/user/yourusername/problem3/solution/
#Replace yourusername with your OS user name'
val crimeData = sc.textFile("/public/crime/csv")
val crimeDataHeader = crimeData.first
val crimeDataWithoutHeader = crimeData.filter(rec => rec != crimeDataHeader)
crimeDataWithoutHeader.take(10).foreach(println)
val crimeRDD = crimeDataWithoutHeader.map(rec => {
  // split on commas that are outside double quotes, keeping empty trailing fields
  val t = rec.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1)
  (t(5), t(7))   // (Primary Type, Location Description)
})
val crimeDF = crimeRDD.toDF("type", "location")
crimeDF.registerTempTable("crime")
val res = sqlContext.sql("select * from (select type as crime_type, count(1) as incident_count from crime " +
  "where location = 'RESIDENCE' group by type order by incident_count desc) A limit 3")
res.toJSON.saveAsTextFile("/user/vikasgonti/itversity/problem3/solution/")
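// Optional check: only three records, so collecting is safe; each line should be
// a JSON document with crime_type and incident_count fields.
sc.textFile("/user/vikasgonti/itversity/problem3/solution").collect.foreach(println)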
:'#Problem 4
#Convert NYSE data into parquet
Output Requirements
#Column Names: stockticker, transactiondate, openprice, highprice, lowprice, closeprice, volume
#Convert file format to parquet
#Place the output file in the HDFS directory
#/user/yourusername/problem4/solution/
#Replace yourusername with your OS user name'
val nyseRDD = sc.textFile("/user/vikasgonti/data/nyse")
val nyseDF = nyseRDD.map(rec => {
  val t = rec.split(",")
  (t(0), t(1), t(2), t(3), t(4), t(5), t(6))   // only column names are required, so fields stay as strings
}).toDF("stockticker", "transactiondate", "openprice", "highprice", "lowprice", "closeprice", "volume")
nyseDF.write.parquet("/user/vikasgonti/itversity/problem4/solution/")
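// Optional check: parquet keeps the schema, so reading the output back should
// show exactly the seven required column names.
val p4Check = sqlContext.read.parquet("/user/vikasgonti/itversity/problem4/solution")
p4Check.printSchema
p4Check.show(5)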
:'#Problem 5
#Get word count for the input data using space as the delimiter
Output Requirements
#Output File format: Avro
#Output fields: word, count
#Compression: Uncompressed
#Place the output files in the HDFS directory
#/user/yourusername/problem5/solution/
#Replace yourusername with your OS user name'
spark-shell --master yarn \
--conf spark.ui.port=12456 \
--num-executors 10 \
--executor-memory 3G \
--executor-cores 2 \
--packages com.databricks:spark-avro_2.10:2.0.1
val wordsRDD = sc.textFile("/public/randomtextwriter")
val wordsFlat = wordsRDD.flatMap(rec => rec.split(" "))
val wordsMap = wordsFlat.map(rec => (rec, 1))
val wordsCount = wordsMap.reduceByKey((t, v) => t + v, 8)
val wordsDF = wordsCount.toDF("word", "count")
// spark-avro adds write.avro through this implicit import
import com.databricks.spark.avro._
wordsDF.write.avro("/user/vikasgonti/itversity/problem5/solution/")
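// Optional check: read the avro output back and spot-check a few word counts.
sqlContext.read.format("com.databricks.spark.avro").load("/user/vikasgonti/itversity/problem5/solution").show(5)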
:'#Problem 6
#Get total number of orders for each customer where the customer_state = 'TX'
Output Requirements
#Output Fields: customer_fname, customer_lname, order_count
#File Format: text
#Delimiter: Tab character (\t)
#Place the result file in the HDFS directory
#/user/yourusername/problem6/solution/
#Replace yourusername with your OS user name'
val orders = sc.textFile("/public/retail_db/orders")
val customers = sc.textFile("/public/retail_db/customers")
val ordersDF = orders.map(rec => {
  val t = rec.split(",")
  (t(0), t(2))
}).toDF("order_id", "order_customer_id")
val customersDF = customers.map(rec => {
  val t = rec.split(",")
  (t(0), t(1), t(2), t(7))
}).toDF("customer_id", "customer_fname", "customer_lname", "customer_state")
ordersDF.registerTempTable("orders")
customersDF.registerTempTable("customers")
// group by customer_id as well, so distinct customers who share a name are not merged
val res = sqlContext.sql("select customer_fname, customer_lname, count(order_id) order_count " +
  "from customers, orders where customer_id = order_customer_id and customer_state = 'TX' " +
  "group by customer_id, customer_fname, customer_lname")
res.map(rec => rec.mkString("\t")).saveAsTextFile("/user/vikasgonti/itversity/problem6/solution/")
:'#Problem 7
#List the names of the top 5 products by revenue ordered on '2013-07-26'. Revenue is considered only for COMPLETE and CLOSED orders.
Output Requirements
#Target Columns: order_date, order_revenue, product_name, product_category_id
#Data has to be sorted in descending order by order_revenue
#File Format: text
#Delimiter: colon (:)
#Place the output file in the HDFS directory
#/user/yourusername/problem7/solution/
#Replace yourusername with your OS user name
'
val orders = sc.textFile("/public/retail_db/orders")
val orderItems = sc.textFile("/public/retail_db/order_items")
val products = sc.textFile("/public/retail_db/products")
val ordersDF = orders.map(rec => {
  val t = rec.split(",")
  (t(0), t(1).split(" ")(0), t(3))   // keep only the date part of the order timestamp
}).toDF("order_id", "order_date", "order_status")
val orderItemsDF = orderItems.map(rec => {
  val t = rec.split(",")
  (t(1), t(2), t(4))
}).toDF("order_item_order_id", "order_item_product_id", "order_item_subtotal")
val productsDF = products.map(rec => {
  val t = rec.split(",")
  (t(0), t(1), t(2))
}).toDF("product_id", "product_category_id", "product_name")
ordersDF.registerTempTable("orders")
orderItemsDF.registerTempTable("orderItems")
productsDF.registerTempTable("products")
// cast the subtotal explicitly so the windowed sum aggregates numbers, not strings
val res = sqlContext.sql("select DISTINCT order_date, round(sum(cast(order_item_subtotal as float)) over (partition by product_id), 2) order_revenue, " +
  "product_name, product_category_id from orders, products, orderItems where order_id = order_item_order_id " +
  "and order_item_product_id = product_id and order_date = '2013-07-26' and order_status IN ('COMPLETE','CLOSED') " +
  "order by order_revenue desc limit 5")
res.map(rec => rec.mkString(":")).saveAsTextFile("/user/vikasgonti/itversity/problem7/solution/")
:'#Problem 8
#List the orders where the order_status = PENDING_PAYMENT, ordered by order_id
Output Requirements
#Target columns: order_id, order_date, order_customer_id, order_status
#File Format: orc
#Place the output files in the HDFS directory
#/user/yourusername/problem8/solution/
#Replace yourusername with your OS user name'
val ordersRDD = sc.textFile("/public/retail_db/orders")
val ordersDF = ordersRDD.map(rec => {
  val t = rec.split(",")
  (t(0).toInt, t(1), t(2), t(3))   // order_id as Int so the sort is numeric
}).toDF("order_id", "order_date", "order_customer_id", "order_status")
val res = ordersDF.filter("order_status = 'PENDING_PAYMENT'").orderBy("order_id")
res.write.orc("/user/vikasgonti/itversity/problem8/solution/")
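// Optional check: ORC also keeps the schema; order_id should come back ascending.
sqlContext.read.orc("/user/vikasgonti/itversity/problem8/solution").show(5)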
:'#Problem 9
#Remove header from h1b data
Output Requirements
#Remove the header from the data and save rest of the data as is
#Data should be compressed using snappy algorithm
#Place the H1B data in the HDFS directory
#/user/yourusername/problem9/solution/
#Replace yourusername with your OS user name
'
val h1bdata = sc.textFile("/public/h1b/h1b_data")
val h1bHeader = h1bdata.first
val h1bdatawithoutheader = h1bdata.filter(rec => rec != h1bHeader)
h1bdatawithoutheader.saveAsTextFile("/user/vikasgonti/itversity/problem9/solution/", classOf[org.apache.hadoop.io.compress.SnappyCodec])
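// Optional check via the Hadoop FileSystem API: part files should carry the
// .snappy extension if the codec was applied.
import org.apache.hadoop.fs.{FileSystem, Path}
val fs = FileSystem.get(sc.hadoopConfiguration)
fs.listStatus(new Path("/user/vikasgonti/itversity/problem9/solution")).foreach(s => println(s.getPath.getName))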
:'#Problem 10
#Get number of LCAs filed for each year
Output Requirements
#File Format: text
#Output Fields: YEAR, NUMBER_OF_LCAS
#Delimiter: Ascii null "\0"
#Place the output files in the HDFS directory
#/user/yourusername/problem10/solution/
#Replace yourusername with your OS user name'
// h1b fields: ID CASE_STATUS EMPLOYER_NAME SOC_NAME JOB_TITLE FULL_TIME_POSITION PREVAILING_WAGE YEAR WORKSITE lon lat
// reuses h1bdatawithoutheader from Problem 9 (same spark-shell session)
val h1bDF = h1bdatawithoutheader.map(rec => {
  val t = rec.split("\0")
  t(7)
}).toDF("YEAR")
h1bDF.registerTempTable("h1bdata")
val res = sqlContext.sql("select YEAR, count(1) as NUMBER_OF_LCAS from h1bdata where year != 'NA' group by year")
res.map(rec => rec.mkString("\0")).saveAsTextFile("/user/vikasgonti/itversity/problem10/solution/")
:'#Problem 11
#Get number of LCAs by status for the year 2016
Output Requirements
#File Format: json
#Output Field Names: year, status, count
#Place the output files in the HDFS directory
#/user/yourusername/problem11/solution/
#Replace yourusername with your OS user name'
// lowercase column names so the JSON fields match the required year, status, count
val h1bDF = h1bdatawithoutheader.map(rec => {
  val t = rec.split("\0")
  (t(1), t(7))
}).toDF("status", "year")
h1bDF.registerTempTable("h1bdata")
val res = sqlContext.sql("select year, status, count(1) as count from h1bdata where year != 'NA' and year = '2016' group by year, status")
res.toJSON.saveAsTextFile("/user/vikasgonti/itversity/problem11/solution/")
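// Optional check: read the JSON back and confirm the lowercase field names
// year, status and count required by the problem.
sqlContext.read.json("/user/vikasgonti/itversity/problem11/solution").printSchema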
:'#Problem 12
#Get top 5 employers for year 2016 where the status is WITHDRAWN or CERTIFIED-WITHDRAWN or DENIED
Output Requirements
#File Format: parquet
#Output Fields: employer_name, lca_count
#Data needs to be in descending order by count
#Place the output files in the HDFS directory
#/user/yourusername/problem12/solution/
#Replace yourusername with your OS user name'
val h1bDF = h1bdatawithoutheader.map(rec => {
  val t = rec.split("\0")
  (t(1), t(2), t(7))
}).toDF("Status", "Employer", "Year")
h1bDF.registerTempTable("h1bdata")
val res = sqlContext.sql("select Employer employer_name, count(1) lca_count from h1bdata where year != 'NA' and year = '2016' " +
  "and status IN ('WITHDRAWN','CERTIFIED-WITHDRAWN','DENIED') group by Employer order by lca_count desc limit 5")
res.write.parquet("/user/vikasgonti/itversity/problem12/solution/")
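// Optional check: the five rows should come back with lca_count descending.
sqlContext.read.parquet("/user/vikasgonti/itversity/problem12/solution").show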
:'#Problem 13
#Copy all h1b data from HDFS to Hive table excluding those where year is NA or prevailing_wage is NA
Output Requirements
#Save it in Hive Database
#Create Database: CREATE DATABASE IF NOT EXISTS yourusername
#Switch Database: USE yourusername
#Save data to hive table h1b_data
#Create table command:
CREATE TABLE h1b_data (
  ID INT,
  CASE_STATUS STRING,
  EMPLOYER_NAME STRING,
  SOC_NAME STRING,
  JOB_TITLE STRING,
  FULL_TIME_POSITION STRING,
  PREVAILING_WAGE DOUBLE,
  YEAR INT,
  WORKSITE STRING,
  LONGITUDE STRING,
  LATITUDE STRING
)
Replace yourusername with your OS user name'
val h1bdatawithoutheader = sc.textFile("/public/h1b/h1b_data_noheader")
val h1bDF = h1bdatawithoutheader.map(rec => {
  val t = rec.split("\0")
  (t(0), t(1), t(2), t(3), t(4), t(5), t(6), t(7), t(8), t(9), t(10))
}).toDF("ID", "CASE_STATUS", "EMPLOYER_NAME", "SOC_NAME", "JOB_TITLE", "FULL_TIME_POSITION", "PREVAILING_WAGE", "YEAR", "WORKSITE", "LONGITUDE", "LATITUDE")
// the database and the h1b_data table (create statement above) must exist before the insert;
// Hive casts the string columns to the table's INT/DOUBLE types on insert
sqlContext.sql("use vghivedatabase")
sqlContext.sql("show tables").show
h1bDF.registerTempTable("h1bdata")
sqlContext.sql("insert into h1b_data select * from h1bdata where year != 'NA' and PREVAILING_WAGE != 'NA'")
:'#Problem 14
#Export h1b data from HDFS to MySQL Database
Output Requirements
#Export data to MySQL Database
#MySQL database is running on ms.itversity.com
#User: h1b_user
#Password: itversity
#Database Name: h1b_export
#Table Name: h1b_data_yourusername
#Nulls are represented as: NA
#After export nulls should not be stored as NA in database. They should be represented as database null
#Create table command:
CREATE TABLE h1b_data_yourusername (
  ID INT,
  CASE_STATUS VARCHAR(50),
  EMPLOYER_NAME VARCHAR(100),
  SOC_NAME VARCHAR(100),
  JOB_TITLE VARCHAR(100),
  FULL_TIME_POSITION VARCHAR(50),
  PREVAILING_WAGE FLOAT,
  YEAR INT,
  WORKSITE VARCHAR(50),
  LONGITUDE VARCHAR(50),
  LATITUDE VARCHAR(50));
#Replace yourusername with your OS user name
#Above create table command can be run using
#Login using mysql -u h1b_user -h ms.itversity.com -p
#When prompted enter password itversity
#Switch to database using use h1b_export
#Run above create table command by replacing yourusername with your OS user name'
# NA must map to database nulls for both string and numeric columns (YEAR, PREVAILING_WAGE),
# hence both --input-null-string and --input-null-non-string
sqoop export \
--connect jdbc:mysql://ms.itversity.com:3306/h1b_export \
--username h1b_user \
--password itversity \
--table h1b_data_vg \
--export-dir /public/h1b/h1b_data_to_be_exported \
--input-fields-terminated-by '\001' \
--input-null-string 'NA' \
--input-null-non-string 'NA'
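// Optional check (from spark-shell, assuming the MySQL JDBC driver is on the
// classpath): read the exported table back and confirm NAs landed as SQL nulls.
val p14Check = sqlContext.read.format("jdbc").
  option("url", "jdbc:mysql://ms.itversity.com:3306/h1b_export").
  option("user", "h1b_user").option("password", "itversity").
  option("dbtable", "h1b_data_vg").load()
p14Check.filter("PREVAILING_WAGE is null").count   // rows whose wage was NA in the files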
:'#Problem 15
#Connect to the MySQL database on the itversity labs using sqoop and import data with case_status as CERTIFIED
Output Requirements
#Place the h1b related data in files in HDFS directory
#/user/`whoami`/problem15/solution/
#Replace `whoami` with your OS user name
#Use avro file format
#Load only those records which have case_status as CERTIFIED completely
#There are 2615623 such records'
sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/h1b_db \
--username h1b_user \
--password itversity \
--table h1b_data \
--where "CASE_STATUS = 'CERTIFIED'" \
--target-dir /user/vikasgonti/itversity/problem15/solution/ \
--as-avrodatafile
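// Optional check (spark-shell launched with the spark-avro package, as in
// Problem 5): the record count should match the expected 2615623.
sqlContext.read.format("com.databricks.spark.avro").load("/user/vikasgonti/itversity/problem15/solution").count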
:'#Problem 16
#Get NYSE data in ascending order by date and descending order by volume
Output Requirements
#Save data back to HDFS
#Column order: stockticker, transactiondate, openprice, highprice, lowprice, closeprice, volume
#File Format: text
#Delimiter: :
#Place the sorted NYSE data in the HDFS directory
#/user/`whoami`/problem16/solution/
#Replace `whoami` with your OS user name'
val nyseRDD = sc.textFile("/public/nyse")
val nyseDF = nyseRDD.map(rec => {
  val t = rec.split(",")
  (t(0), t(1).toInt, t(2).toFloat, t(3).toFloat, t(4).toFloat, t(5).toFloat, t(6).toInt)
}).toDF("stockticker", "transactiondate", "openprice", "highprice", "lowprice", "closeprice", "volume")
import org.apache.spark.sql.functions.col   // needed for the col() column references
val res = nyseDF.orderBy(col("transactiondate"), col("volume").desc)
res.map(rec => rec.mkString(":")).saveAsTextFile("/user/vikasgonti/itversity/problem16/solution/")
:'#Problem 17
#Get the stock tickers from NYSE data for which full name is missing in NYSE symbols data
Output Requirements
#Get unique stock ticker for which corresponding names are missing in NYSE symbols data
#Save data back to HDFS
#File Format: avro
#Avro dependency details:
#groupId -> com.databricks, artifactId -> spark-avro_2.10, version -> 2.0.1
#Place the NYSE data in the HDFS directory
#/user/`whoami`/problem17/solution/
#Replace `whoami` with your OS user name'
val nyseRDD = sc.textFile("/public/nyse")
val nyseDF = nyseRDD.map(rec => {
  val t = rec.split(",")
  t(0)
}).toDF("stockticker")
nyseDF.registerTempTable("nyse")
val nysesymRDD = sc.textFile("/public/nyse_symbols")
val first = nysesymRDD.first
val symDataDF = nysesymRDD.filter(rec => rec != first).map(rec => {
  val t = rec.split("\t")
  t(0)
}).toDF("stocksymbol")
symDataDF.registerTempTable("symData")
// the null test belongs in WHERE, not in the join condition: placed in ON it can
// never match, so the left join would return every ticker instead of the missing ones
val res = sqlContext.sql("select DISTINCT stockticker as Symbol from nyse n " +
  "left outer join symData s on n.stockticker = s.stocksymbol where s.stocksymbol is null")
import com.databricks.spark.avro._
res.write.avro("/user/vikasgonti/itversity/problem17/solution/")
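// Optional sanity check on the anti join: res.count (tickers with no symbol
// entry) must be at most the distinct ticker count overall.
nyseDF.select("stockticker").distinct.count
res.count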
:'#Problem 18
#Get the name of stocks displayed along with other information
Output Requirements
#Get all NYSE details along with stock name if exists, if not stockname should be empty
#Column Order: stockticker, stockname, transactiondate, openprice, highprice, lowprice, closeprice, volume
#Delimiter: ,
#File Format: text
#Place the data in the HDFS directory
#/user/`whoami`/problem18/solution/
#Replace `whoami` with your OS user name'
val nyseRDD = sc.textFile("/public/nyse")
val nyseDF = nyseRDD.map(rec => {
  val t = rec.split(",")
  (t(0), t(1).toInt, t(2).toFloat, t(3).toFloat, t(4).toFloat, t(5).toFloat, t(6).toInt)
}).toDF("stockticker", "transactiondate", "openprice", "highprice", "lowprice", "closeprice", "volume")
nyseDF.registerTempTable("nyse")
val nysesymRDD = sc.textFile("/public/nyse_symbols")
val first = nysesymRDD.first
val symDataDF = nysesymRDD.filter(rec => rec != first).map(rec => {
  val t = rec.split("\t")
  (t(0), t(1))
}).toDF("stocksymbol", "stockname")
symDataDF.registerTempTable("symData")
// nvl turns missing names into empty strings; it assumes sqlContext is a HiveContext (the labs default)
val res = sqlContext.sql("select stockticker, nvl(stockname, '') stockname, transactiondate, openprice, " +
  "highprice, lowprice, closeprice, volume " +
  "from nyse n left outer join symData s on n.stockticker = s.stocksymbol")
res.map(rec => rec.mkString(",")).saveAsTextFile("/user/vikasgonti/itversity/problem18/solution/")
:'#Problem 19
#Get number of companies who filed LCAs for each year
Output Requirements
#File Format: text
#Delimiter: tab character "\t"
#Output Field Order: year, lca_count
#Place the output files in the HDFS directory
#/user/`whoami`/problem19/solution/
#Replace `whoami` with your OS user name'
val h1bdata = sc.textFile("/public/h1b/h1b_data_noheader")
val h1bDF = h1bdata.map(rec => {
  val t = rec.split("\0")
  (t(2), t(7))
}).toDF("EMPLOYER_NAME", "YEAR")
h1bDF.registerTempTable("h1bdata")
// count distinct employers, since the question asks for the number of companies, not LCAs
val res = sqlContext.sql("select YEAR, count(distinct EMPLOYER_NAME) as lca_count from h1bdata where year != 'NA' group by year")
res.map(rec => rec.mkString("\t")).saveAsTextFile("/user/vikasgonti/itversity/problem19/solution/")
:'#Problem 20
#Connect to the MySQL database on the itversity labs using sqoop and import data with employer_name, case_status and count.
#Make sure data is sorted by employer_name in ascending order and by count in descending order
Output Requirements
#Place the h1b related data in files in HDFS directory
#/user/`whoami`/problem20/solution/
#Replace `whoami` with your OS user name
#Use text file format and tab (\t) as delimiter
#Hint: You can use Spark with JDBC or Sqoop import with query
#You might not get such hints in actual exam
#Output should contain employer name, case status and count
'
# eval first, to see how many aggregated rows to expect
sqoop eval \
--connect jdbc:mysql://ms.itversity.com:3306/h1b_db \
--username h1b_user \
--password itversity \
--query "select count(*) from (select EMPLOYER_NAME, CASE_STATUS, count(1) as count from h1b_data group by EMPLOYER_NAME, CASE_STATUS) A"
# the -D flag permits splitting on the text column given to --split-by; note that with
# multiple mappers the ORDER BY holds only within each mapper's output, so use -m 1
# if a single globally sorted file is required
sqoop import \
-Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect jdbc:mysql://ms.itversity.com:3306/h1b_db \
--username h1b_user \
--password itversity \
--query "select EMPLOYER_NAME, CASE_STATUS, count(1) as count from h1b_data where \$CONDITIONS group by EMPLOYER_NAME, CASE_STATUS order by EMPLOYER_NAME, count desc" \
--target-dir /user/vikasgonti/itversity/problem20/solution/ \
--as-textfile \
--fields-terminated-by '\t' \
--split-by case_status
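// Optional check (from spark-shell): preview the tab-delimited aggregates.
sc.textFile("/user/vikasgonti/itversity/problem20/solution").take(5).foreach(println)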