Advertisement
Guest User

mapreduce-scala

a guest
Jan 5th, 2015
173
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 3.59 KB | None | 0 0
  1. package com.example
  2.  
  3. import org.apache.hadoop.hbase.HBaseConfiguration
  4. import org.apache.hadoop.hbase.client.HBaseAdmin
  5. import org.apache.hadoop.hbase.client.HTable
  6. import org.apache.hadoop.hbase.util.Bytes
  7. import org.apache.hadoop.hbase.client.Put
  8. import org.apache.hadoop.hbase.client.Get
  9. import java.io.IOException
  10. import org.apache.hadoop.conf.Configuration
  11. import org.apache.hadoop.hbase._
  12. import org.apache.hadoop.hbase.client._
  13. import org.apache.hadoop.hbase.io._
  14. import org.apache.hadoop.hbase.mapreduce._
  15. import org.apache.hadoop.io._
  16. import org.apache.hadoop.mapreduce._
  17. import scala.collection.JavaConversions._
  18. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
  19.  
  20. case class HString(name: String) {
  21.     lazy val bytes = name.getBytes
  22.     override def toString = name
  23. }
object HString {
    import scala.language.implicitConversions
    // Implicit views so an HString can be passed directly where the HBase API
    // expects a String or a byte array (e.g. table names, family/qualifier args).
    // NOTE(review): implicit conversions to such common types are easy to trigger
    // accidentally; keep their import scope narrow.
    implicit def hstring2String(src: HString): String = src.name
    implicit def hstring2Bytes(src: HString): Array[Byte] = src.bytes
}
  29.  
  30. object Families {
  31.     val stream = HString("stream")
  32.     val identity = HString("identity")
  33. }
  34. object Qualifiers {
  35.     val title = HString("title")
  36.     val url = HString("url")
  37.     val media = HString("media")
  38.     val media_source = HString("media_source")
  39.     val content = HString("content")
  40.     val nolimitid_timestamp = HString("nolimitid.timestamp")
  41.     val original_id = HString("original_id")
  42.     val timestamp = HString("timestamp")
  43.     val date_created = HString("date_created")
  44.     val count = HString("count")
  45. }
  46. object Tables {
  47.     val rawstream100 = HString("raw_stream_1.0.0")
  48.     val rawstream = HString("rawCopy")
  49. }
  50.  
  51. class tmapper extends TableMapper[Text, Put]{
  52.   def map (row: ImmutableBytesWritable, value: Result, context: Context) {
  53.     val put = new Put(row.get())
  54.     val key = Bytes.toString(row.get());
  55.     for (kv <- value.raw()) {
  56.         put.add(kv)
  57.     }
  58.     context.write(new Text(key), put)
  59.   }
  60. }
  61.  
  62. class treducer extends TableReducer[Text, Put, Put]{
  63.     def reduce (row: Text, values: Put, context:Context){
  64.         context.write(values, values);
  65.     }
  66. }
  67.  
  68. object Hello {
  69.   val hbaseMaster = "127.0.0.1:60000"
  70.   val hbaseZookeper = "127.0.0.1"
  71.   def main(args: Array[String]): Unit = {
  72.     val conf = HBaseConfiguration.create()
  73.     conf.set("hbase.master", hbaseMaster)
  74.     conf.set("hbase.zookeeper.quorum", hbaseZookeper)
  75.     val hbaseAdmin = new HBaseAdmin(conf)
  76.     val scan = new Scan()
  77.     scan.setCaching(500)         // 1 is the default in Scan, which will be bad for MapReduce jobs
  78.     scan.setCacheBlocks(false)   // don't set to true for MR jobs
  79.    
  80.     val job = new Job(conf)
  81.     job.setJobName("CopyTable");
  82.     job.setJarByClass(classOf[Hello])
  83.    
  84.     TableMapReduceUtil.initTableMapperJob(
  85.       Tables.rawstream100.name,     // input HBase table name
  86.       scan,                      // Scan instance to control CF and attribute selection
  87.       classOf[tmapper],  // mapper class
  88.       classOf[ImmutableBytesWritable],             // mapper output key class
  89.       classOf[Result],     // mapper output value class
  90.       job
  91.     )
  92.     job.setMapperClass(classOf[tmapper])
  93.     job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
  94.     job.setMapOutputValueClass(classOf[Result])
  95.    
  96.     TableMapReduceUtil.initTableReducerJob(
  97.       Tables.rawstream.name,          // Table name
  98.       classOf[treducer], // Reducer class
  99.       job
  100.     )
  101.     job.setReducerClass(classOf[treducer]);
  102.     job.setNumReduceTasks(1);
  103.     job.setOutputFormatClass(classOf[TableOutputFormat[Unit]]);
  104.     job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, Tables.rawstream);
  105.  
  106.     val b = job.waitForCompletion(true);
  107.     if (!b) {
  108.         throw new IOException("error within job!");
  109.     }
  110.   }
  111. }
  112.  
  113. class Hello {}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement