import java.util.*;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
public class linecount extends Configured implements Tool
{
//define mapper class
public static class maplc extends MapReduceBase implements Mapper <LongWritable, Text, Text,IntWritable>
{
IntWritable One= new IntWritable(1);
Text Word =new Text( "Total No. of Lines");
//define map method
public void map (LongWritable key, Text value,OutputCollector<Text,IntWritable> output,Reporter reporter) throws IOException
{
output.collect (Word,One);
}
}
//define reducer class
public static class reducelc extends MapReduceBase implements Reducer <Text,IntWritable, Text,IntWritable>
{
//define reduce class
public void reduce (Text key, Iterator<IntWritable> values,OutputCollector<Text,IntWritable> output,Reporter reporter)throws IOException
{
int Sum=0;
while (values.hasNext())
{
Sum+=values.next().get();
}
output.collect(key,new IntWritable(Sum));
}
}
@Override
public int run (String[] args )throws Exception {
JobConf conf = new JobConf(getConf());
conf.setJarByClass(linecount.class);
conf.setJobName("linecount");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(maplc.class);
conf.setReducerClass(reducelc.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
conf.set("mapred.job.queue.name", "hdmi-paypal");
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
return 1;
}
public static void main (String[] args ) throws Exception {
int exitCode =ToolRunner.run(new linecount(),args);
System.exit(exitCode);
}
}