Here is an example of how to have a Hadoop MapReduce job produce a complex type as the value field, written out to a SequenceFile (binary format) that can then be queried as an external Hive table. It consists of three parts: a custom Writable, a custom SerDe, and the Hive table creation. When invoking the Hive script with the hive command, you must also pass this option: --auxpath $NH_HOME/lib/NeuroHadoop.jar. Note that MapReduce keys will not show up in the Hive table, so emit NullWritable as the MapReduce key. This code is compatible with compression as well. The MapReduce output can also be displayed directly with: hadoop fs -libjars $NH_HOME/lib/NeuroHadoop.jar -text /pathToHDFSFile.
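For reference, a minimal job driver might look like the sketch below. The class names (RatDriver, the omitted mapper/reducer) and the output path are hypothetical; only the NullWritable key, the RatWritable value, the SequenceFile output format, and the optional compression follow from the setup described above.

// Hypothetical driver sketch; only the output settings are prescribed by this example.
package neurohadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class RatDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "rat-convolution");
        job.setJarByClass(RatDriver.class);
        // Key must be NullWritable so only the RatWritable values reach the Hive table.
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(RatWritable.class);
        // Binary output that the Hive SerDe below knows how to read.
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        // Optional: block compression also works with this setup.
        FileOutputFormat.setCompressOutput(job, true);
        SequenceFileOutputFormat.setOutputCompressionType(job,
                SequenceFile.CompressionType.BLOCK);
        FileOutputFormat.setOutputPath(job, new Path("/neuro/output/rats"));
        // Mapper and reducer classes omitted; they are job-specific.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}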
RatWritable.java

package neurohadoop;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Custom value type written by the MapReduce job and read back by the Hive SerDe.
public class RatWritable implements Writable {
    int time;
    short frequency;
    float convolution;

    // Field order here must match the order used in write().
    public void readFields(DataInput in) throws IOException {
        time = in.readInt();
        frequency = in.readShort();
        convolution = in.readFloat();
    }

    public void write(DataOutput out) throws IOException {
        out.writeInt(time);
        out.writeShort(frequency);
        out.writeFloat(convolution);
    }

    // Tab-separated rendering; this is what "hadoop fs -text" prints.
    @Override
    public String toString() {
        return time + "\t" + frequency + "\t" + convolution;
    }
}
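For context, a reducer that emits this type might look like the following sketch. The input key/value types and the loop body are hypothetical; only the NullWritable output key and RatWritable output value follow from the description above.

// Hypothetical reducer sketch; input types and logic will vary per job.
package neurohadoop;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RatReducer extends Reducer<Text, RatWritable, NullWritable, RatWritable> {
    @Override
    protected void reduce(Text key, Iterable<RatWritable> values, Context context)
            throws IOException, InterruptedException {
        for (RatWritable value : values) {
            // Discard the key: Hive will only see the RatWritable value.
            context.write(NullWritable.get(), value);
        }
    }
}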
RatSerde.java

package neurohadoop;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

//import org.apache.commons.logging.Log;
//import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;

// Deserialize-only SerDe that maps a RatWritable record onto a three-column Hive row.
public class RatSerde implements SerDe {
    //private static final Log LOG = LogFactory.getLog(RatSerde.class.getName());

    StructObjectInspector rowOI;
    ArrayList<Object> row;
    List<String> columnNames;
    List<ObjectInspector> columnOIs;

    // Reused wrapper objects so deserialize() does not allocate per row.
    IntWritable i = new IntWritable();
    ShortWritable s = new ShortWritable();
    FloatWritable f = new FloatWritable();

    @Override
    public void initialize(Configuration conf, Properties props)
            throws SerDeException {
        //LOG.debug("Initializing RatSerde");
        columnOIs = new ArrayList<ObjectInspector>(3);
        row = new ArrayList<Object>(3);
        row.add(null);
        row.add(null);
        row.add(null);
        // Inspector order must match the column order in the CREATE TABLE statement.
        columnOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
        columnOIs.add(PrimitiveObjectInspectorFactory.writableShortObjectInspector);
        columnOIs.add(PrimitiveObjectInspectorFactory.writableFloatObjectInspector);
        // Column names are supplied by Hive from the table definition.
        String columnNameProperty = props.getProperty(Constants.LIST_COLUMNS);
        columnNames = Arrays.asList(columnNameProperty.split(","));
        rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(
                columnNames, columnOIs);
    }

    @Override
    public ObjectInspector getObjectInspector() throws SerDeException {
        return rowOI;
    }

    @Override
    public Object deserialize(Writable blob) throws SerDeException {
        // The SequenceFile value is always a RatWritable produced by the MapReduce job.
        RatWritable rw = (RatWritable) blob;
        //LOG.debug("Deserialize row: " + rw.toString());
        i.set(rw.time);
        s.set(rw.frequency);
        f.set(rw.convolution);
        row.set(0, i);
        row.set(1, s);
        row.set(2, f);
        return row;
    }

    @Override
    public Class<? extends Writable> getSerializedClass() {
        return null; // serialization not supported
    }

    @Override
    public Writable serialize(Object arg0, ObjectInspector arg1)
            throws SerDeException {
        return null; // serialization not supported; the table is read-only
    }
}
Hive:

ADD JAR ${hiveconf:nhhome}/lib/NeuroHadoop.jar;

CREATE EXTERNAL TABLE rats (time INT, frequency SMALLINT, convolution FLOAT)
ROW FORMAT SERDE 'neurohadoop.RatSerde'
STORED AS SEQUENCEFILE
LOCATION '/neuro/output/rats';
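Once the table exists, it can be queried like any other Hive table, as long as the jar is on the aux path as noted at the top. The query below is illustrative only; the invocation follows the --auxpath requirement described above.

hive --auxpath $NH_HOME/lib/NeuroHadoop.jar \
     -e "SELECT frequency, AVG(convolution) FROM rats GROUP BY frequency;"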