import java.util.*; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; import org.apache.hadoop.io.*; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; public class linecount extends Configured implements Tool { //define mapper class public static class maplc extends MapReduceBase implements Mapper { IntWritable One= new IntWritable(1); Text Word =new Text( "Total No. of Lines"); //define map method public void map (LongWritable key, Text value,OutputCollector output,Reporter reporter) throws IOException { output.collect (Word,One); } } //define reducer class public static class reducelc extends MapReduceBase implements Reducer { //define reduce class public void reduce (Text key, Iterator values,OutputCollector output,Reporter reporter)throws IOException { int Sum=0; while (values.hasNext()) { Sum+=values.next().get(); } output.collect(key,new IntWritable(Sum)); } } @Override public int run (String[] args )throws Exception { JobConf conf = new JobConf(getConf()); conf.setJarByClass(linecount.class); conf.setJobName("linecount"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(maplc.class); conf.setReducerClass(reducelc.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); conf.set("mapred.job.queue.name", "hdmi-paypal"); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); return 1; } public static void main (String[] args ) throws Exception { int exitCode =ToolRunner.run(new linecount(),args); System.exit(exitCode); } }