Not a member of Pastebin yet?
Sign Up —
it unlocks many cool features!
- package reader;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
- import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
- import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder;
- import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.util.GenericOptionsParser;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- import org.apache.orc.mapred.OrcStruct;
- import org.apache.orc.mapreduce.OrcInputFormat;
- import org.apache.orc.mapreduce.OrcOutputFormat;
- import java.io.IOException;
- public class Main extends Configured implements Tool
- {
- public static class MyMapper extends Mapper<NullWritable, OrcStruct, LongWritable, Text>
- {
- private LongWritable one = new LongWritable(1);
- @Override
- protected void map(NullWritable key, OrcStruct value, Context context)
- throws IOException, InterruptedException
- {
- String temp = value.getFieldValue(6).toString();
- if(temp.equalsIgnoreCase("0111000000"))
- {
- Text word = new Text();
- word.set(temp);
- context.write(one, word);
- }
- }
- }
- public static class MyReducer extends Reducer<LongWritable, Text, LongWritable, IntWritable>
- {
- @SuppressWarnings("unused")
- public void reduce(LongWritable key, Iterable<Text> values, Context context)
- throws IOException, InterruptedException
- {
- int counter = 0;
- for(Text txt:values) counter++;
- context.write(key, new IntWritable(counter));
- }
- }
- @Override
- public int run(String[] args) throws Exception
- {
- Configuration conf = getConf();
- Job job = Job.getInstance(conf);
- Builder builder = SearchArgumentFactory.newBuilder();
- String[]columns = {"lenb","locdate","lochour","normdate",
- "normhour","caller","called","duration","hash","dates"};
- SearchArgument sarg =
- builder.startAnd().equals("called",PredicateLeaf.Type.STRING,
- "0111000000").end().build();
- OrcInputFormat.setSearchArgument(conf, sarg, columns);
- args = new GenericOptionsParser(conf, args).getRemainingArgs();
- job.setJarByClass(Main.class);
- job.setMapperClass(MyMapper.class);
- job.setReducerClass(MyReducer.class);
- job.setMapOutputKeyClass(LongWritable.class);
- job.setMapOutputValueClass(Text.class);
- job.setOutputKeyClass(LongWritable.class);
- job.setOutputValueClass(IntWritable.class);
- OrcInputFormat.addInputPath(job, new Path(args[0]));
- OrcOutputFormat.setOutputPath(job, new Path(args[1]));
- job.setInputFormatClass(OrcInputFormat.class);
- return job.waitForCompletion(true)? 0:1;
- }
- public static void main(String[]args) throws Exception
- {
- long time1 = System.currentTimeMillis();
- int exitCode = ToolRunner.run(new Main(), args);
- long time2 = System.currentTimeMillis();
- System.out.println("Total time = "+ (time2 - time1)/1000);
- System.exit(exitCode);
- }
- }
Add Comment
Please, Sign In to add comment