Advertisement
ans4175

copytable_mapreduce_scala

Jan 9th, 2015
338
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 2.23 KB | None | 0 0
  1. class tmapper extends TableMapper[ImmutableBytesWritable, Put]{
  2.   type Contxt = Mapper[ImmutableBytesWritable, Result, ImmutableBytesWritable, Put]#Context
  3.   override def map (row: ImmutableBytesWritable, value: Result, context: Contxt) {
  4.     val put = new Put(row.get())
  5.     val key = Bytes.toString(row.get())
  6.     for (kv <- value.raw()) {
  7.         put.add(kv)
  8.     }
  9.     context.write(row, put)
  10.   }
  11. }
  12.  
  13. class treducer extends TableReducer[ImmutableBytesWritable, Put, ImmutableBytesWritable]{
  14.     type Contxt = Reducer[ImmutableBytesWritable, Put, ImmutableBytesWritable, Put]#Context
  15.     def reduce (row: ImmutableBytesWritable, values: Put, context:Contxt){
  16.         context.write(row, values)
  17.     }
  18. }
  19.  
  20. object Hello {
  21.   val hbaseMaster = "127.0.0.1:60000"
  22.   val hbaseZookeper = "127.0.0.1"
  23.   def main(args: Array[String]): Unit = {
  24.     val conf = HBaseConfiguration.create()
  25.     conf.set("hbase.master", hbaseMaster)
  26.     conf.set("hbase.zookeeper.quorum", hbaseZookeper)
  27.     val hbaseAdmin = new HBaseAdmin(conf)
  28.    
  29.     val scan = new Scan()
  30.     scan.setCaching(500)         // 1 is the default in Scan, which will be bad for MapReduce jobs
  31.     scan.setCacheBlocks(false)   // don't set to true for MR jobs
  32.     val job = new Job(conf)
  33.     job.setJobName("CopyTable");
  34.     job.setJarByClass(classOf[Hello])
  35.    
  36.     TableMapReduceUtil.initTableMapperJob(
  37.       "nama_tabel_original",     // input HBase table name
  38.       scan,                      // Scan instance to control CF and attribute selection
  39.       classOf[tmapper],  // mapper class
  40.       classOf[ImmutableBytesWritable],             // mapper output key class
  41.       classOf[Put],     // mapper output value class
  42.       job
  43.     )
  44.     job.setMapperClass(classOf[tmapper])
  45.    
  46.     TableMapReduceUtil.initTableReducerJob(
  47.       "nama_tabel_tujuan",          // Table name
  48.       classOf[treducer], // Reducer class
  49.       job
  50.     )
  51.     job.setReducerClass(classOf[treducer])
  52.     job.setNumReduceTasks(1)
  53.  
  54.     job.setOutputKeyClass(classOf[ImmutableBytesWritable])
  55.     job.setOutputValueClass(classOf[Put])
  56.  
  57.     job.setOutputFormatClass(classOf[TableOutputFormat[Unit]])
  58.     job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "nama_tabel_tujuan")
  59.  
  60.     val b = job.waitForCompletion(true);
  61.     if (!b) {
  62.         println("Mapprogress="+job.mapProgress())
  63.     }
  64.   }
  65. }
  66.  
  67. class Hello {}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement