- #Loading data from MySQL to HDFS
- #!/bin/bash
- sqoop import \
- --connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
- --username=root \
- --password=cloudera \
- --m 1 \
- --table=categories \
- --target-dir /cert/mysqltohdfs
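- # Note: -m 1 runs a single mapper; importing in parallel needs a primary key on the table or an explicit --split-by <column> (general Sqoop behavior)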
- #Export data to a MySQL database from HDFS using Sqoop
- #!/bin/sh
- sqoop export \
- --connect jdbc:mysql://localhost:3306/retail_db \
- --username=root \
- --password=cloudera \
- --export-dir /cert/mysqltohdfs/part-m-00000 \
- -m 1 \
- --table hdfscategories \
- --direct
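- # Note: --direct uses MySQL's native bulk tool (mysqlimport) for the export, which is usually faster than the generic JDBC path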
- #Change the delimiter and file format of data during import using Sqoop
- sqoop import \
- --connect jdbc:mysql://localhost:3306/retail_db \
- --username root \
- --password cloudera \
- -m 1 \
- --table=categories \
- --target-dir /cert/changefiledelim \
- --as-textfile \
- --fields-terminated-by D \
- -z \
- --compression-codec snappy
- #Ingest real-time and near-real-time (NRT) streaming data into HDFS using Flume (near-real-time part still pending)
- twitter-agent.sources = twitter
- twitter-agent.sinks = hdfs-write
- twitter-agent.channels= MemChannel
- twitter-agent.sources.twitter.type = com.cloudera.flume.source.TwitterSource
- twitter-agent.sources.twitter.channels = MemChannel
- twitter-agent.sources.twitter.consumerKey = kBxhubHNb3sPhq1eSoWSdxf5K
- twitter-agent.sources.twitter.consumerSecret= GZE3Lp0WklDmhBCvDSsTSNpf87nM71PxP5jB5g0pZm49g6vXtW
- twitter-agent.sources.twitter.accessToken = 295114956-1gjorJnJymApxvqbHEPDWZALxHxPZsNOVSFFV9mo
- twitter-agent.sources.twitter.accessTokenSecret = fuLmttz2hn4d7HNOSYnYuN3tkEdV2H94zkdo1hDbnjfX4
- twitter-agent.sources.twitter.keywords = trump
- twitter-agent.sinks.hdfs-write.channel = MemChannel
- twitter-agent.sinks.hdfs-write.type = hdfs
- twitter-agent.sinks.hdfs-write.hdfs.path = hdfs://localhost:8020/cert/tweets/%Y/%m/%d/%H/
- twitter-agent.sinks.hdfs-write.hdfs.rollInterval = 600
- twitter-agent.sinks.hdfs-write.hdfs.rollCount = 1000
- twitter-agent.sinks.hdfs-write.hdfs.rollSize = 0
- twitter-agent.sinks.hdfs-write.hdfs.writeFormat = Text
- twitter-agent.sinks.hdfs-write.hdfs.fileType = DataStream
- twitter-agent.sinks.hdfs-write.hdfs.batchSize = 1000
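- # Note: the %Y/%m/%d/%H escapes in hdfs.path need a timestamp header on each event;
- # if the source does not supply one, a common workaround is to set on the sink:
- # twitter-agent.sinks.hdfs-write.hdfs.useLocalTimeStamp = true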
- twitter-agent.channels.MemChannel.type = memory
- twitter-agent.channels.MemChannel.capacity=1000
- twitter-agent.channels.MemChannel.transactionCapacity = 100
- flume-ng agent --name twitter-agent --conf-file /home/cloudera/cert/sampleagent.conf -Dflume.root.logger=INFO,console
- #Load data into and out of HDFS using the Hadoop File System (FS) commands
- hadoop fs -get /cert/tweets/2016/01/16/22/FlumeData.1453013770252 /home/cloudera/ Copies data from HDFS to the Local FileSystem
- hadoop fs -copyToLocal /cert/tweets/2016/01/16/22/flumedata /home/cloudera/ Copies data from HDFS to the Local FileSystem
- hadoop fs -put /home/cloudera/FlumeData.1453013770252 /cert/tweets/2016/01/16/22/ Copies data from the Local File System to HDFS
- hadoop fs -copyFromLocal /home/cloudera/fldata /cert/tweets/2016/01/16/22/ Copies data from the Local File System to HDFS
- hadoop fs -moveFromLocal /home/cloudera/flumedata /cert/tweets/2016/01/16/22/ Moves data from the Local File System to HDFS
- hadoop fs -mv /cert/tweets/2016/01/16/22/FlumeData.14530(source) /cert/tweets/2016/01/(Dest) Moves data from one HDFS location to another
- hadoop fs -cp /cert/tweets/2016/01/16/22/fldata /cert/tweets/2016/01/16/ Copies data between HDFS Locations
- hadoop fs -getmerge /cert/tweets/2016/01/16/22/ FlumeData.1453013513452 Takes a source directory and destination file and concatenates files in src into destination file.
- #Spark loading HDFS data into RDD as Text File
- val load = sc.textFile("/cert/mysqltohdfsalltables/categories/p*")
- load.collect().foreach(println)
- # writing back to HDFS----Here load is an RDD
- load.saveAsTextFile("/cert/spark/textfile")
- load.saveAsObjectFile("/cert/spark/objectfile")
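- # Reading the object file back into an RDD (a minimal sketch; assumes the elements saved above were Strings)
- val reload = sc.objectFile[String]("/cert/spark/objectfile")
- reload.collect().foreach(println)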
- #Spark: Reading data from HDFS and writing it back as a Text File
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkConf
- object fileconversion {
- def main(args: Array[String]) {
- val conf = new SparkConf().setAppName("Text Conv")
- val sc = new SparkContext(conf)
- val textRDD = sc.textFile("/cert/spark/textfile/part-*")
- textRDD.saveAsTextFile("/cert/spark/textfileoutput1")
- }
- }
- spark-submit --class "fileconversion" --master local /home/cloudera/scala/target/scala-2.10/fileconversionproject_2.10-1.0.jar
- #Reading data as Text file and Writing it as Object File
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkConf
- object objectfileconversion {
- def main(args: Array[String]) {
- val conf = new SparkConf().setAppName("Object Conv")
- val sc = new SparkContext(conf)
- val textRDD = sc.textFile("/cert/spark/textfile/part-*")
- textRDD.saveAsObjectFile("/cert/spark/objectfileoutput1")
- }
- }
- #Reading data as a Text File and writing it as a Sequence File (K,V) with NullWritable as the key
- import org.apache.hadoop.io._
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- import org.apache.spark.rdd.SequenceFileRDDFunctions
- object sequencefileconversion {
- def main(args: Array[String]) {
- val conf = new SparkConf().setAppName("Text Conv")
- val sc = new SparkContext(conf)
- val textRDD = sc.textFile("/cert/spark/textfile/part-*")
- val splitRDD = textRDD.flatMap(line => line.split(","))
- val seqRDD = splitRDD.map(word => (NullWritable.get(),word))
- seqRDD.saveAsSequenceFile("/cert/spark/Sequencefileoutput1")
- }
- }
- #Reading data as Text file and Writing as Sequence file with Key
- import org.apache.hadoop.io._
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- //import org.apache.spark.rdd.SequenceFileRDDFunctions
- object sequencefileconversion {
- def main(args: Array[String]) {
- val conf = new SparkConf().setAppName("Text Conv")
- val sc = new SparkContext(conf)
- val textRDD = sc.textFile("/cert/mysqltohdfsalltables/departments/part*")
- val seqRDD = textRDD.map(word => ( word.split(",")(0), word.split(",")(1) ))
- seqRDD.saveAsSequenceFile("/cert/spark/Sequencefileoutputkey")
- }
- }
- #Reading as Sequence file and Saving as Text file
- import org.apache.hadoop.io._
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- object readsequencefileconversion {
- def main(args: Array[String]) {
- val conf = new SparkConf().setAppName("Text Conv")
- val sc = new SparkContext(conf)
- val textRDD = sc.sequenceFile("/cert/spark/Sequencefileoutputkey" ,classOf[Text], classOf[Text])
- val seqRDD = textRDD.map { case(x,y) => (x.toString(),y.toString()) }
- seqRDD.saveAsTextFile("/cert/spark/textfileoutputkeiyii")
- }
- }
- #Joining Two Datasets(Inner Join)
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- object joinorders {
- def main(args: Array[String]) {
- val conf = new SparkConf().setAppName("seqText")
- val sc = new SparkContext(conf)
- val productRDD = sc.textFile("/cert/mysqltohdfsalltables/products/part*")
- val productPairRDD = productRDD.map(s => (s.split(",")(0),s.split(",")(2) ))
- val orditemRDD = sc.textFile("/cert/mysqltohdfsalltables/order_items/part*").map(x=> x.split(",")).map(x=>(x(2),x(3)))
- val joinD = productPairRDD.join(orditemRDD)
- joinD.count
- joinD.collect()
- joinD.saveAsTextFile("/cert/spark/join_orditem_product")
- }
- }
- #Joining Two Datasets(Left Join)
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- object leftjoinorders {
- def main(args: Array[String]) {
- val conf = new SparkConf().setAppName("leftjoin")
- val sc = new SparkContext(conf)
- val productRDD = sc.textFile("/cert/mysqltohdfsalltables/products/part*").map(s => s.split(",")).map(s=>(s(0),s(2)))
- val orditemRDD = sc.textFile("/cert/mysqltohdfsalltables/order_items/part*").map(x=> x.split(",")).map(x=>(x(2),x(3)))
- val joinD = productRDD.leftOuterJoin(orditemRDD)
- joinD.saveAsTextFile("/cert/spark/leftjoin_orditem_product")
- }
- }
- #Joining Two Datasets(Right Join) -- sample output: (906,(Some(Team Golf Tennessee Volunteers Putter Grip),4))
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- object rightjoinorders {
- def main(args: Array[String]) {
- val conf = new SparkConf().setAppName("rightjoin")
- val sc = new SparkContext(conf)
- val productRDD = sc.textFile("/cert/mysqltohdfsalltables/products/part*").map(s => s.split(",")).map(s=>(s(0),s(2)))
- val orditemRDD = sc.textFile("/cert/mysqltohdfsalltables/order_items/part*").map(x=> x.split(",")).map(x=>(x(2),x(3)))
- val joinD = productRDD.rightOuterJoin(orditemRDD)
- joinD.saveAsTextFile("/cert/spark/rightjoin_orditem_product")
- }
- }
- #Joining Two Hive Tables through HiveContext (Spark)
- import org.apache.spark.sql.hive.HiveContext
- import org.apache.spark.sql.SQLContext
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- object joinhive {
- def main(args: Array[String]) {
- val conf = new SparkConf().setAppName("join hive")
- val sc = new SparkContext(conf)
- val sqlContext = new HiveContext(sc)
- //val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
- val joinData = sqlContext.sql("select c.customer_fname,c.customer_lname,o.order_date,o.order_status from cert.orders o join cert.customers c on c.customer_id = o.order_customer_id")
- joinData.collect().foreach(println)
- }
- }
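- #Note: running this against existing Hive tables typically needs the spark-hive dependency on the classpath and hive-site.xml visible to Spark (depends on the cluster setup)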
- #Max, Min and Count of a Specific Column
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkContext._
- import java.lang.Math
- object maxmin {
- def main(args:Array[String]) {
- val conf = new SparkConf().setAppName("Maxmin")
- val sc = new SparkContext(conf)
- val loadrdd = sc.textFile("/cert/mysqltohdfs/order_items/part*")
- val maprdd = loadrdd.map(x => x.split(",")).map(x=>x(4).toFloat)
- val maxrdd = maprdd.max()
- val minrdd = maprdd.min()
- val countrdd = maprdd.count()
- println("Max Orders %s ".format(maxrdd))
- println("Miin Orders %s " .format(minrdd))
- println("Count is %s ".format(countrdd))
- }
- }
- #Sum & Avg of a Specific column
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkContext._
- import java.lang.Math
- object sumavg {
- def main(args:Array[String]) {
- val conf = new SparkConf().setAppName("SumAvg")
- val sc = new SparkContext(conf)
- val loadrdd = sc.textFile("/cert/mysqltohdfs/order_items/part*")
- val maprdd = loadrdd.map(x => x.split(",")).map(x=>x(0).toLong)
- val sumrdd = maprdd.reduce((x,y) =>x+y)
- //val sum2rdd = maprdd.sum()----returns a Double, printed in exponential (scientific) notation
- val countrdd = maprdd.count()
- val avgrdd = sumrdd.toDouble/countrdd
- println("Total sum of Orders %s ".format(sumrdd))
- //println("Sum with function %s " .format(sum2rdd))
- println("Count is %s ".format(countrdd))
- println("Average is %s ".format(avgrdd))
- }
- }
- #SortByKey and GroupByKey
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkContext._
- object sortgroup {
- def main(args: Array[String]) {
- val conf = new SparkConf().setAppName("sortgroup")
- val sc = new SparkContext(conf)
- val loadrdd = sc.textFile("/cert/mysqltohdfs/order_items/part*")
- val maprdd = loadrdd.map(x => x.split(",")).map(x=>(x(2),x(4).toDouble))
- //groupByKey
- //Print count of the result of groupby
- val cgrouprdd = maprdd.groupByKey().distinct().count()
- //Printing all KV pairs on Screen
- val grouprdd = maprdd.groupByKey().foreach(println)
- //Printing count
- val countrdd = maprdd.count()
- //sortByKey Default--Ascending
- val sortrdd = maprdd.sortByKey(ascending=false,numPartitions=1).foreach(println)
- val sortcount = maprdd.sortByKey().count()
- println("Distinct Count--Grouping %s ".format(cgrouprdd))
- println("Total Count is %s ".format(countrdd))
- println("Sorting is %s " .format(sortcount))
- }
- }
- #Sample Operations on Data with SortBy, ReduceBy & Lambda
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkContext._
- object sampprac {
- def main(args: Array[String]) {
- val conf = new SparkConf().setAppName("ssoperations")
- val sc = new SparkContext(conf)
- val ordersRDD = sc.textFile("/cert/mysqltohdfs/orders/part*")
- val orderItemsRDD = sc.textFile("/cert/mysqltohdfs/order_items/part*")
- //Unique String as a Key and entire Row as Value
- //val ordersParsedRDD = ordersRDD.map(x => (x.split(","), x))
- //ordersParsedRDD.collect().foreach(println)
- //Key as one of the Columns and entire Row as Value
- val ordersParseRDDkey = ordersRDD.map(x=> (x.split(",")(0), x))
- ordersParseRDDkey.collect().take(5)
- val ordersItemParseRDDkey = orderItemsRDD.map(x=> (x.split(",")(1), x))
- ordersItemParseRDDkey.first()
- val joinonitemid = ordersParseRDDkey.join(ordersItemParseRDDkey)
- //joinonitemid.collect().foreach(println)
- //Lambda: from each joined record take the order date (orders column 1) as the key and the item subtotal (order_items column 4) as the value
- val mappingcolumns = joinonitemid.map(x=> (x._2._1.split(",")(1) , x._2._2.split(",")(4).toFloat))
- mappingcolumns.take(10)
- // Total order revenue (sum of subtotals) per day
- val reducebydateRDD = mappingcolumns.reduceByKey((x,y) => x+y).sortByKey()
- reducebydateRDD.collect().foreach(println)
- //Total quantity of items sold per day
- val mappingsales = joinonitemid.map(y=> (y._2._1.split(",")(1), y._2._2.split(",")(3).toInt)).reduceByKey((x,y) => x+y)
- mappingsales.collect().foreach(println)
- }
- }
- #Filter the data
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkContext._
- object filterapp {
- def main(args: Array[String]) {
- val conf = new SparkConf().setAppName("Filterops")
- val sc = new SparkContext(conf)
- val loadcust = sc.textFile("/cert/mysqltohdfs/customers/part*").map(x => x.split(",")).map(s => s(7))
- loadcust.collect().foreach(println)
- loadcust.count()
- //Filter data
- val filterdate = loadcust.filter(x=>x.contains("TX")).count()
- val filterdateks = loadcust.filter(x=>x.contains("KS")).count()
- println("Customers in Texas %s" .format(filterdate))
- println("Customers in Kansas %s" .format(filterdateks))
- }
- }
- #Filter data into a Smaller Dataset
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkContext._
- object datafilterapp {
- def main(args: Array[String]) {
- val conf = new SparkConf().setAppName("dFilterops")
- val sc = new SparkContext(conf)
- val loadcust = sc.textFile("/cert/mysqltohdfs/customers/part*").cache()
- //loadcust.collect().foreach(println)
- loadcust.count()
- //Filter data
- // Customers in MN State and Texas State (|| - OR operator, && - AND operator)
- val filterdate = loadcust.filter(x=> x.split(",")(7).equals("MN") || x.split(",")(7).toString.contains("TX") )
- filterdate.collect().foreach(println)
- //Customer Ids greater than 500
- val cust = loadcust.filter(x =>x.split(",")(0).toInt > 500).take(10).foreach(println)
- }
- }
- #Sorting with Filter and join Operation
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkContext._
- object sorted {
- def main(args: Array[String]) {
- val conf = new SparkConf().setAppName("sortgroup")
- val sc = new SparkContext(conf)
- //Load data
- val loadrdd = sc.textFile("/cert/mysqltohdfs/orders/part*")
- val maprdd = loadrdd.map( x=> (x.split(",")(0), x))
- val loaditem =sc.textFile("/cert/mysqltohdfs/order_items/part*")
- val mapitem = loaditem.map( x=> (x.split(",")(1), x))
- //Join Data
- val joinrdd = maprdd.join(mapitem)
- joinrdd.take(5).foreach(println)
- //Get the columns needed: order_status from the orders row and the subtotal from the order_items row (tuple ==> element)
- val readrdd = joinrdd.map( x => (x._2._1.split(",")(3) , x._2._2.split(",")(4)))
- readrdd.take(10).foreach(println)
- //Filter data - apply operations (for the tuple, x._1 is the status and x._2 the subtotal)
- val resultrdd = readrdd.filter( x => x._1.equals("CLOSED") && x._2.toFloat >300 )
- resultrdd.collect().foreach(println)
- //Sort the Data By Value in Tuple
- val result1rdd = resultrdd.sortBy(_._2)
- result1rdd.collect().foreach(println)
- }
- }
- # Convert an RDD into a Dataframe and Query the Dataset
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkConf
- // Import the DataFrame functions API
- // import org.apache.spark.sql.functions._
- case class orders(order_id:Int,order_date:String,order_customer_id:Int,order_Status:String)
- object dfquery {
- def main(args: Array[String]) {
- val conf = new SparkConf().setAppName("DiDF")
- val sc = new SparkContext(conf)
- val sqlContext = new org.apache.spark.sql.SQLContext(sc)
- // this is used to implicitly convert an RDD to a DataFrame.
- import sqlContext.implicits._
- val orderRDD = sc.textFile("/cert/mysqltohdfs/orders").map(x => x.split(","))
- orderRDD.collect().take(5)
- val mappingRDD = orderRDD.map( x => orders(x(0).toInt,x(1),x(2).toInt,x(3)))
- val ordersDF = mappingRDD.toDF()
- ordersDF.registerTempTable("ordersDF")
- val Querydata = sqlContext.sql("select * from ordersDF where order_Status LIKE '%CLOSED%' ")
- Querydata.collect().foreach(println)
- }
- }
- #SBT file with Spark SQL
- name := "dfquery"
- version := "1.0"
- scalaVersion := "2.10.4"
- libraryDependencies ++= Seq(
- "org.apache.spark" %% "spark-sql" % "1.3.0",
- "org.apache.spark" %% "spark-core" % "1.3.0"
- )
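- #Note: when the cluster already provides Spark at runtime (spark-submit), these dependencies are often marked % "provided" so they are not bundled into the application jar (optional, build-dependent)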
- #### Python--Pyspark ####
- ######Load data from HDFS and Print
- from pyspark import SparkContext, SparkConf
- conf = SparkConf().setAppName("loaddate").setMaster("local")
- sc = SparkContext(conf = conf)
- distFile = sc.textFile("/cert/mysqltohdfs/customers/part*")
- for line in distFile.collect():
-     print(line)
- #Submitting Spark Application
- spark-submit --master local savetext.py
- pyspark file.py   (older alternative; running applications through pyspark directly is deprecated in favor of spark-submit)
- ######Load data from HDFS and Store data as Text into HDFS
- from pyspark import SparkConf, SparkContext
- conf = SparkConf().setAppName("save").setMaster("local")
- sc = SparkContext(conf=conf)
- loadFile = sc.textFile("/cert/mysqltohdfs/customers/")
- savefile = loadFile.saveAsTextFile("/cert/pyspark/textcustomers")
- for line in loadFile.collect():
-     print line
- ########Load data from HDFS and Store data as Sequence File into HDFS
- from pyspark import SparkConf,SparkContext
- conf = SparkConf().setAppName("saveseq").setMaster("local")
- sc = SparkContext(conf=conf)
- loadFile = sc.textFile("/cert/mysqltohdfs/customers/part*")
- #Null as the Key
- loadFile.map(lambda x: (None,x)).saveAsSequenceFile("/cert/pyspark/seqcustomersNone")
- #Use the first column (customer_id) as the key
- loadFile.map(lambda x: tuple(x.split(",", 1))).saveAsSequenceFile("/cert/pyspark/seqcustomerskey")
- #Save as a pickle file (rows serialized into a byte stream)
- loadFile.saveAsPickleFile("/cert/pyspark/seqcustomersPckle")
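- ####Reading the pickle file back (a minimal sketch; uses the path saved above)
- reloadRDD = sc.pickleFile("/cert/pyspark/seqcustomersPckle")
- for line in reloadRDD.take(5):
-     print line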
- ####Reading a Sequence File
- from pyspark import SparkConf,SparkContext
- conf = SparkConf().setAppName("readSeq")
- sc = SparkContext(conf=conf)
- readseqFileRDD = sc.sequenceFile("/cert/pyspark/seqcustomerskey/part*")
- for line in readseqFileRDD.collect():
-     print line
- #####Join Operation
- from pyspark import SparkContext, SparkConf
- conf = SparkConf().setAppName("join").setMaster("local")
- sc = SparkContext(conf=conf)
- #use_unicode=False avoids the u'' prefix in the output (u'' marks a unicode string object,
- #decoded according to the source encoding declaration); without it each line is read as a plain byte string
- ordRDD = sc.textFile("/cert/mysqltohdfs/orders/part*",use_unicode=False)
- orditemsRDD = sc.textFile("/cert/mysqltohdfs/order_items/part*",use_unicode=False)
- orders = ordRDD.map(lambda x: x.split(",")).map(lambda x:(x[0],x))
- items = orditemsRDD.map(lambda x:x.split(",")).map(lambda x:(x[1],x))
- joinRDD = orders.join(items)
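- #Alternative: build the same keys directly from the raw line (here the value is the unsplit row string)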
- ordersd = ordRDD.map(lambda x: (x.split(",")[0],x))
- itemsd = orditemsRDD.map( lambda x: (x.split(",")[1],x))
- joinRDD = ordersd.join(itemsd)
- for line in joinRDD.collect():
-     print line
- #####Left Join Operation
- from pyspark import SparkContext, SparkConf
- conf = SparkConf().setAppName("join").setMaster("local")
- sc = SparkContext(conf=conf)
- ordRDD = sc.textFile("/cert/mysqltohdfs/orders/part*",use_unicode=False)
- custRDD = sc.textFile("/cert/mysqltohdfs/customers/part*",use_unicode=False)
- orders = ordRDD.map(lambda x: x.split(",")).map(lambda x:(x[2],x))
- items = custRDD.map(lambda x:x.split(",")).map(lambda x:(x[0],(x[1],x[2])))
- joinRDD = orders.leftOuterJoin(items)
- for line in joinRDD.collect():
-     print line
- #count
- cnt = joinRDD.count()
- print cnt
- #####Max,Min,Count
- from pyspark import SparkContext,SparkConf
- conf = SparkConf().setAppName("Maxmin").setMaster("local")
- sc = SparkContext(conf=conf)
- ordRDD = sc.textFile("/cert/mysqltohdfs/orders/part*")
- #Column index variables for the orders table
- order_id = 0
- order_date = 1
- order_customer_id = 2
- order_status = 3
- splitRDD = ordRDD.map(lambda x:x.split(","))
- #Count
- totalitems = splitRDD.map(lambda x:x[order_id]).distinct().count()
- print totalitems
- #Max Order Date
- Maxorddate = splitRDD.map(lambda x:x[order_date]).reduce(max)
- print Maxorddate
- #Min Order Date
- #----Can use with reduce also---Minorddate = splitRDD.map(lambda x:x[order_date]).reduce(min)
- Minorddate = splitRDD.map(lambda x:x[order_date]).min()
- print Minorddate
- ######Count, Average & Sum
- from pyspark import SparkConf,SparkContext
- conf = SparkConf().setAppName("Avg").setMaster("local")
- sc = SparkContext(conf=conf)
- ordirdd = sc.textFile("/cert/mysqltohdfs/order_items/part*")
- splitrdd = ordirdd.map(lambda x:x.split(","))
- for line in ordirdd.take(8):
-     print line
- #Count
- cnt = splitrdd.map(lambda x:int(x[0])).count()
- print cnt
- #Sum
- total = splitrdd.map(lambda x:float(x[4]))
- for line in total.take(8):
-     print line
- Tot = total.sum()
- print Tot
- #Avg
- avg = float(Tot/cnt)
- print avg
- ####Filter operation
- from pyspark import SparkConf,SparkContext
- conf = SparkConf().setAppName("filter").setMaster("local")
- sc = SparkContext(conf=conf)
- ordrdd = sc.textFile("/cert/mysqltohdfs/orders/part*",use_unicode=False)
- for line in ordrdd.take(5):
-     print line
- splitrdd = ordrdd.map(lambda x: x.split(","))
- #Filter-Equal
- filterrdd = splitrdd.filter(lambda x: x[3] == "COMPLETE")
- for line in filterrdd.collect():
-     print line
- #Equal
- for line in splitrdd.filter(lambda x: x[1] == "2014-07-21 00:00:00.0").collect():
-     print line
- #Greater
- greaterrdd = splitrdd.filter(lambda x: x[1] > '2014-07-21 00:00:00')
- for line in greaterrdd.collect():
-     print line
- #####Sorting operation
- from pyspark import SparkContext,SparkConf
- conf = SparkConf().setAppName("Sort").setMaster("local")
- sc = SparkContext(conf=conf)
- ordRDD = sc.textFile("/cert/mysqltohdfs/orders/part*")
- for line in ordRDD.take(5):
-     print line
- splitRDD = ordRDD.map( lambda x:x.split(","))
- reduceRDD = splitRDD.map( lambda x:(x[1],1))
- for line in reduceRDD.take(5):
-     print line
- #Count Number of Orders per Date
- countRDD = reduceRDD.reduceByKey(lambda x,y : x+y)
- for line in countRDD.take(7):
-     print line
- newrdd = ordRDD.map(lambda x : ((x.split(",")[1]),x))
- for line in newrdd.take(5):
-     print line
- sortRDD = countRDD.sortByKey()
- for line in sortRDD.take(5):
-     print line
- #aggregateByKey: count the number of orders per status
- newmaprdd = splitRDD.map( lambda x: (str(x[3]),str(x[1])))
- for line in newmaprdd.take(5):
-     print line
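- #aggregateByKey(zeroValue, seqOp, combOp): each key starts at 0,
- #seqOp adds 1 for every record seen in a partition, combOp sums the per-partition counts,
- #so aggRDD ends up holding the number of orders per status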
- aggRDD = newmaprdd.aggregateByKey(0, lambda x,y : x+1 , lambda x,y : x+y )
- for line in aggRDD.collect():
-     print line
- #####Convert an RDD into a Dataframe and Query the Dataset
- from pyspark import SparkContext
- #from pyspark import SparkConf
- from pyspark.sql import SQLContext,Row
- sc = SparkContext()
- sqlContext = SQLContext(sc)
- ordRDD = sc.textFile("/cert/mysqltohdfs/orders/p*")
- splitRDD = ordRDD.map(lambda x: x.split(","))
- #convert each line into a Row
- orders = splitRDD.map(lambda x: Row(order_id=x[0],order_date=x[1],order_customer_id=x[2],order_status=x[3]))
- # Infer the schema, and register the DataFrame as a table.
- schemadf = sqlContext.inferSchema(orders)
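- #Note: in later Spark 1.x releases inferSchema is deprecated; sqlContext.createDataFrame(orders) builds the same DataFrame here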
- schemadf.registerTempTable("orders")
- dfframe = sqlContext.sql("Select * from orders where order_status='COMPLETE'")
- for line in dfframe.collect():
-     print line
- #### Rank Query in Spark ##
- from pyspark import SparkContext
- from pyspark.sql import SQLContext,Row
- from pyspark.sql import HiveContext
- sc = SparkContext()
- sqlContext = HiveContext(sc)
- productRDD = sc.textFile("/cert/mysqltohdfs/products/p*")
- splitRDD = productRDD.map(lambda x:x.split(","))
- products = splitRDD.map(lambda x: Row(product_id=x[0],product_category_id=x[1],product_name=x[2],product_description=x[3],product_price = x[4],product_image=x[5]))
- # Infer the schema, and register the DataFrame as a table
- schemadff = sqlContext.inferSchema(products)
- schemadff.registerTempTable("products")
- Query = sqlContext.sql("select *, rank() over (partition by product_category_id order by product_price desc) from products")
- for line in Query.collect():
-     print line
- ###Importing a MySQL Table as an Avro Data File to HDFS
- sqoop import \
- --connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
- --username=root \
- --password=cloudera \
- --m 1 \
- --table=categories \
- --as-avrodatafile \
- --target-dir /cert/mysqltohdfs/avro/categories
- ####Creating a Hive Table on an Avro Data File with explicit column definitions
- create EXTERNAL table cert.order_items
- (
- order_item_id int,
- order_item_order_id int,
- order_item_product_id int,
- order_item_quantity int,
- order_item_subtotal float,
- order_item_product_price float
- )
- STORED AS AVRO
- LOCATION '/cert/mysqltohdfs/avro/order_items';
- ###Creating a Hive Table on an Avro Data File with an inline Avro Schema (avro.schema.literal)
- create EXTERNAL table cert.products
- ROW FORMAT SERDE
- 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
- STORED AS INPUTFORMAT
- 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
- OUTPUTFORMAT
- 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
- LOCATION '/cert/mysqltohdfs/avro/products'
- tblproperties('avro.schema.literal'='{
- "type" : "record",
- "name" : "sqoop_import_products",
- "doc" : "Sqoop import of products",
- "fields" : [ {
- "name" : "product_id",
- "type" : [ "int", "null" ],
- "columnName" : "product_id",
- "sqlType" : "4"
- }, {
- "name" : "product_category_id",
- "type" : [ "int", "null" ],
- "columnName" : "product_category_id",
- "sqlType" : "4"
- }, {
- "name" : "product_name",
- "type" : [ "string", "null" ],
- "columnName" : "product_name",
- "sqlType" : "12"
- }, {
- "name" : "product_description",
- "type" : [ "string", "null" ],
- "columnName" : "product_description",
- "sqlType" : "12"
- }, {
- "name" : "product_price",
- "type" : [ "float", "null" ],
- "columnName" : "product_price",
- "sqlType" : "7"
- }, {
- "name" : "product_image",
- "type" : [ "string", "null" ],
- "columnName" : "product_image",
- "sqlType" : "12"
- } ],
- "tableName" : "products"
- }
- ');
- #####Create a table on an Avro Data File with an Avro Schema File (avro.schema.url)
- create EXTERNAL table cert.avrocategories
- (
- category_id int,
- category_dept_id int,
- category_name string
- )
- STORED AS AVRO
- LOCATION '/cert/mysqltohdfs/avro/categories'
- TBLPROPERTIES('avro.schema.url'='hdfs://localhost/cert/mysqltohdfs/avro/sqoop_import_categories.avsc');
- ####To get the Schema from an Avro Data File
- avro-tools getschema part-m-00000.avro > new.txt
- ###Create a Partitioned Table (Dynamic) and Improve Query Performance
- CREATE EXTERNAL TABLE ORDERS_PART
- (
- order_id int,
- order_date TIMESTAMP,
- order_customer_id int,
- order_status varchar(45)
- )
- PARTITIONED BY(year string,month string)
- LOCATION '/cert/mysqltohdfs/orders/';
- ###Select Records from the Main Table and insert into the Partition Table (dynamic partitioning typically requires hive.exec.dynamic.partition=true and hive.exec.dynamic.partition.mode=nonstrict)
- insert overwrite table orders_part partition(year,month) select o.*,year(o.order_date),month(o.order_date) from orders o ;
- ###Creating an Avro Schema File
- {"namespace": "sqoop_import_avrosample.avsc",
- "type": "record",
- "name": "User",
- "fields": [
- {"name": "name", "type": "string"},
- {"name": "number", "type": ["int", "null"]},
- {"name": "color", "type": ["string", "null"]}
- ]
- }
- CREATE EXTERNAL TABLE AVROSAMPLE
- STORED AS AVRO
- LOCATION '/cert/mysqltohdfs/categories/'
- tblproperties('avro.schema.url'='hdfs://localhost/cert/mysqltohdfs/avro/sqoop_import_avrosample.avsc');
- #### IMPALA ####
- Commands are largely the same as in Hive.
- Refresh tables so Impala picks up metadata changes:
- INVALIDATE METADATA waits to reload the metadata until it is needed for a subsequent query, but reloads all the metadata for the table, which can be an expensive operation, especially for large tables with many partitions. REFRESH reloads the metadata immediately, but only loads the block location data for newly added data files, making it a less expensive operation overall. In practice, run REFRESH <table> after new data files are added to an existing table, and INVALIDATE METADATA <table> after a table is created, dropped, or altered outside Impala.