ans4175

scala-mapreduce

Jan 5th, 2015
I got this error:
15/01/05 12:57:19 INFO mapred.JobClient: Running job: job_local1115158155_0001
15/01/05 12:57:20 INFO mapred.LocalJobRunner: Waiting for map tasks
15/01/05 12:57:20 INFO mapred.LocalJobRunner: Starting task: attempt_local1115158155_0001_m_000000_0
15/01/05 12:57:20 INFO mapreduce.TableOutputFormat: Created table instance for rawCopy
15/01/05 12:57:20 INFO util.ProcessTree: setsid exited with exit code 0
15/01/05 12:57:20 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@a2e890
15/01/05 12:57:20 INFO mapred.MapTask: Processing split: ans4175-PC:,
15/01/05 12:57:20 INFO mapred.MapTask: io.sort.mb = 100
15/01/05 12:57:20 INFO mapred.MapTask: data buffer = 79691776/99614720
15/01/05 12:57:20 INFO mapred.MapTask: record buffer = 262144/327680
15/01/05 12:57:20 INFO mapred.JobClient:  map 0% reduce 0%
15/01/05 12:57:21 INFO mapred.MapTask: Starting flush of map output
15/01/05 12:57:21 INFO mapred.MapTask: Finished spill 0
15/01/05 12:57:21 INFO mapred.Task: Task:attempt_local1115158155_0001_m_000000_0 is done. And is in the process of commiting
15/01/05 12:57:21 INFO mapred.LocalJobRunner:
15/01/05 12:57:21 INFO mapred.Task: Task 'attempt_local1115158155_0001_m_000000_0' done.
15/01/05 12:57:21 INFO mapred.LocalJobRunner: Finishing task: attempt_local1115158155_0001_m_000000_0
15/01/05 12:57:21 INFO mapred.LocalJobRunner: Map task executor complete.
15/01/05 12:57:21 INFO mapreduce.TableOutputFormat: Created table instance for rawCopy
15/01/05 12:57:21 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@39d718
15/01/05 12:57:21 INFO mapred.LocalJobRunner:
15/01/05 12:57:21 INFO mapred.Merger: Merging 1 sorted segments
15/01/05 12:57:21 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 1279220 bytes
15/01/05 12:57:21 INFO mapred.LocalJobRunner:
15/01/05 12:57:21 WARN mapred.FileOutputCommitter: Output path is null in cleanup
15/01/05 12:57:21 WARN mapred.LocalJobRunner: job_local1115158155_0001
java.io.IOException: Pass a Delete or a Put
    at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:125)
    at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:84)
    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:586)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:156)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:398)
15/01/05 12:57:21 INFO mapred.JobClient: Job complete: job_local1115158155_0001
15/01/05 12:57:22 INFO mapred.JobClient: Counters: 19
15/01/05 12:57:22 INFO mapred.JobClient:   File Input Format Counters
15/01/05 12:57:22 INFO mapred.JobClient:     Bytes Read=0
15/01/05 12:57:22 INFO mapred.JobClient:   FileSystemCounters
15/01/05 12:57:22 INFO mapred.JobClient:     FILE_BYTES_READ=12384691
15/01/05 12:57:22 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=13838388
15/01/05 12:57:22 INFO mapred.JobClient:   Map-Reduce Framework
15/01/05 12:57:22 INFO mapred.JobClient:     Reduce input groups=0
15/01/05 12:57:22 INFO mapred.JobClient:     Map output materialized bytes=1279224
15/01/05 12:57:22 INFO mapred.JobClient:     Combine output records=0
15/01/05 12:57:22 INFO mapred.JobClient:     Map input records=285
15/01/05 12:57:22 INFO mapred.JobClient:     Reduce shuffle bytes=0
15/01/05 12:57:22 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
15/01/05 12:57:22 INFO mapred.JobClient:     Reduce output records=0
15/01/05 12:57:22 INFO mapred.JobClient:     Spilled Records=285
15/01/05 12:57:22 INFO mapred.JobClient:     Map output bytes=1278078
15/01/05 12:57:22 INFO mapred.JobClient:     Total committed heap usage (bytes)=1029046272
15/01/05 12:57:22 INFO mapred.JobClient:     CPU time spent (ms)=0
15/01/05 12:57:22 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
15/01/05 12:57:22 INFO mapred.JobClient:     SPLIT_RAW_BYTES=77
15/01/05 12:57:22 INFO mapred.JobClient:     Map output records=285
15/01/05 12:57:22 INFO mapred.JobClient:     Combine input records=0
15/01/05 12:57:22 INFO mapred.JobClient:     Reduce input records=0
[error] (run-main-0) java.io.IOException: error within job!
java.io.IOException: error within job!
    at com.example.Hello$.main(Hello.scala:128)
    at com.example.Hello.main(Hello.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
[trace] Stack trace suppressed: run last compile:run for the full output.

from this code:
package com.example

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Get
import java.io.IOException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io._
import org.apache.hadoop.hbase.mapreduce._
import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce._
import scala.collection.JavaConversions._
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

case class HString(name: String) {
    lazy val bytes = name.getBytes
    override def toString = name
}
object HString {
    import scala.language.implicitConversions
    implicit def hstring2String(src: HString): String = src.name
    implicit def hstring2Bytes(src: HString): Array[Byte] = src.bytes
}
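// Note: the two implicit conversions above are what let an HString such as
// Tables.rawstream be passed further down wherever a plain String or Array[Byte]
// is expected, e.g. in job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, Tables.rawstream).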

object Families {
    val stream = HString("stream")
    val identity = HString("identity")
}
object Qualifiers {
    val title = HString("title")
    val url = HString("url")
    val media = HString("media")
    val media_source = HString("media_source")
    val content = HString("content")
    val nolimitid_timestamp = HString("nolimitid.timestamp")
    val original_id = HString("original_id")
    val timestamp = HString("timestamp")
    val date_created = HString("date_created")
    val count = HString("count")
}
object Tables {
    val rawstream100 = HString("raw_stream_1.0.0")
    val rawstream = HString("rawCopy")
}

class tmapper extends TableMapper[Text, Put]{
  def map (row: ImmutableBytesWritable, value: Result, context: Context) {
    val put = new Put(row.get())
    val key = Bytes.toString(row.get());
    for (kv <- value.raw()) {
        put.add(kv)
    }
    context.write(new Text(key), put)
  }
}

class treducer extends TableReducer[Text, Put, Put]{
    def reduce (row: Text, values: Put, context:Context){
        context.write(values, values);
    }
}
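// Note that neither map() above nor reduce() here carries the `override` modifier,
// and the error log shows the default org.apache.hadoop.mapreduce.Reducer.reduce
// running instead of treducer.reduce, so these methods most likely compile as
// overloads of, rather than overrides of, the Hadoop base methods. One possible
// correction is sketched after the listing.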

object Hello {
  val hbaseMaster = "127.0.0.1:60000"
  val hbaseZookeper = "127.0.0.1"
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.master", hbaseMaster)
    conf.set("hbase.zookeeper.quorum", hbaseZookeper)
    val hbaseAdmin = new HBaseAdmin(conf)
    val scan = new Scan()
    scan.setCaching(500)         // 1 is the default in Scan, which will be bad for MapReduce jobs
    scan.setCacheBlocks(false)   // don't set to true for MR jobs

    val job = new Job(conf)
    job.setJobName("CopyTable");
    job.setJarByClass(classOf[Hello])

    TableMapReduceUtil.initTableMapperJob(
      Tables.rawstream100.name,     // input HBase table name
      scan,                      // Scan instance to control CF and attribute selection
      classOf[tmapper],  // mapper class
      classOf[ImmutableBytesWritable],             // mapper output key class
      classOf[Result],     // mapper output value class
      job
    )
    job.setMapperClass(classOf[tmapper])
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[Result])

    TableMapReduceUtil.initTableReducerJob(
      Tables.rawstream.name,          // Table name
      classOf[treducer], // Reducer class
      job
    )
    job.setReducerClass(classOf[treducer]);
    job.setNumReduceTasks(1);
    job.setOutputFormatClass(classOf[TableOutputFormat[Unit]]);
    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, Tables.rawstream);

    val b = job.waitForCompletion(true);
    if (!b) {
        throw new IOException("error within job!");
    }
  }
}

class Hello {}
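
The stack trace shows the default org.apache.hadoop.mapreduce.Reducer.reduce (Reducer.java:156) running rather than treducer.reduce, and the job declares Result as the map output value class, so TableOutputFormat most likely ends up receiving Result objects, which are neither a Put nor a Delete. Below is a minimal sketch of how the mapper, the reducer and the job wiring could be brought into agreement. It is only one possible fix, not necessarily the original author's, and it assumes an HBase 0.94-era API in which TableReducer[KEYIN, VALUEIN, KEYOUT] extends Reducer[KEYIN, VALUEIN, KEYOUT, Writable] (newer releases use Mutation as the last type parameter). The key changes: emit Put values from the mapper, declare Put as the map output value class, and spell out the full Context types together with `override` so the custom map/reduce methods actually replace the Hadoop defaults.

import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableMapper, TableReducer, TableMapReduceUtil}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce.{Mapper, Reducer}
import scala.collection.JavaConversions._

class tmapper extends TableMapper[ImmutableBytesWritable, Put] {
  // Spelling out the full Context type (and adding `override`) makes this a real
  // override of Mapper.map instead of an unrelated overloaded method.
  override def map(row: ImmutableBytesWritable, value: Result,
                   context: Mapper[ImmutableBytesWritable, Result,
                                   ImmutableBytesWritable, Put]#Context): Unit = {
    val put = new Put(row.get())
    for (kv <- value.raw()) {   // copy every KeyValue of the source row
      put.add(kv)
    }
    context.write(row, put)     // emit (row key, Put), matching the declared output types
  }
}

class treducer extends TableReducer[ImmutableBytesWritable, Put, ImmutableBytesWritable] {
  // Reducer.reduce receives an Iterable of values per key; each Put is written out
  // unchanged, which is what TableOutputFormat accepts.
  override def reduce(row: ImmutableBytesWritable, values: java.lang.Iterable[Put],
                      context: Reducer[ImmutableBytesWritable, Put,
                                       ImmutableBytesWritable, Writable]#Context): Unit = {
    for (put <- values) {
      context.write(row, put)
    }
  }
}

// Inside Hello.main (reusing scan and job from the listing above), the wiring then
// reduces to the two init calls: initTableMapperJob / initTableReducerJob already
// set the mapper, reducer, output format, OUTPUT_TABLE and the map output key/value
// classes, so the extra set* calls from the original listing are unnecessary (and,
// with classOf[Result], contradict what the mapper now emits).
TableMapReduceUtil.initTableMapperJob(
  Tables.rawstream100.name,          // input HBase table name
  scan,
  classOf[tmapper],
  classOf[ImmutableBytesWritable],   // mapper output key class
  classOf[Put],                      // mapper output value class: Put, not Result
  job
)
TableMapReduceUtil.initTableReducerJob(
  Tables.rawstream.name,             // output HBase table name
  classOf[treducer],
  job
)

For a plain table copy the reduce phase could also be dropped entirely: TableMapReduceUtil.initTableReducerJob(Tables.rawstream.name, null, job) together with job.setNumReduceTasks(0) lets TableOutputFormat receive the Put objects straight from the map.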