daily pastebin goal
54%
SHARE
TWEET

copytable_mapreduce_scala

ans4175 Jan 9th, 2015 190 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. class tmapper extends TableMapper[ImmutableBytesWritable, Put]{
  2.   type Contxt = Mapper[ImmutableBytesWritable, Result, ImmutableBytesWritable, Put]#Context
  3.   override def map (row: ImmutableBytesWritable, value: Result, context: Contxt) {
  4.     val put = new Put(row.get())
  5.     val key = Bytes.toString(row.get())
  6.         for (kv <- value.raw()) {
  7.                 put.add(kv)
  8.         }
  9.     context.write(row, put)
  10.   }
  11. }
  12.  
  13. class treducer extends TableReducer[ImmutableBytesWritable, Put, ImmutableBytesWritable]{
  14.         type Contxt = Reducer[ImmutableBytesWritable, Put, ImmutableBytesWritable, Put]#Context
  15.         def reduce (row: ImmutableBytesWritable, values: Put, context:Contxt){
  16.                 context.write(row, values)
  17.         }
  18. }
  19.  
  20. object Hello {
  21.   val hbaseMaster = "127.0.0.1:60000"
  22.   val hbaseZookeper = "127.0.0.1"
  23.   def main(args: Array[String]): Unit = {
  24.         val conf = HBaseConfiguration.create()
  25.         conf.set("hbase.master", hbaseMaster)
  26.         conf.set("hbase.zookeeper.quorum", hbaseZookeper)
  27.         val hbaseAdmin = new HBaseAdmin(conf)
  28.        
  29.         val scan = new Scan()
  30.         scan.setCaching(500)         // 1 is the default in Scan, which will be bad for MapReduce jobs
  31.         scan.setCacheBlocks(false)   // don't set to true for MR jobs
  32.         val job = new Job(conf)
  33.         job.setJobName("CopyTable");
  34.         job.setJarByClass(classOf[Hello])
  35.    
  36.         TableMapReduceUtil.initTableMapperJob(
  37.           "nama_tabel_original",     // input HBase table name
  38.           scan,                      // Scan instance to control CF and attribute selection
  39.           classOf[tmapper],  // mapper class
  40.           classOf[ImmutableBytesWritable],             // mapper output key class
  41.           classOf[Put],     // mapper output value class
  42.           job
  43.         )
  44.         job.setMapperClass(classOf[tmapper])
  45.    
  46.         TableMapReduceUtil.initTableReducerJob(
  47.           "nama_tabel_tujuan",          // Table name
  48.           classOf[treducer], // Reducer class
  49.           job
  50.         )
  51.         job.setReducerClass(classOf[treducer])
  52.         job.setNumReduceTasks(1)
  53.  
  54.         job.setOutputKeyClass(classOf[ImmutableBytesWritable])
  55.         job.setOutputValueClass(classOf[Put])
  56.  
  57.         job.setOutputFormatClass(classOf[TableOutputFormat[Unit]])
  58.         job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "nama_tabel_tujuan")
  59.  
  60.         val b = job.waitForCompletion(true);
  61.         if (!b) {
  62.             println("Mapprogress="+job.mapProgress())
  63.         }
  64.   }
  65. }
  66.  
  67. class Hello {}
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top