Guest User

Untitled

a guest
Jul 17th, 2018
81
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.11 KB | None | 0 0
import java.io.IOException;
import java.lang.Class;
import java.lang.InterruptedException;
import java.lang.Thread;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Random;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.stream.StreamSupport;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;
  35.  
/* Calculate a word-frequency histogram over a text corpus.
 * Workflow (two chained MapReduce jobs):
 *   (word, 1) -> (word, count) -> (count, 1) -> (count, number of words with that count)
 */
  40.  
  41. public class WordCountHistogram {
  42. public static void main(String[] args) throws Exception {
  43. Configuration conf = new Configuration();
  44.  
  45. // if output directory exists, delete it
  46. Path outputPath = new Path(args[1]);
  47. FileSystem hdfs = FileSystem.get(conf);
  48. if (hdfs.exists(outputPath)) {
  49. hdfs.delete(outputPath, true);
  50. }
  51.  
  52. // generate intermediate path
  53. int k = -1;
  54. for (int i = args[1].length() - 1; i >= 0; i--) {
  55. if (args[1].charAt(i) == '/') {
  56. k = i;
  57. break;
  58. }
  59. }
  60.  
  61. Path intermediatePath;
  62. if (k > -1) intermediatePath = new Path(args[1].substring(0, k) + "/intermediate");
  63. else intermediatePath = new Path("./intermediate");
  64. if (hdfs.exists(intermediatePath)) {
  65. hdfs.delete(intermediatePath, true);
  66. }
  67.  
  68. //first job: original text -> (word, frequency)
  69.  
  70. Builder b1 = new Builder(conf, "word count");
  71. b1.setJarByClass(WordCountHistogram.class).setMapperClass(WordTokenizer.class).setCombinerClass(WordCounter.class)
  72. .setReducerClass(WordCounter.class).setOutputKeyClass(Text.class).setOutputValueClass(IntWritable.class)
  73. .setOutputFormatClass(SequenceFileOutputFormat.class);
  74. Job job1 = b1.build();
  75.  
  76. FileInputFormat.addInputPath(job1, new Path(args[0]));
  77. FileOutputFormat.setOutputPath(job1, intermediatePath);
  78. job1.waitForCompletion(true);
  79.  
  80. // second job: (word, frequency) -> (frequency, word) -> (frequency, how many times we have words with this frequency)
  81.  
  82. Builder b2 = new Builder(conf, "histogram counter");
  83. b2.setJarByClass(WordCountHistogram.class).setMapperClass(ReverseMapper.class).setCombinerClass(IntCounter.class)
  84. .setReducerClass(IntCounter.class)
  85. .setMapOutputKeyClass(IntWritable.class).setMapOutputValueClass(Text.class)
  86. .setOutputKeyClass(IntWritable.class).setOutputValueClass(IntWritable.class)
  87. .setInputFormatClass(SequenceFileInputFormat.class);
  88. Job job2 = b2.build();
  89.  
  90. FileInputFormat.addInputPath(job2, intermediatePath);
  91. FileOutputFormat.setOutputPath(job2, new Path(args[1]));
  92. job2.waitForCompletion(true);
  93.  
  94. // delete intermediate files
  95.  
  96. if (hdfs.exists(intermediatePath)) {
  97. hdfs.delete(intermediatePath, true);
  98. }
  99. }
  100.  
  101. /* Job Builder */
  102. public static class Builder {
  103. Job job;
  104. Builder(Configuration conf, String name) throws IOException {
  105. job = Job.getInstance(conf, name);
  106. }
  107.  
  108. Builder setJarByClass(Class c) {
  109. job.setJarByClass(c);
  110. return this;
  111. }
  112.  
  113. Builder setCombinerClass(Class c) {
  114. job.setCombinerClass(c);
  115. return this;
  116. }
  117.  
  118. Builder setMapperClass(Class c) {
  119. job.setMapperClass(c);
  120. return this;
  121. }
  122.  
  123. Builder setReducerClass(Class c) {
  124. job.setReducerClass(c);
  125. return this;
  126. }
  127.  
  128. /* setOutputKeyClass & setOutputValueClass will set both Mapper and Reducer's output format
  129. * if their output type doesn't match, this would lead to an error.
  130. */
  131. Builder setOutputKeyClass(Class c) {
  132. job.setOutputKeyClass(c);
  133. return this;
  134. }
  135.  
  136. Builder setOutputValueClass(Class c) {
  137. job.setOutputValueClass(c);
  138. return this;
  139. }
  140.  
  141. /* setMapOutputKeyClass & setMapOutputValueClass only set Mapper's output format, when
  142. * Mapper's output is different from that of Reducer, we should also set with these utilities.
  143. */
  144. Builder setMapOutputKeyClass(Class c) {
  145. job.setMapOutputKeyClass(c);
  146. return this;
  147. }
  148.  
  149. Builder setMapOutputValueClass(Class c) {
  150. job.setMapOutputValueClass(c);
  151. return this;
  152. }
  153.  
  154. Builder setInputFormatClass(Class c) {
  155. job.setInputFormatClass(c);
  156. return this;
  157. }
  158.  
  159. Builder setOutputFormatClass(Class c) {
  160. job.setOutputFormatClass(c);
  161. return this;
  162. }
  163.  
  164. Job build() {
  165. return job;
  166. }
  167. }
  168.  
  169. /* Map: original text -> (word, 1) */
  170. public static class WordTokenizer extends Mapper<Object, Text, Text, IntWritable> {
  171. final IntWritable uno = new IntWritable(1);
  172. final Text texto = new Text();
  173.  
  174. @Override
  175. public void map(Object key, Text value, Context context) {
  176. String[] tokens = value.toString().split("[^a-zA-Z0-9]");
  177. Arrays.stream(tokens).forEach(token -> {
  178. texto.set(token.toLowerCase());
  179. try {
  180. context.write(texto, uno);
  181. }
  182. catch (IOException | InterruptedException e) {
  183. System.out.println("Error ocurrs @ WordTokenizer, yet I do not wanna do anything :D");
  184. }
  185. });
  186. }
  187. }
  188.  
  189. /* Reduce: (word, 1) -> (word, word frequency) */
  190. public static class WordCounter extends Reducer<Text, IntWritable, Text, IntWritable> {
  191. final IntWritable cnt = new IntWritable(0);
  192.  
  193. @Override
  194. public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  195. int sum = StreamSupport.stream(values.spliterator(), false).map(x -> x.get()).reduce(0, (x, y) -> x + y);
  196. cnt.set(sum);
  197. context.write(key, cnt);
  198. }
  199. }
  200.  
  201. /* Mapper: reverse key-value pair
  202. * (word, frequency) -> (frequency, word)
  203. */
  204. static class ReverseMapper extends Mapper<Text, IntWritable, IntWritable, Text> {
  205. @Override
  206. public void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
  207. //int num = Integer.parseInt(value.toString());
  208. context.write(value, key);
  209. }
  210. }
  211.  
  212. /* Reducer: count word-frequency histogram
  213. * (frequency, word) -> (frequency, # of times this frequency occurs)
  214. */
  215. static class IntCounter extends Reducer<IntWritable, Text, IntWritable, IntWritable> {
  216. final IntWritable cnt = new IntWritable(0);
  217.  
  218. @Override
  219. public void reduce(IntWritable key, Iterable<Text> values, Context context) {
  220. int sum = StreamSupport.stream(values.spliterator(), false).map(x -> 1).reduce(0, (x, y) -> x + y);
  221. try {
  222. cnt.set(sum);
  223. context.write(key, cnt);
  224. }
  225. catch (IOException | InterruptedException e) {
  226. System.out.println("Error ocurrs @ IntCounter, yet I do not wanna do anything :D");
  227. }
  228. }
  229. }
  230. }
Add Comment
Please, Sign In to add comment