#Loading data from MySQL to HDFS

#!/bin/bash
sqoop import \
--connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
--username root \
--password cloudera \
-m 1 \
--table categories \
--target-dir /cert/mysqltohdfs

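# To sanity-check the import, list the target directory and peek at the generated part file
# (a quick verification step, not part of the original notes; paths assume the command above):
hadoop fs -ls /cert/mysqltohdfs
hadoop fs -cat /cert/mysqltohdfs/part-m-00000 | head
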
#Export data to a MySQL database from HDFS using Sqoop

#!/bin/sh
sqoop export \
--connect jdbc:mysql://localhost:3306/retail_db \
--username root \
--password cloudera \
--export-dir /cert/mysqltohdfs/part-m-00000 \
-m 1 \
--table hdfscategories \
--direct

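# sqoop export requires the target table to already exist in MySQL. A minimal sketch of creating
# hdfscategories, assuming it simply mirrors the three columns of retail_db.categories
# (column names/types are an assumption; adjust to match the source table):
mysql -u root -pcloudera retail_db -e "CREATE TABLE hdfscategories (
  category_id INT,
  category_department_id INT,
  category_name VARCHAR(45)
)"
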
#Change the delimiter and file format of data during import using Sqoop

sqoop import \
--connect jdbc:mysql://localhost:3306/retail_db \
--username root \
--password cloudera \
-m 1 \
--table categories \
--target-dir /cert/changefiledelim \
--as-textfile \
--fields-terminated-by D \
-z \
--compression-codec snappy

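# To confirm the delimiter and compression took effect: hadoop fs -text decompresses files written
# with a known codec, whereas -cat would show raw Snappy bytes (a verification step, not in the original notes):
hadoop fs -ls /cert/changefiledelim
hadoop fs -text /cert/changefiledelim/part-m-00000* | head
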
#Ingest real-time and near-real-time (NRT) streaming data into HDFS using Flume -- near-real-time part still pending

twitter-agent.sources = twitter
twitter-agent.sinks = hdfs-write
twitter-agent.channels = MemChannel

twitter-agent.sources.twitter.type = com.cloudera.flume.source.TwitterSource
twitter-agent.sources.twitter.channels = MemChannel
twitter-agent.sources.twitter.consumerKey = kBxhubHNb3sPhq1eSoWSdxf5K
twitter-agent.sources.twitter.consumerSecret = GZE3Lp0WklDmhBCvDSsTSNpf87nM71PxP5jB5g0pZm49g6vXtW
twitter-agent.sources.twitter.accessToken = 295114956-1gjorJnJymApxvqbHEPDWZALxHxPZsNOVSFFV9mo
twitter-agent.sources.twitter.accessTokenSecret = fuLmttz2hn4d7HNOSYnYuN3tkEdV2H94zkdo1hDbnjfX4
twitter-agent.sources.twitter.keywords = trump

twitter-agent.sinks.hdfs-write.channel = MemChannel
twitter-agent.sinks.hdfs-write.type = hdfs
twitter-agent.sinks.hdfs-write.hdfs.path = hdfs://localhost:8020/cert/tweets/%Y/%m/%d/%H/
twitter-agent.sinks.hdfs-write.hdfs.rollInterval = 600
twitter-agent.sinks.hdfs-write.hdfs.rollCount = 1000
twitter-agent.sinks.hdfs-write.hdfs.rollSize = 0
twitter-agent.sinks.hdfs-write.hdfs.writeFormat = Text
twitter-agent.sinks.hdfs-write.hdfs.fileType = DataStream
twitter-agent.sinks.hdfs-write.hdfs.batchSize = 1000

twitter-agent.channels.MemChannel.type = memory
twitter-agent.channels.MemChannel.capacity = 1000
twitter-agent.channels.MemChannel.transactionCapacity = 100

#Run the agent with this configuration file
flume-ng agent --name twitter-agent --conf-file /home/cloudera/cert/sampleagent.conf -Dflume.root.logger=INFO,console

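# Once the agent is running, events should start appearing under the hour-bucketed HDFS path
# (a quick check, not in the original notes; substitute the current date/hour directory):
hdfs dfs -ls /cert/tweets/2016/01/16/22/
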
#Load data into and out of HDFS using the Hadoop File System (FS) commands

#Copy data from HDFS to the local file system
hadoop fs -get /cert/tweets/2016/01/16/22/FlumeData.1453013770252 /home/cloudera/
hadoop fs -copyToLocal /cert/tweets/2016/01/16/22/flumedata /home/cloudera/

#Copy data from the local file system to HDFS
hadoop fs -put /home/cloudera/FlumeData.1453013770252 /cert/tweets/2016/01/16/22/
hadoop fs -copyFromLocal /home/cloudera/fldata /cert/tweets/2016/01/16/22/

#Move data from the local file system to HDFS
hadoop fs -moveFromLocal /home/cloudera/flumedata /cert/tweets/2016/01/16/22/

#Move data from one HDFS location to another (source, then destination)
hadoop fs -mv /cert/tweets/2016/01/16/22/FlumeData.14530 /cert/tweets/2016/01/

#Copy data between HDFS locations
hadoop fs -cp /cert/tweets/2016/01/16/22/fldata /cert/tweets/2016/01/16/

#Concatenate the files in a source HDFS directory into a single local destination file
hadoop fs -getmerge /cert/tweets/2016/01/16/22/ FlumeData.1453013513452

#Spark: loading HDFS data into an RDD as a text file

val load = sc.textFile("/cert/mysqltohdfsalltables/categories/p*")
load.collect().foreach(println)

#Writing back to HDFS -- here load is an RDD
load.saveAsTextFile("/cert/spark/textfile")
load.saveAsObjectFile("/cert/spark/objectfile")

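# Round-trip check: an object file written this way can be read back in spark-shell with
# sc.objectFile (a quick sketch, not part of the original notes)
val back = sc.objectFile[String]("/cert/spark/objectfile")
back.take(5).foreach(println)
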
#Spark: reading data and writing it back out as a text file

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object fileconversion {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Text Conv")
    val sc = new SparkContext(conf)
    val textRDD = sc.textFile("/cert/spark/textfile/part-*")
    textRDD.saveAsTextFile("/cert/spark/textfileoutput1")
  }
}

spark-submit --class "fileconversion" --master local /home/cloudera/scala/target/scala-2.10/fileconversionproject_2.10-1.0.jar

#Reading data as a text file and writing it as an object file

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object objectfileconversion {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Object Conv")
    val sc = new SparkContext(conf)
    val textRDD = sc.textFile("/cert/spark/textfile/part-*")
    textRDD.saveAsObjectFile("/cert/spark/objectfileoutput1")
  }
}

#Reading data as a text file and writing it as a SequenceFile (K,V) with NullWritable as the key

import org.apache.hadoop.io._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.SequenceFileRDDFunctions

object sequencefileconversion {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Text Conv")
    val sc = new SparkContext(conf)
    val textRDD = sc.textFile("/cert/spark/textfile/part-*")
    val splitRDD = textRDD.flatMap(line => line.split(","))
    val seqRDD = splitRDD.map(word => (NullWritable.get(), word))
    seqRDD.saveAsSequenceFile("/cert/spark/Sequencefileoutput1")
  }
}

#Reading data as a text file and writing it as a SequenceFile with a key

import org.apache.hadoop.io._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
//import org.apache.spark.rdd.SequenceFileRDDFunctions

object sequencefileconversion {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Text Conv")
    val sc = new SparkContext(conf)
    val textRDD = sc.textFile("/cert/mysqltohdfsalltables/departments/part*")
    val seqRDD = textRDD.map(word => (word.split(",")(0), word.split(",")(1)))
    seqRDD.saveAsSequenceFile("/cert/spark/Sequencefileoutputkey")
  }
}

#Reading a SequenceFile and saving it as a text file

import org.apache.hadoop.io._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object readsequencefileconversion {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Text Conv")
    val sc = new SparkContext(conf)
    val textRDD = sc.sequenceFile("/cert/spark/Sequencefileoutputkey", classOf[Text], classOf[Text])
    val seqRDD = textRDD.map { case (x, y) => (x.toString(), y.toString()) }
    seqRDD.saveAsTextFile("/cert/spark/textfileoutputkeiyii")
  }
}

#Joining two datasets (inner join)

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object joinorders {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("seqText")
    val sc = new SparkContext(conf)
    //(product_id, product_name) pairs
    val productRDD = sc.textFile("/cert/mysqltohdfsalltables/products/part*")
    val orderRDDext = productRDD.map(s => (s.split(",")(0), s.split(",")(2)))
    //(order_item_product_id, order_item_quantity) pairs
    val orditemRDD = sc.textFile("/cert/mysqltohdfsalltables/order_items/part*").map(x => x.split(",")).map(x => (x(2), x(3)))
    val joinD = orderRDDext.join(orditemRDD)
    joinD.count
    joinD.collect()
    joinD.saveAsTextFile("/cert/spark/join_orditem_product")
  }
}

#Joining two datasets (left outer join)

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object leftjoinorders {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("leftjoin")
    val sc = new SparkContext(conf)
    val productRDD = sc.textFile("/cert/mysqltohdfsalltables/products/part*").map(s => s.split(",")).map(s => (s(0), s(2)))
    val orditemRDD = sc.textFile("/cert/mysqltohdfsalltables/order_items/part*").map(x => x.split(",")).map(x => (x(2), x(3)))
    val joinD = productRDD.leftOuterJoin(orditemRDD)
    joinD.saveAsTextFile("/cert/spark/leftjoin_orditem_product")
  }
}

#Joining two datasets (right outer join) -- sample output: (906,(Some(Team Golf Tennessee Volunteers Putter Grip),4))

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object rightjoinorders {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("rightjoin")
    val sc = new SparkContext(conf)
    val productRDD = sc.textFile("/cert/mysqltohdfsalltables/products/part*").map(s => s.split(",")).map(s => (s(0), s(2)))
    val orditemRDD = sc.textFile("/cert/mysqltohdfsalltables/order_items/part*").map(x => x.split(",")).map(x => (x(2), x(3)))
    val joinD = productRDD.rightOuterJoin(orditemRDD)
    joinD.saveAsTextFile("/cert/spark/rightjoin_orditem_product")
  }
}

#Joining two Hive tables through HiveContext (Spark)

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object joinhive {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("join hive")
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    //val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    val joinData = sqlContext.sql("select c.customer_fname,c.customer_lname,o.order_date,o.order_status from cert.orders o join cert.customers c on c.customer_id = o.order_customer_id")
    joinData.collect().foreach(println)
  }
}

#Max, min and count of a specific column

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import java.lang.Math

object maxmin {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Maxmin")
    val sc = new SparkContext(conf)
    val loadrdd = sc.textFile("/cert/mysqltohdfs/order_items/part*")
    //Column 4 is order_item_subtotal
    val maprdd = loadrdd.map(x => x.split(",")).map(x => x(4).toFloat)
    val maxrdd = maprdd.max()
    val minrdd = maprdd.min()
    val countrdd = maprdd.count()
    println("Max Orders %s ".format(maxrdd))
    println("Min Orders %s ".format(minrdd))
    println("Count is %s ".format(countrdd))
  }
}

#Sum and average of a specific column

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import java.lang.Math

object sumavg {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Sumavg")
    val sc = new SparkContext(conf)
    val loadrdd = sc.textFile("/cert/mysqltohdfs/order_items/part*")
    val maprdd = loadrdd.map(x => x.split(",")).map(x => x(0).toLong)
    val sumrdd = maprdd.reduce((x, y) => x + y)
    //val sum2rdd = maprdd.sum  // returns a Double printed in exponential notation
    val countrdd = maprdd.count()
    val avgrdd = sumrdd / countrdd

    println("Total sum of Orders %s ".format(sumrdd))
    //println("Sum with function %s ".format(sum2rdd))
    println("Count is %s ".format(countrdd))
    println("Average is %s ".format(avgrdd))
  }
}

#sortByKey and groupByKey

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object sortgroup {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("sortgroup")
    val sc = new SparkContext(conf)
    val loadrdd = sc.textFile("/cert/mysqltohdfs/order_items/part*")
    //(order_item_product_id, order_item_subtotal) pairs
    val maprdd = loadrdd.map(x => x.split(",")).map(x => (x(2), x(4).toDouble))

    //groupByKey
    //Count of the distinct groups produced by groupByKey
    val cgrouprdd = maprdd.groupByKey().distinct().count()
    //Print all (key, values) pairs on screen
    val grouprdd = maprdd.groupByKey().foreach(println)
    //Total record count
    val countrdd = maprdd.count()

    //sortByKey (default order is ascending)
    val sortrdd = maprdd.sortByKey(ascending = false, numPartitions = 1).foreach(println)
    val sortcount = maprdd.sortByKey().count()
    println("Distinct Count--Grouping %s ".format(cgrouprdd))
    println("Total Count is %s ".format(countrdd))
    println("Sorting is %s ".format(sortcount))
  }
}

#Sample operations on data with sortByKey, reduceByKey and lambdas

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object sampprac {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("ssoperations")
    val sc = new SparkContext(conf)

    val ordersRDD = sc.textFile("/cert/mysqltohdfs/orders/part*")
    val orderItemsRDD = sc.textFile("/cert/mysqltohdfs/order_items/part*")

    //Split array as the key and the entire row as the value
    //val ordersParsedRDD = ordersRDD.map(x => (x.split(","), x))
    //ordersParsedRDD.collect().foreach(println)

    //One column (order_id) as the key and the entire row as the value
    val ordersParseRDDkey = ordersRDD.map(x => (x.split(",")(0), x))
    ordersParseRDDkey.collect().take(5)

    //order_item_order_id as the key and the entire row as the value
    val ordersItemParseRDDkey = orderItemsRDD.map(x => (x.split(",")(1), x))
    ordersItemParseRDDkey.first()

    val joinonitemid = ordersParseRDDkey.join(ordersItemParseRDDkey)
    //joinonitemid.collect().foreach(println)

    //From each joined value, take the order date (orders column 1) and the item subtotal (order_items column 4)
    val mappingcolumns = joinonitemid.map(x => (x._2._1.split(",")(1), x._2._2.split(",")(4).toFloat))
    mappingcolumns.take(10)

    //Total order revenue per day
    val reducebydateRDD = mappingcolumns.reduceByKey((x, y) => x + y).sortByKey()
    reducebydateRDD.collect().foreach(println)

    //Total quantity sold per day
    val mappingsales = joinonitemid.map(y => (y._2._1.split(",")(1), y._2._2.split(",")(3).toInt)).reduceByKey((x, y) => x + y)
    mappingsales.collect().foreach(println)
  }
}

#Filter the data

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object filterapp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Filterops")
    val sc = new SparkContext(conf)

    //Column 7 is the customer state
    val loadcust = sc.textFile("/cert/mysqltohdfs/customers/part*").map(x => x.split(",")).map(s => s(7))
    loadcust.collect().foreach(println)
    loadcust.count()

    //Filter the data
    val filterdate = loadcust.filter(x => x.contains("TX")).count()
    val filterdateks = loadcust.filter(x => x.contains("KS")).count()
    println("Customers in Texas %s".format(filterdate))
    println("Customers in Kansas %s".format(filterdateks))
  }
}

#Filter data into a smaller dataset

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object datafilterapp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("dFilterops")
    val sc = new SparkContext(conf)

    val loadcust = sc.textFile("/cert/mysqltohdfs/customers/part*").cache()
    //loadcust.collect().foreach(println)
    loadcust.count()

    //Filter the data
    //Customers in MN or TX (|| is the OR operator, && is the AND operator)
    val filterdate = loadcust.filter(x => x.split(",")(7).equals("MN") || x.split(",")(7).toString.contains("TX"))
    filterdate.collect().foreach(println)

    //Customer ids greater than 500
    val cust = loadcust.filter(x => x.split(",")(0).toInt > 500).take(10).foreach(println)
  }
}

#Sorting with filter and join operations

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object sorted {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("sortgroup")
    val sc = new SparkContext(conf)

    //Load the data
    val loadrdd = sc.textFile("/cert/mysqltohdfs/orders/part*")
    val maprdd = loadrdd.map(x => (x.split(",")(0), x))

    val loaditem = sc.textFile("/cert/mysqltohdfs/order_items/part*")
    val mapitem = loaditem.map(x => (x.split(",")(1), x))

    //Join the data on order_id
    val joinrdd = maprdd.join(mapitem)
    joinrdd.take(5).foreach(println)

    //Get the columns needed: order_status (orders column 3) and item subtotal (order_items column 4)
    val readrdd = joinrdd.map(x => (x._2._1.split(",")(3), x._2._2.split(",")(4)))
    readrdd.take(10).foreach(println)

    //Filter the data and apply conditions (for a tuple, x._1 is the first element)
    val resultrdd = readrdd.filter(x => x._1.equals("CLOSED") && x._2.toFloat > 300)
    resultrdd.collect().foreach(println)

    //Sort the data by the value in the tuple
    val result1rdd = resultrdd.sortBy(_._2)
    result1rdd.collect().foreach(println)
  }
}

#Convert an RDD into a DataFrame and query the dataset

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
//Import the DataFrame functions API
//import org.apache.spark.sql.functions._

case class orders(order_id: Int, order_date: String, order_customer_id: Int, order_Status: String)

object dfquery {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("DiDF")
    val sc = new SparkContext(conf)

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    //Used to implicitly convert an RDD to a DataFrame
    import sqlContext.implicits._

    val orderRDD = sc.textFile("/cert/mysqltohdfs/orders").map(x => x.split(","))
    orderRDD.collect().take(5)

    val mappingRDD = orderRDD.map(x => orders(x(0).toInt, x(1), x(2).toInt, x(3)))
    val ordersDF = mappingRDD.toDF()

    ordersDF.registerTempTable("ordersDF")

    val Querydata = sqlContext.sql("select * from ordersDF where order_Status LIKE '%CLOSED%' ")
    Querydata.collect().foreach(println)
  }
}

#SBT file with Spark SQL
name := "dfquery"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "1.3.0",
  "org.apache.spark" %% "spark-core" % "1.3.0"
)

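# The spark-submit commands above assume the project has been packaged with sbt first.
# A minimal build-and-submit sketch (the jar name is illustrative, derived from the
# name/version/scalaVersion in the build file above):
cd /home/cloudera/scala
sbt package
spark-submit --class "dfquery" --master local target/scala-2.10/dfquery_2.10-1.0.jar
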
#### Python -- PySpark ####

######Load data from HDFS and print it

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("loaddate").setMaster("local")
sc = SparkContext(conf=conf)
distFile = sc.textFile("/cert/mysqltohdfs/customers/part*")
for line in distFile.collect():
    print(line)

#Submitting the Spark application
spark-submit --master local savetext.py
pyspark file.py

######Load data from HDFS and store it as text in HDFS

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("save").setMaster("local")
sc = SparkContext(conf=conf)
loadFile = sc.textFile("/cert/mysqltohdfs/customers/")
savefile = loadFile.saveAsTextFile("/cert/pyspark/textcustomers")
for line in loadFile.collect():
    print line

########Load data from HDFS and store it as a SequenceFile in HDFS

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("saveseq").setMaster("local")
sc = SparkContext(conf=conf)

loadFile = sc.textFile("/cert/mysqltohdfs/customers/part*")

#Null as the key
loadFile.map(lambda x: (None, x)).saveAsSequenceFile("/cert/pyspark/seqcustomersNone")

#First column (customer_id) as the key, rest of the row as the value
loadFile.map(lambda x: tuple(x.split(",", 1))).saveAsSequenceFile("/cert/pyspark/seqcustomerskey")

#Pickle file: rows serialized as a byte stream
loadFile.saveAsPickleFile("/cert/pyspark/seqcustomersPckle")

####Reading a SequenceFile

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("readSeq")
sc = SparkContext(conf=conf)
readseqFileRDD = sc.sequenceFile("/cert/pyspark/seqcustomerskey/part*")
for line in readseqFileRDD.collect():
    print line

#####Join operation

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("join").setMaster("local")
sc = SparkContext(conf=conf)

#With use_unicode=False the RDDs hold plain byte strings, so the output has no u'' prefix
#(the u'' prefix marks Python 2 unicode objects decoded from the file's source encoding)
ordRDD = sc.textFile("/cert/mysqltohdfs/orders/part*", use_unicode=False)
orditemsRDD = sc.textFile("/cert/mysqltohdfs/order_items/part*", use_unicode=False)

#Key the RDDs and join: split first, then pick the key column
orders = ordRDD.map(lambda x: x.split(",")).map(lambda x: (x[0], x))
items = orditemsRDD.map(lambda x: x.split(",")).map(lambda x: (x[1], x))
joinRDD = orders.join(items)

#Equivalent one-step way of keying the RDDs
ordersd = ordRDD.map(lambda x: (x.split(",")[0], x))
itemsd = orditemsRDD.map(lambda x: (x.split(",")[1], x))
joinRDD = ordersd.join(itemsd)

for line in joinRDD.collect():
    print line

#####Left join operation

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("join").setMaster("local")
sc = SparkContext(conf=conf)

ordRDD = sc.textFile("/cert/mysqltohdfs/orders/part*", use_unicode=False)
custRDD = sc.textFile("/cert/mysqltohdfs/customers/part*", use_unicode=False)

#Key orders by order_customer_id and customers by customer_id; value is (fname, lname)
orders = ordRDD.map(lambda x: x.split(",")).map(lambda x: (x[2], x))
items = custRDD.map(lambda x: x.split(",")).map(lambda x: (x[0], (x[1], x[2])))
joinRDD = orders.leftOuterJoin(items)

for line in joinRDD.collect():
    print line
#Count
cnt = joinRDD.count()
print cnt

#####Max, min and count

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("Maxmin").setMaster("local")
sc = SparkContext(conf=conf)

ordRDD = sc.textFile("/cert/mysqltohdfs/orders/part*")

#Column indexes of the orders table
order_id = 0
order_date = 1
order_customer_id = 2
order_status = 3

splitRDD = ordRDD.map(lambda x: x.split(","))
#Count of distinct order ids
totalitems = splitRDD.map(lambda x: x[order_id]).distinct().count()
print totalitems
#Max order date
Maxorddate = splitRDD.map(lambda x: x[order_date]).reduce(max)
print Maxorddate
#Min order date (could also use reduce: splitRDD.map(lambda x: x[order_date]).reduce(min))
Minorddate = splitRDD.map(lambda x: x[order_date]).min()
print Minorddate

######Count, average and sum

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("Avg").setMaster("local")
sc = SparkContext(conf=conf)

ordirdd = sc.textFile("/cert/mysqltohdfs/order_items/part*")
splitrdd = ordirdd.map(lambda x: x.split(","))
for line in ordirdd.take(8):
    print line

#Count
cnt = splitrdd.map(lambda x: int(x[0])).count()
print cnt
#Sum of the order_item_subtotal column
total = splitrdd.map(lambda x: float(x[4]))
for line in total.take(8):
    print line
Tot = total.sum()
print Tot
#Average
avg = float(Tot / cnt)
print avg

####Filter operation

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("filter").setMaster("local")
sc = SparkContext(conf=conf)

ordrdd = sc.textFile("/cert/mysqltohdfs/orders/part*", use_unicode=False)
for line in ordrdd.take(5):
    print line
splitrdd = ordrdd.map(lambda x: x.split(","))
#Filter on equality of order_status
filterrdd = splitrdd.filter(lambda x: x[3] == "COMPLETE")
for line in filterrdd.collect():
    print line
#Equality on order_date
for line in splitrdd.filter(lambda x: x[1] == "2014-07-21 00:00:00.0").collect():
    print line
#Greater-than comparison on order_date
greaterrdd = splitrdd.filter(lambda x: x[1] > '2014-07-21 00:00:00')
for line in greaterrdd.collect():
    print line

#####Sorting operation

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("Sort").setMaster("local")
sc = SparkContext(conf=conf)

ordRDD = sc.textFile("/cert/mysqltohdfs/orders/part*")
for line in ordRDD.take(5):
    print line

splitRDD = ordRDD.map(lambda x: x.split(","))

#(order_date, 1) pairs
reduceRDD = splitRDD.map(lambda x: (x[1], 1))
for line in reduceRDD.take(5):
    print line

#Count the number of orders per date
countRDD = reduceRDD.reduceByKey(lambda x, y: x + y)
for line in countRDD.take(7):
    print line

#Key the raw rows by order_date
newrdd = ordRDD.map(lambda x: ((x.split(",")[1]), x))
for line in newrdd.take(5):
    print line

#Sort the per-date counts by date
sortRDD = countRDD.sortByKey()
for line in sortRDD.take(5):
    print line

#aggregateByKey: (order_status, order_date) pairs, then count the orders per status
newmaprdd = splitRDD.map(lambda x: (str(x[3]), str(x[1])))
for line in newmaprdd.take(5):
    print line

aggRDD = newmaprdd.aggregateByKey(0, lambda x, y: x + 1, lambda x, y: x + y)
for line in aggRDD.collect():
    print line

#####Convert an RDD into a DataFrame and query the dataset

from pyspark import SparkContext
#from pyspark import SparkConf
from pyspark.sql import SQLContext, Row

sc = SparkContext()
sqlContext = SQLContext(sc)

ordRDD = sc.textFile("/cert/mysqltohdfs/orders/p*")
splitRDD = ordRDD.map(lambda x: x.split(","))
#Convert each line into a Row
orders = splitRDD.map(lambda x: Row(order_id=x[0], order_date=x[1], order_customer_id=x[2], order_status=x[3]))

#Infer the schema and register the DataFrame as a table
schemadf = sqlContext.inferSchema(orders)
schemadf.registerTempTable("orders")

dfframe = sqlContext.sql("Select * from orders where order_status='COMPLETE'")

for line in dfframe.collect():
    print line

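# Note: inferSchema works on the Spark 1.x SQLContext used here but is deprecated;
# the equivalent call (a sketch, assuming the same orders Row RDD from above) is:
schemadf = sqlContext.createDataFrame(orders)
schemadf.registerTempTable("orders")
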
####Rank query in Spark

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql import HiveContext

sc = SparkContext()
sqlContext = HiveContext(sc)

productRDD = sc.textFile("/cert/mysqltohdfs/products/p*")
splitRDD = productRDD.map(lambda x: x.split(","))

products = splitRDD.map(lambda x: Row(product_id=x[0], product_category_id=x[1], product_name=x[2], product_description=x[3], product_price=x[4], product_image=x[5]))

#Infer the schema and register the DataFrame as a table
schemadff = sqlContext.inferSchema(products)
schemadff.registerTempTable("products")

#Window function: rank products by price within each category (needs HiveContext)
Query = sqlContext.sql("select *, rank() over (partition by product_category_id order by product_price desc) from products")

for line in Query.collect():
    print line

###Importing a MySQL table as an Avro data file into HDFS

sqoop import \
--connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
--username root \
--password cloudera \
-m 1 \
--table categories \
--as-avrodatafile \
--target-dir /cert/mysqltohdfs/avro/categories

####Creating a Hive table on an Avro data file with an explicit column list

create EXTERNAL table cert.order_items
(
order_item_id int,
order_item_order_id int,
order_item_product_id int,
order_item_quantity int,
order_item_subtotal float,
order_item_product_price float
)
STORED AS AVRO
LOCATION '/cert/mysqltohdfs/avro/order_items';

###Creating a Hive table on an Avro data file with an inline Avro schema (avro.schema.literal)

create EXTERNAL table cert.products
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/cert/mysqltohdfs/avro/products'
tblproperties('avro.schema.literal'='{
  "type" : "record",
  "name" : "sqoop_import_products",
  "doc" : "Sqoop import of products",
  "fields" : [ {
    "name" : "product_id",
    "type" : [ "int", "null" ],
    "columnName" : "product_id",
    "sqlType" : "4"
  }, {
    "name" : "product_category_id",
    "type" : [ "int", "null" ],
    "columnName" : "product_category_id",
    "sqlType" : "4"
  }, {
    "name" : "product_name",
    "type" : [ "string", "null" ],
    "columnName" : "product_name",
    "sqlType" : "12"
  }, {
    "name" : "product_description",
    "type" : [ "string", "null" ],
    "columnName" : "product_description",
    "sqlType" : "12"
  }, {
    "name" : "product_price",
    "type" : [ "float", "null" ],
    "columnName" : "product_price",
    "sqlType" : "7"
  }, {
    "name" : "product_image",
    "type" : [ "string", "null" ],
    "columnName" : "product_image",
    "sqlType" : "12"
  } ],
  "tableName" : "products"
}');

#####Create a table on an Avro data file with an Avro schema file (avro.schema.url)

create EXTERNAL table cert.avrocategories
(
category_id int,
category_dept_id int,
category_name string
)
STORED AS AVRO
LOCATION '/cert/mysqltohdfs/avro/categories'
TBLPROPERTIES('avro.schema.url'='hdfs://localhost/cert/mysqltohdfs/avro/sqoop_import_categories.avsc');

####To get the schema from an Avro data file

avro-tools getschema part-m-00000.avro > new.txt

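# The avro.schema.url tables above expect the .avsc file to be in HDFS, so the extracted schema
# has to be uploaded there first (a sketch, reusing the schema file name referenced above):
avro-tools getschema part-m-00000.avro > sqoop_import_categories.avsc
hadoop fs -put sqoop_import_categories.avsc /cert/mysqltohdfs/avro/
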
###Create a partitioned table (dynamic partitioning) to improve query performance

CREATE EXTERNAL TABLE ORDERS_PART
(
order_id int,
order_date TIMESTAMP,
order_customer_id int,
order_status varchar(45)
)
PARTITIONED BY(year string,month string)
LOCATION '/cert/mysqltohdfs/orders/';

###Select records from the main table and insert them into the partitioned table
insert overwrite table orders_part partition(year,month) select o.*,year(o.order_date),month(o.order_date) from orders o ;

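# Dynamic partition inserts like the one above normally also need these session settings in Hive
# (standard Hive settings, noted here as a reminder; not from the original notes):
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
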
###Creating an Avro schema file

{"namespace": "sqoop_import_avrosample.avsc",
 "type": "record",
 "name": "User",
 "fields": [
   {"name": "name", "type": "string"},
   {"name": "number", "type": ["int", "null"]},
   {"name": "color", "type": ["string", "null"]}
 ]
}

CREATE EXTERNAL TABLE AVROSAMPLE
STORED AS AVRO
LOCATION '/cert/mysqltohdfs/categories/'
tblproperties('avro.schema.url'='hdfs://localhost/cert/mysqltohdfs/avro/sqoop_import_avrosample.avsc');

#### IMPALA ####
Commands are largely the same as in Hive.
Refresh tables to pick up metadata changes:
INVALIDATE METADATA waits to reload the metadata until a subsequent query needs it, but it reloads all of the table's metadata, which can be expensive for large tables with many partitions.
REFRESH reloads the metadata immediately, but only loads the block locations for newly added data files, making it the less expensive operation overall.
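
# A minimal sketch of both statements in impala-shell, assuming the cert.orders table used earlier:
impala-shell -q "REFRESH cert.orders"              # after new data files land in the table's HDFS directory
impala-shell -q "INVALIDATE METADATA cert.orders"  # after the table is created or changed outside Impala (e.g. via Hive or Sqoop)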