Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
 * Mapper for the table-copy job: turns every scanned row into a [[Put]] that
 * carries the same cells and emits it under the original row key.
 */
class tmapper extends TableMapper[ImmutableBytesWritable, Put] {

  /** Shorthand for the Hadoop mapper context type used by [[map]]. */
  type Contxt = Mapper[ImmutableBytesWritable, Result, ImmutableBytesWritable, Put]#Context

  /**
   * Rebuilds the current row as a Put and writes it to the context.
   *
   * @param row     row key of the record being mapped
   * @param value   cells read for that row
   * @param context sink receiving the (row, put) pair
   */
  override def map(row: ImmutableBytesWritable, value: Result, context: Contxt): Unit = {
    val put = new Put(row.get())
    // Copy every cell of the source row into the outgoing Put unchanged.
    for (kv <- value.raw()) {
      put.add(kv)
    }
    context.write(row, put)
  }
}
/**
 * Reducer for the table-copy job: forwards a [[Put]] to the output under its
 * row key.
 *
 * NOTE(review): Hadoop's `Reducer.reduce` hook takes a `java.lang.Iterable` of
 * values, whereas the method below takes a single `Put` and is not marked
 * `override`. It therefore looks like the framework never calls it and the
 * inherited `reduce` runs instead; confirm, and if so replace it with a real
 * override whose context type matches the HBase version in use.
 */
class treducer extends TableReducer[ImmutableBytesWritable, Put, ImmutableBytesWritable] {

  /** Shorthand for the reducer context type used by [[reduce]]. */
  type Contxt = Reducer[ImmutableBytesWritable, Put, ImmutableBytesWritable, Put]#Context

  /**
   * Writes the given Put to the context, keyed by `row`.
   *
   * @param row     row key the Put belongs to
   * @param values  the Put to forward
   * @param context sink receiving the (row, put) pair
   */
  def reduce(row: ImmutableBytesWritable, values: Put, context: Contxt): Unit =
    context.write(row, values)
}
/**
 * Entry point of the "CopyTable" MapReduce job, which scans the source HBase
 * table and writes every row to the target table via [[tmapper]] and
 * [[treducer]].
 */
object Hello {
  val hbaseMaster = "127.0.0.1:60000"
  val hbaseZookeper = "127.0.0.1"

  /** Name of the HBase table the job reads from. */
  private val SourceTable = "nama_tabel_original"

  /** Name of the HBase table the job writes to. */
  private val TargetTable = "nama_tabel_tujuan"

  /**
   * Configures and runs the copy job, blocking until it finishes.
   *
   * On failure the map progress is printed and the process exits with status 1
   * so that calling scripts can detect the failure.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.master", hbaseMaster)
    conf.set("hbase.zookeeper.quorum", hbaseZookeper)

    // The admin client was created but never used or closed. Keep the
    // construction (presumably an early connectivity check; confirm) but
    // release it immediately so the connection is not leaked.
    new HBaseAdmin(conf).close()

    val scan = new Scan()
    scan.setCaching(500) // 1 is the default in Scan, which will be bad for MapReduce jobs
    scan.setCacheBlocks(false) // don't set to true for MR jobs

    val job = new Job(conf)
    job.setJobName("CopyTable")
    job.setJarByClass(classOf[Hello])

    TableMapReduceUtil.initTableMapperJob(
      SourceTable, // input HBase table name
      scan, // Scan instance to control CF and attribute selection
      classOf[tmapper], // mapper class
      classOf[ImmutableBytesWritable], // mapper output key class
      classOf[Put], // mapper output value class
      job
    )
    job.setMapperClass(classOf[tmapper])

    TableMapReduceUtil.initTableReducerJob(
      TargetTable, // output HBase table name
      classOf[treducer], // reducer class
      job
    )
    job.setReducerClass(classOf[treducer])
    job.setNumReduceTasks(1)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[Unit]])
    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, TargetTable)

    val succeeded = job.waitForCompletion(true)
    if (!succeeded) {
      println(s"Mapprogress=${job.mapProgress()}")
      // Report the failure through the exit status instead of returning normally.
      sys.exit(1)
    }
  }
}

/** Empty companion class; it exists so that `classOf[Hello]` can locate the job jar. */
class Hello {}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement