Advertisement
bradrubin

Writing a Simple Hive Serde

May 23rd, 2012
1,165
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 4.33 KB | None | 0 0
  1. Here is an example of how to have a Hadoop MapReduce job produce a complex type as the value field that is written out in a SequenceFile (binary format), and that can be treated as an external Hive table.  It consists of three parts: a custom writable, a custom SerDe, and the Hive table creation.  When invoking Hive on the Hive file with the hive command, you must also use this option: --auxpath $NH_HOME/lib/NeuroHadoop.jar .  Also, MapReduce keys will not show up in the Hive table, so write the MapReduce key value as NullWritable.  This code is compatible with compression as well.  The MapReduce output can be displayed directly with: hadoop fs -libjars $NH_HOME/lib/NeuroHadoop.jar -text /pathToHDFSFile.
  2.  
  3. RatWritable.java
  4.  
  5. package neurohadoop;
  6.  
  7. import java.io.DataOutput;
  8. import java.io.DataInput;
  9. import java.io.IOException;
  10.  
  11. import org.apache.hadoop.io.Writable;
  12.  
  13. public class RatWritable implements Writable {
  14.     int time;
  15.     short frequency;
  16.     float convolution;
  17.  
  18.     public void readFields(DataInput in) throws IOException {
  19.         time = in.readInt();
  20.         frequency = in.readShort();
  21.         convolution = in.readFloat();
  22.     }
  23.  
  24.     public void write(DataOutput out) throws IOException {
  25.         out.writeInt(time);
  26.         out.writeShort(frequency);
  27.         out.writeFloat(convolution);
  28.     }
  29.  
  30.     @Override
  31.     public String toString() {
  32.         return time + "\t" + frequency + "\t" + convolution;
  33.     }
  34. }
  35.  
  36. RatSerde.java
  37.  
  38. package neurohadoop;
  39.  
  40. import java.util.Properties;
  41. import java.util.ArrayList;
  42. import java.util.Arrays;
  43. import java.util.List;
  44.  
  45. //import org.apache.commons.logging.Log;
  46. //import org.apache.commons.logging.LogFactory;
  47.  
  48. import org.apache.hadoop.conf.Configuration;
  49. import org.apache.hadoop.hive.serde2.SerDe;
  50. import org.apache.hadoop.hive.serde.Constants;
  51. import org.apache.hadoop.hive.serde2.SerDeException;
  52. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
  53. import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
  54. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
  55. import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
  56. import org.apache.hadoop.io.IntWritable;
  57. import org.apache.hadoop.io.FloatWritable;
  58. import org.apache.hadoop.hive.serde2.io.ShortWritable;
  59.  
  60. import org.apache.hadoop.io.Writable;
  61.  
  62. public class RatSerde implements SerDe {
  63.  
  64.     //private static final Log LOG = LogFactory.getLog(RatSerde.class.getName());
  65.     StructObjectInspector rowOI;
  66.     ArrayList<Object> row;
  67.  
  68.     List<String> columnNames;
  69.     List<ObjectInspector> columnOIs;
  70.     IntWritable i = new IntWritable();
  71.     ShortWritable s = new ShortWritable();
  72.     FloatWritable f = new FloatWritable();
  73.  
  74.     @Override
  75.     public void initialize(Configuration conf, Properties props)
  76.             throws SerDeException {
  77.         //LOG.debug("Initializing RatSerde");
  78.         columnOIs = new ArrayList<ObjectInspector>(3);
  79.         row = new ArrayList<Object>(3);
  80.         row.add(null);
  81.         row.add(null);
  82.         row.add(null);
  83.         columnOIs
  84.                 .add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
  85.         columnOIs
  86.                 .add(PrimitiveObjectInspectorFactory.writableShortObjectInspector);
  87.         columnOIs
  88.                 .add(PrimitiveObjectInspectorFactory.writableFloatObjectInspector);
  89.         String columnNameProperty = props.getProperty(Constants.LIST_COLUMNS);
  90.         columnNames = Arrays.asList(columnNameProperty.split(","));
  91.         rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(
  92.                 columnNames, columnOIs);
  93.     }
  94.  
  95.     @Override
  96.     public ObjectInspector getObjectInspector() throws SerDeException {
  97.         return rowOI;
  98.     }
  99.  
  100.     @Override
  101.     public Object deserialize(Writable blob) throws SerDeException {
  102.  
  103.         RatWritable rw = (RatWritable) blob;
  104.         //LOG.debug("Deserialize row: " + rw.toString());
  105.  
  106.         i.set(rw.time);
  107.         s.set(rw.frequency);
  108.         f.set(rw.convolution);
  109.        
  110.         row.set(0, i);
  111.         row.set(1, s);
  112.         row.set(2, f);
  113.  
  114.         return row;
  115.     }
  116.  
  117.     @Override
  118.     public Class<? extends Writable> getSerializedClass() {
  119.         return null;
  120.         // serialization not supported
  121.     }
  122.  
  123.     @Override
  124.     public Writable serialize(Object arg0, ObjectInspector arg1)
  125.             throws SerDeException {
  126.         return null;
  127.         // serialization not supported
  128.     }
  129.  
  130. }
  131.  
  132. Hive:
  133.  
-- Make the custom SerDe (neurohadoop.RatSerde) and RatWritable visible to
-- this Hive session; ${hiveconf:nhhome} is supplied on the hive command line.
ADD JAR ${hiveconf:nhhome}/lib/NeuroHadoop.jar;

-- External table over the MapReduce SequenceFile output. Column types must
-- match RatWritable's fields: int -> INT, short -> SMALLINT, float -> FLOAT.
-- EXTERNAL: dropping the table leaves the HDFS files in place.
CREATE EXTERNAL TABLE rats(time INT, frequency SMALLINT, convolution FLOAT)
ROW FORMAT SERDE 'neurohadoop.RatSerde'
STORED AS SEQUENCEFILE LOCATION '/neuro/output/rats';
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement