Advertisement
Guest User

ImportData for HBase Bulk Load

a guest
Mar 8th, 2012
195
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 15.60 KB | None | 0 0
  1. package com.intuit.ihub.hbase.poc;
  2.  
  3. /*     */
  4. /*     */
  5. /*     */ import com.google.common.base.Function;
  6. /*     */ import com.google.common.base.Preconditions;
  7. /*     */ import com.google.common.base.Splitter;
  8. /*     */ import com.google.common.collect.Lists;
  9. /*     */ import java.io.IOException;
  10. /*     */ import java.io.PrintStream;
  11. /*     */ import java.util.ArrayList;
  12. /*     */ import org.apache.hadoop.conf.Configuration;
  13. /*     */ import org.apache.hadoop.fs.Path;
  14. /*     */ import org.apache.hadoop.hbase.HBaseConfiguration;
  15. /*     */ import org.apache.hadoop.hbase.HConstants;
  16. /*     */ import org.apache.hadoop.hbase.KeyValue;
  17. /*     */ import org.apache.hadoop.hbase.KeyValue.Type;
  18. /*     */ import org.apache.hadoop.hbase.client.HTable;
  19. /*     */ import org.apache.hadoop.hbase.client.Put;
  20. /*     */ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  21. import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
  22. import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
  23. import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
  24. /*     */ import org.apache.hadoop.hbase.util.Base64;
  25. /*     */ import org.apache.hadoop.hbase.util.Bytes;
  26. /*     */ import org.apache.hadoop.io.LongWritable;
  27. /*     */ import org.apache.hadoop.io.Text;
  28. /*     */ import org.apache.hadoop.mapreduce.Counter;
  29. /*     */ import org.apache.hadoop.mapreduce.Job;
  30. /*     */ import org.apache.hadoop.mapreduce.Mapper;
  31. /*     */ import org.apache.hadoop.mapreduce.Mapper.Context;
  32. /*     */ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  33. /*     */ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  34. /*     */ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  35. import org.apache.hadoop.util.GenericOptionsParser;
  36. /*     */
  37. /*     */ public class ImportData
  38. /*     */ {
  39. /*     */   static final String NAME = "importtsv"; // Tool name. NOTE(review): no reference to this constant is visible in this file; the job name and usage text use literals.
  40. /*     */   static final String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines"; // Boolean option read by the mapper (default true): skip and count bad lines instead of failing.
  41. /*     */   static final String BULK_OUTPUT_CONF_KEY = "importtsv.bulk.output"; // Output path option; when set, createSubmittableJob configures HFile output for an incremental load.
  42. /*     */   static final String COLUMNS_CONF_KEY = "importtsv.columns"; // Comma-separated column specification handed to TsvParser.
  43. /*     */   static final String SEPARATOR_CONF_KEY = "importtsv.separator"; // Field separator; createSubmittableJob stores it Base64-encoded and the mapper decodes it.
  44. /*     */   static final String DEFAULT_SEPARATOR = "\t"; // Tab. NOTE(review): the mapper falls back to a literal "\t" rather than this constant.
  45. /*     */
  46. /*     */   public static Job createSubmittableJob(Configuration conf, String[] args)
  47. /*     */     throws IOException
  48. /*     */   {      
  49. /* 289 */     String actualSeparator = conf.get(SEPARATOR_CONF_KEY);
  50. /* 290 */     if (actualSeparator != null) {
  51. /* 291 */       conf.set(SEPARATOR_CONF_KEY, new String(Base64.encodeBytes(actualSeparator.getBytes())));
  52. /*     */     }
  53. /*     */
  54. /* 295 */     String tableName = args[0];
  55. /* 296 */     Path inputDir = new Path(args[1]);
  56. /* 297 */     Job job = new Job(conf, "importtsv_" + tableName);
  57. /* 298 */     job.setJarByClass(TsvImporter.class);
  58. /* 299 */     FileInputFormat.setInputPaths(job, new Path[] { inputDir });
  59. /* 300 */     job.setInputFormatClass(TextInputFormat.class);
  60. /* 301 */     job.setMapperClass(TsvImporter.class);
  61. /*     */
  62. /* 303 */     String hfileOutPath = conf.get("importtsv.bulk.output");
  63. /* 304 */     if (hfileOutPath != null) {
  64. /* 305 */       HTable table = new HTable(conf, tableName);
  65. /* 306 */       job.setReducerClass(PutSortReducer.class);
  66. /* 307 */       Path outputDir = new Path(hfileOutPath);
  67. /* 308 */       FileOutputFormat.setOutputPath(job, outputDir);
  68. /* 309 */       job.setMapOutputKeyClass(ImmutableBytesWritable.class);
  69. /* 310 */       job.setMapOutputValueClass(Put.class);
  70. /* 311 */       HFileOutputFormat.configureIncrementalLoad(job, table);
  71. /*     */     }
  72. /*     */     else
  73. /*     */     {
  74. /* 315 */       TableMapReduceUtil.initTableReducerJob(tableName, null, job);
  75. /* 316 */       job.setNumReduceTasks(0);
  76. /*     */     }
  77. /*     */
  78. /* 319 */     TableMapReduceUtil.addDependencyJars(job);
  79. /* 320 */     TableMapReduceUtil.addDependencyJars(job.getConfiguration(), new Class[] { Function.class });
  80. /*     */
  81. /* 322 */     return job;
  82. /*     */   }
  83. /*     */
  84. /*     */   private static void usage(String errorMsg)
  85. /*     */   {
  86. /* 329 */     if ((errorMsg != null) && (errorMsg.length() > 0)) {
  87. /* 330 */       System.err.println("ERROR: " + errorMsg);
  88. /*     */     }
  89. /* 332 */     String usage = "Usage: importtsv -Dimporttsv.columns=a,b,c <tablename> <inputdir>\n\nImports the given input directory of TSV data into the specified table.\n\nThe column names of the TSV data must be specified using the -Dimporttsv.columns\noption. This option takes the form of comma-separated column names, where each\ncolumn name is either a simple column family, or a columnfamily:qualifier. The special\ncolumn name HBASE_ROW_KEY is used to designate that this column should be used\nas the row key for each imported record. You must specify exactly one column\nto be the row key, and you must specify a column name for every column that exists in the\ninput data.\n\nIn order to prepare data for a bulk data load, pass the option:\n  -Dimporttsv.bulk.output=/path/for/output\n  Note: if you do not use this option, then the target table must already exist in HBase\n\nOther options that may be specified with -D include:\n  -Dimporttsv.skip.bad.lines=false - fail if encountering an invalid line\n  '-Dimporttsv.separator=|' - eg separate on pipes instead of tabs";
  90. /*     */
  91. /* 352 */     System.err.println(usage);
  92. /*     */   }
  93. /*     */
  94. /*     */   public static void main(String[] args)
  95. /*     */     throws Exception
  96. /*     */   {
  97.                 for(String str: args)
  98.                 {
  99.                     System.out.println("Command line Arguments::" + str);
  100.                 }
  101. /* 362 */     Configuration conf = HBaseConfiguration.create();
  102. /* 363 */     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  103.  
  104.                 for(String str: otherArgs)
  105.                 {
  106.                     System.out.println("OtherArguments==>" + str);
  107.                 }
  108. /* 364 */     if (otherArgs.length < 2) {
  109. /* 365 */       usage("Wrong number of arguments: " + otherArgs.length);
  110. /* 366 */       System.exit(-1);
  111. /*     */     }
  112. /*     */
  113. /* 370 */     String[] columns = conf.getStrings("importtsv.columns");
  114. /* 371 */     if (columns == null) {
  115. /* 372 */       usage("No columns specified. Please specify with -Dimporttsv.columns=...");
  116. /*     */
  117. /* 374 */       System.exit(-1);
  118. /*     */     }
  119. /*     */
  120. /* 378 */     int rowkeysFound = 0;
  121. /* 379 */     for (String col : columns) {
  122. /* 380 */       if (!(col.equals(TsvParser.ROWKEY_COLUMN_SPEC))) continue; ++rowkeysFound;
  123. /*     */     }
  124. /* 382 */     if (rowkeysFound != 1) {
  125. /* 383 */       usage("Must specify exactly one column as " + TsvParser.ROWKEY_COLUMN_SPEC);
  126. /* 384 */       System.exit(-1);
  127. /*     */     }
  128. /*     */
  129. /* 388 */     if (columns.length < 2) {
  130. /* 389 */       usage("One or more columns in addition to the row key are required");
  131. /* 390 */       System.exit(-1);
  132. /*     */     }
  133. /*     */
  134.                 //check whether the separator has been read properly or not
  135.                 String actualSeparator = conf.get(SEPARATOR_CONF_KEY);
  136.                 System.out.println("SEPARATOR as per jobconf:" + actualSeparator);
  137.                
  138. /* 393 */     Job job = createSubmittableJob(conf, otherArgs);
  139. /* 394 */     System.exit((job.waitForCompletion(true)) ? 0 : 1);
  140. /*     */   }
  141. /*     */
  142. /*     */   static class TsvImporter extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
  143. /*     */   {
  144. /*     */     private long ts;
  145. /*     */     private boolean skipBadLines;
  146. /*     */     private Counter badLineCount;
  147. /*     */     private ImportData.TsvParser parser;
  148. /*     */
  149. /*     */     protected void setup(Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context)
  150. /*     */     {
  151. /* 207 */       Configuration conf = context.getConfiguration();
  152. /*     */
  153. /* 211 */       String separator = conf.get(SEPARATOR_CONF_KEY);
  154.                 System.out.println("SEPARATOR IS:" + separator);
  155.                 System.err.println("SEPARATOR IS:" + separator);
  156. /* 212 */       if (separator == null)
  157. /* 213 */         separator = "\t";
  158. /*     */       else {
  159. /* 215 */         separator = new String(Base64.decode(separator));
  160. /*     */       }
  161. /*     */
  162. /* 218 */       this.parser = new ImportData.TsvParser(conf.get("importtsv.columns"), separator);
  163. /*     */
  164. /* 220 */       if (this.parser.getRowKeyColumnIndex() == -1) {
  165. /* 221 */         throw new RuntimeException("No row key column specified");
  166. /*     */       }
  167. /* 223 */       this.ts = System.currentTimeMillis();
  168. /*     */
  169. /* 225 */       this.skipBadLines = context.getConfiguration().getBoolean("importtsv.skip.bad.lines", true);
  170. /*     */
  171. /* 227 */       this.badLineCount = context.getCounter("ImportData", "Bad Lines");
  172. /*     */     }
  173. /*     */
  174. /*     */     public void map(LongWritable offset, Text value, Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context)
  175. /*     */       throws IOException
  176. /*     */     {
  177. /* 237 */       byte[] lineBytes = value.getBytes();
  178. /*     */       try
  179. /*     */       {
  180. /* 240 */         ImportData.TsvParser.ParsedLine parsed = this.parser.parse(lineBytes, value.getLength());
  181. /*     */
  182. /* 242 */         ImmutableBytesWritable rowKey = new ImmutableBytesWritable(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength());
  183. /*     */
  184. /* 247 */         Put put = new Put(rowKey.copyBytes());
  185. /* 248 */         for (int i = 0; i < parsed.getColumnCount(); ++i)
  186. /* 249 */           if (i != this.parser.getRowKeyColumnIndex()) {
  187. /* 250 */             KeyValue kv = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(), this.parser.getFamily(i), 0, this.parser.getFamily(i).length, this.parser.getQualifier(i), 0, this.parser.getQualifier(i).length, this.ts, KeyValue.Type.Put, lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i));
  188. /*     */
  189. /* 257 */             put.add(kv);
  190. /*     */           }
  191. /* 259 */         context.write(rowKey, put);
  192. /*     */       } catch (ImportData.TsvParser.BadTsvLineException badLine) {
  193. /* 261 */         if (this.skipBadLines) {
  194. /* 262 */           System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
  195.                     System.out.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
  196. /*     */           badLine.printStackTrace();
  197. /* 265 */           this.badLineCount.increment(1L);
  198. /* 266 */           return;
  199. /*     */         }
  200. /* 268 */         throw new IOException(badLine);
  201. /*     */       }
  202. /*     */       catch (InterruptedException e) {
  203. /* 271 */         e.printStackTrace();
  204. /*     */       }
  205. /*     */     }
  206. /*     */   }
  207. /*     */
  208. /*     */   static class TsvParser
  209. /*     */   {
  210. /*     */     private final byte[][] families;
  211. /*     */     private final byte[][] qualifiers;
  212. /*     */     private final byte separatorByte;
  213. /*     */     private int rowKeyColumnIndex;
  214. /*  79 */     public static String ROWKEY_COLUMN_SPEC = "HBASE_ROW_KEY";
  215. /*     */
  216. /*     */     public TsvParser(String columnsSpecification, String separatorStr)
  217. /*     */     {
  218. /*  87 */       byte[] separator = Bytes.toBytes(separatorStr);
  219. /*  88 */       Preconditions.checkArgument(separator.length == 1, "TsvParser only supports single-byte separators");
  220. /*     */
  221. /*  90 */       this.separatorByte = separator[0];
  222. /*     */
  223. /*  93 */       ArrayList columnStrings = Lists.newArrayList(Splitter.on(',').trimResults().split(columnsSpecification));
  224. /*     */
  225. /*  96 */       this.families = new byte[columnStrings.size()][];
  226. /*  97 */       this.qualifiers = new byte[columnStrings.size()][];
  227. /*     */
  228. /*  99 */       for (int i = 0; i < columnStrings.size(); ++i) {
  229. /* 100 */         String str = (String)columnStrings.get(i);
  230. /* 101 */         if (ROWKEY_COLUMN_SPEC.equals(str)) {
  231. /* 102 */           this.rowKeyColumnIndex = i;
  232. /*     */         }
  233. /*     */         else {
  234. /* 105 */           String[] parts = str.split(":", 2);
  235. /* 106 */           if (parts.length == 1) {
  236. /* 107 */             this.families[i] = str.getBytes();
  237. /* 108 */             this.qualifiers[i] = HConstants.EMPTY_BYTE_ARRAY;
  238. /*     */           } else {
  239. /* 110 */             this.families[i] = parts[0].getBytes();
  240. /* 111 */             this.qualifiers[i] = parts[1].getBytes(); }
  241. /*     */         }
  242. /*     */       }
  243. /*     */     }
  244. /*     */
  245. /*     */     public int getRowKeyColumnIndex() {
  246. /* 117 */       return this.rowKeyColumnIndex; }
  247. /*     */
  248. /*     */     public byte[] getFamily(int idx) {
  249. /* 120 */       return this.families[idx]; }
  250. /*     */
  251. /*     */     public byte[] getQualifier(int idx) {
  252. /* 123 */       return this.qualifiers[idx];
  253. /*     */     }
  254. /*     */
  255. /*     */     public ParsedLine parse(byte[] lineBytes, int length)
  256. /*     */       throws ImportData.TsvParser.BadTsvLineException
  257. /*     */     {
  258. /* 129 */       ArrayList tabOffsets = new ArrayList(this.families.length);
  259.                 System.out.println("Line Bytes:" + Bytes.toString(lineBytes));
  260. /* 130 */       for (int i = 0; i < length; ++i) {
  261. /* 131 */         if (lineBytes[i] == this.separatorByte) {
  262. /* 132 */           tabOffsets.add(Integer.valueOf(i));
  263. /*     */         }
  264. /*     */       }
  265. /* 135 */       if (tabOffsets.isEmpty()) {
  266.                     System.err.println("Tab Offset:" +tabOffsets.toArray());
  267.                     System.err.println("Line Bytes:" + Bytes.toString(lineBytes));
  268. /* 136 */         throw new ImportData.TsvParser.BadTsvLineException("No delimiter");
  269. /*     */       }
  270. /*     */
  271. /* 139 */       tabOffsets.add(Integer.valueOf(length));
  272. /*     */
  273. /* 141 */       if (tabOffsets.size() > this.families.length)
  274. /* 142 */         throw new ImportData.TsvParser.BadTsvLineException("Excessive columns");
  275. /* 143 */       if (tabOffsets.size() <= getRowKeyColumnIndex()) {
  276. /* 144 */         throw new ImportData.TsvParser.BadTsvLineException("No row key");
  277. /*     */       }
  278. /* 146 */       return new ParsedLine(tabOffsets, lineBytes);
  279. /*     */     }
  280. /*     */
  281. /*     */     public static class BadTsvLineException extends Exception
  282. /*     */     {
  283. /*     */       private static final long serialVersionUID = 1L;
  284. /*     */
  285. /*     */       public BadTsvLineException(String err)
  286. /*     */       {
  287. /* 183 */         super(err);
  288. /*     */       }
  289. /*     */     }
  290. /*     */
  291. /*     */     class ParsedLine
  292. /*     */     {
  293. /*     */       private final ArrayList<Integer> tabOffsets;
  294. /*     */       private byte[] lineBytes;
  295. /*     */
  296. /*     */       ParsedLine(ArrayList<Integer> tabOffsets, byte[] lineBytes )
  297. /*     */       {
  298. /* 154 */         this.tabOffsets = tabOffsets;
  299. /* 155 */         this.lineBytes = lineBytes;
  300. /*     */       }
  301. /*     */
  302. /*     */       public int getRowKeyOffset() {
  303. /* 159 */         return getColumnOffset(ImportData.TsvParser.this.rowKeyColumnIndex); }
  304. /*     */
  305. /*     */       public int getRowKeyLength() {
  306. /* 162 */         return getColumnLength(ImportData.TsvParser.this.rowKeyColumnIndex); }
  307. /*     */
  308. /*     */       public int getColumnOffset(int idx) {
  309. /* 165 */         if (idx > 0) {
  310. /* 166 */           return (((Integer)this.tabOffsets.get(idx - 1)).intValue() + 1);
  311. /*     */         }
  312. /* 168 */         return 0; }
  313. /*     */
  314. /*     */       public int getColumnLength(int idx) {
  315. /* 171 */         return (((Integer)this.tabOffsets.get(idx)).intValue() - getColumnOffset(idx)); }
  316. /*     */
  317. /*     */       public int getColumnCount() {
  318. /* 174 */         return this.tabOffsets.size(); }
  319. /*     */
  320. /*     */       public byte[] getLineBytes() {
  321. /* 177 */         return this.lineBytes;
  322. /*     */       }
  323. /*     */     }
  324. /*     */   }
  325. /*     */ }
  326.  
  327. /* Location:           C:\Users\agupta5\Documents\hadoop_stuff\hbase\hbase-0.90.4-cdh3u2.jar
  328.  * Qualified Name:     org.apache.hadoop.hbase.mapreduce.ImportData
  329.  * Java Class Version: 6 (50.0)
  330.  * JD-Core Version:    0.5.3
  331.  */
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement