map-reduce-scala (ans4175, Jan 7th, 2015)
package com.example

import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io._
import org.apache.hadoop.hbase.mapreduce._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce._
import scala.collection.JavaConversions._

// Thin wrapper around a String that caches its byte form, so table, family,
// and qualifier names can be declared once and used with both the
// String-based and byte[]-based HBase APIs.
case class HString(name: String) {
    lazy val bytes = name.getBytes
    override def toString = name
}
object HString {
    import scala.language.implicitConversions
    implicit def hstring2String(src: HString): String = src.name
    implicit def hstring2Bytes(src: HString): Array[Byte] = src.bytes
}

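// Illustration only (not used by the job): the implicits above let an
// HString be passed wherever a String or an Array[Byte] is expected.
object HStringDemo {
  def main(args: Array[String]): Unit = {
    val title = HString("title")
    val s: String = title        // via hstring2String
    val b: Array[Byte] = title   // via hstring2Bytes
    println(s + " is " + b.length + " bytes")
  }
}
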
// Column families.
object Families {
    val stream = HString("stream")
    val identity = HString("identity")
    val cf = HString("cf")
}
// Column qualifiers.
object Qualifiers {
    val title = HString("title")
    val url = HString("url")
    val media = HString("media")
    val media_source = HString("media_source")
    val content = HString("content")
    val nolimitid_timestamp = HString("nolimitid.timestamp")
    val original_id = HString("original_id")
    val timestamp = HString("timestamp")
    val date_created = HString("date_created")
    val count = HString("count")
}
// Table names.
object Tables {
    val rawstream100 = HString("raw_stream_1.0.0")
    val rawstream = HString("rawCopy")
}

// Emits (rowkey, 1) for every row of the input table, word-count style.
class tmapper extends TableMapper[Text, IntWritable] {
  type Contxt = Mapper[ImmutableBytesWritable, Result, Text, IntWritable]#Context
  override def map(row: ImmutableBytesWritable, value: Result, context: Contxt) {
    // Respect offset/length: the backing array may be larger than the rowkey.
    val key = Bytes.toString(row.get(), row.getOffset, row.getLength)
    context.write(new Text(key), new IntWritable(1))
  }
}

// Sums the counts for each rowkey and writes the total into cf:count of the
// output table. The input key/value types must match the mapper output
// (Text, IntWritable), not ImmutableBytesWritable.
class treducer extends TableReducer[Text, IntWritable, ImmutableBytesWritable] {
    type Contxt = Reducer[Text, IntWritable, ImmutableBytesWritable, Mutation]#Context
    override def reduce(key: Text, values: java.lang.Iterable[IntWritable], context: Contxt) {
        var count = 0
        for (v <- values) {   // JavaConversions lets us iterate the Java iterable
            count += v.get()
        }
        val put = new Put(Bytes.toBytes(key.toString))
        put.add(Families.cf.bytes, Qualifiers.count.bytes, Bytes.toBytes(count))
        context.write(null, put)   // TableOutputFormat takes the target row from the Put
    }
}

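// The reducer above writes into table "test", family "cf", which must exist
// before the job runs. A minimal sketch of creating it, assuming the
// 0.98-era HBaseAdmin API (TableSetup/ensureTarget are illustrative names):
object TableSetup {
  def ensureTarget(conf: org.apache.hadoop.conf.Configuration): Unit = {
    val admin = new HBaseAdmin(conf)
    try {
      if (!admin.tableExists("test")) {
        val desc = new HTableDescriptor(TableName.valueOf("test"))
        desc.addFamily(new HColumnDescriptor(Families.cf.bytes))
        admin.createTable(desc)
      }
    } finally {
      admin.close()
    }
  }
}
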
object Hello {
  val hbaseMaster = "127.0.0.1:60000"
  val hbaseZookeeper = "127.0.0.1"

  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.master", hbaseMaster)
    conf.set("hbase.zookeeper.quorum", hbaseZookeeper)

    val scan = new Scan()
    scan.setCaching(500)         // the Scan default of 1 is bad for MapReduce jobs
    scan.setCacheBlocks(false)   // never cache blocks for MR jobs

    val job = new Job(conf)
    job.setJobName("CopyTable")
    job.setJarByClass(classOf[Hello])

    // Wires up TableInputFormat, the mapper, and the mapper output types.
    TableMapReduceUtil.initTableMapperJob(
      Tables.rawstream100.name,  // input HBase table name
      scan,                      // Scan instance to control CF and attribute selection
      classOf[tmapper],          // mapper class
      classOf[Text],             // mapper output key class
      classOf[IntWritable],      // mapper output value class
      job
    )

    // Also wires up TableOutputFormat, OUTPUT_TABLE, and the job output
    // types, so no further output configuration is needed.
    TableMapReduceUtil.initTableReducerJob(
      "test",            // output table; must exist with family "cf" (see sketch above)
      classOf[treducer], // reducer class
      job
    )
    job.setNumReduceTasks(1)

    val ok = job.waitForCompletion(true)
    if (!ok) sys.exit(1)   // non-zero exit if the job failed
    println("Map progress = " + job.mapProgress())
  }
}

// Empty class alongside the object so that setJarByClass(classOf[Hello]) compiles.
class Hello {}
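
// A sketch of building and running this job; the versions and the jar name
// are assumptions, so match them to your cluster:
//
//   libraryDependencies ++= Seq(
//     "org.apache.hadoop" % "hadoop-client" % "2.5.2",
//     "org.apache.hbase"  % "hbase-client"  % "0.98.8-hadoop2",
//     "org.apache.hbase"  % "hbase-server"  % "0.98.8-hadoop2"
//   )
//
// Package the classes into a jar and submit it with the hadoop launcher:
//
//   hadoop jar map-reduce-scala.jar com.example.Hello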