copyTable
ans4175, Dec 16th, 2014
package com.example

import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io._
import org.apache.hadoop.hbase.mapreduce._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce._
import scala.collection.JavaConversions._

// Small wrapper around a table/family/qualifier name that caches its byte form.
// The implicit conversions in the companion let an HString be used wherever the
// HBase API expects a String or an Array[Byte].
case class HString(name: String) {
    lazy val bytes: Array[Byte] = Bytes.toBytes(name)   // UTF-8, as HBase expects
    override def toString = name
}
object HString {
    import scala.language.implicitConversions
    implicit def hstring2String(src: HString): String = src.name
    implicit def hstring2Bytes(src: HString): Array[Byte] = src.bytes
}

// Column families used by the source table.
object Families {
    val stream = HString("stream")
    val identity = HString("identity")
}
// Column qualifiers used by the source table.
object Qualifiers {
    val title = HString("title")
    val url = HString("url")
    val media = HString("media")
    val media_source = HString("media_source")
    val content = HString("content")
    val nolimitid_timestamp = HString("nolimitid.timestamp")
    val original_id = HString("original_id")
    val timestamp = HString("timestamp")
    val date_created = HString("date_created")
    val count = HString("count")
}
// Source and destination table names.
object Tables {
    val rawstream100 = HString("raw_stream_1.0.0")
    val rawstream = HString("rawstream")
}
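
/* Usage sketch (illustration only, not part of the copy job; the row key below is
 * made up): the implicit conversions make reads against these names concise, e.g.
 *
 *   val get = new Get(Bytes.toBytes("some-row-key"))
 *   get.addColumn(Families.stream, Qualifiers.content)   // HString -> Array[Byte]
 *   val source: String = Tables.rawstream100             // HString -> String
 */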

// Earlier word-count style mapper/reducer, left commented out:
/*class tmapper extends TableMapper[Text, LongWritable]{
  type Context = Mapper[ImmutableBytesWritable, Result, Text, LongWritable]

  val word = new Text
  val one = new LongWritable(1)

  def map (key: ImmutableBytesWritable, value: Result, ctxt: Context)  = {
    val cell = value.getColumnLatest(Families.stream , Qualifiers.content)
    val text = new String(cell.getValue()).toLowerCase

    val tokenizer = new StringTokenizer(text)
    while (tokenizer.hasMoreTokens) {
      val term = tokenizer.nextToken
      if (term.matches("[a-zA-Z0-9]+")) {
        word.set(term)
        //write(word, one)
      }
    }
  }
}*/

/*class treducer extends TableReducer[Text, LongWritable, ImmutableBytesWritable] {
  type Context = Reducer[Text, LongWritable, ImmutableBytesWritable, Mutation]

  def reduce(key: Text,values: java.lang.Iterable[LongWritable],context: Context) = {
    val count = values.foldLeft(0L) { (tally, i) => tally + i.get }
    val put = new Put(key.toString.getBytes, count)  // note: the second Put argument is a timestamp, not the count
    put.add(Families.content, Qualifiers.count, Bytes.toBytes(count))
    context.write(null, put)
  }

}*/

// Mapper for the copy job: re-emit every scanned row as a Put containing all of its cells.
class tmapper extends TableMapper[ImmutableBytesWritable, Put] {
  def resultToPut(key: ImmutableBytesWritable, result: Result): Put = {
    val put = new Put(key.get())
    for (kv <- result.raw()) {   // raw() returns every KeyValue of the row
      put.add(kv)
    }
    put
  }

  override def map(row: ImmutableBytesWritable, value: Result, context: Context): Unit = {
    context.write(row, resultToPut(row, value))
  }
}
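
/* Why this works as a copy: the mapper above just turns each Result back into a Put
 * for the same row key, and the reducer slot is left at Hadoop's default identity
 * Reducer (null is passed to initTableReducerJob below), so every Put flows through
 * unchanged to TableOutputFormat, which writes it into the destination table. This
 * is broadly the same idea as HBase's own CopyTable utility.
 */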

object Hello {
  val hbaseMaster = "127.0.0.1:60000"
  val hbaseZookeeper = "127.0.0.1"

  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.master", hbaseMaster)
    conf.set("hbase.zookeeper.quorum", hbaseZookeeper)
    val hbaseAdmin = new HBaseAdmin(conf)   // only needed by the commented-out snapshot check below

    // Check that the source table exists, then copy it via snapshot/clone (commented out):
    /*if(!hbaseAdmin.isTableAvailable("raw_stream_1.0.0")) {
        println("Table doesn't exist..quitting")
    }else{
        println("Table exists..")
        try{
            hbaseAdmin.snapshot("raw-snapshot", "raw_stream_1.0.0")
            hbaseAdmin.cloneSnapshot("raw-snapshot", "raw_stream")
        } catch {
          case ioe: IOException => println("Error IO " + ioe)
        }
    }*/
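    /* Note: the commented block above is an alternative to the MapReduce job below;
     * snapshot() plus cloneSnapshot() produces a copy of the table on the server
     * side, without scanning rows through a client job.
     */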
    val job = Job.getInstance(conf, "CopyTable")
    job.setJarByClass(classOf[Hello])   // needs the (otherwise empty) class Hello defined at the bottom

    val scan = new Scan()
    scan.setCaching(500)         // 1 is the default in Scan, which would be bad for a MapReduce job
    scan.setCacheBlocks(false)   // don't set to true for MR jobs
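
    // Optional (not in the original job): the copy can be narrowed to a single
    // column family or time range through the same Scan, e.g.
    //   scan.addFamily(Families.stream)
    //   scan.setTimeRange(minStamp, maxStamp)   // minStamp/maxStamp are placeholders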

    TableMapReduceUtil.initTableMapperJob(
      Tables.rawstream100.bytes,   // input HBase table name
      scan,                        // Scan instance to control CF and attribute selection
      classOf[tmapper],            // mapper class
      null,                        // mapper output key class (null: use the job's output classes)
      null,                        // mapper output value class (null: use the job's output classes)
      job
    )

    TableMapReduceUtil.initTableReducerJob(
      Tables.rawstream,            // output table name
      null,                        // reducer class: null keeps the default identity reducer
      job
    )

    if (!job.waitForCompletion(true)) {
      throw new IOException("error with job!")
    }
  }
}

// Empty class so that classOf[Hello] (used by setJarByClass above) resolves;
// setJarByClass needs a Class, and the object Hello alone does not provide one.
class Hello {}
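
/* Build/run sketch (assumptions: an assembly jar named copytable-assembly.jar and an
 * HBase client configuration reachable from the cluster). The job is typically packaged
 * as a fat jar and submitted with something like:
 *
 *   hadoop jar copytable-assembly.jar com.example.Hello
 *
 * The initTableMapperJob/initTableReducerJob helpers should take care of shipping the
 * HBase dependency jars with the job by default.
 */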