Advertisement
Guest User

Untitled

a guest
Dec 18th, 2018
77
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.84 KB | None | 0 0
  1.  
  2. import java.io.IOException;
  3.  
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.FileSystem;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.Job;
  9. import org.apache.hadoop.mapreduce.Mapper;
  10. import org.apache.hadoop.mapreduce.Reducer;
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
  13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  14. import org.apache.hadoop.util.StringUtils;
  15.  
  16.  
  17. public class RunJob {
  18.  
  19. static enum eInf {
  20. COUNTER
  21. }
  22.  
  23. public static void main(String[] args) {
  24. Configuration conf = new Configuration();
  25.  
  26. conf.set("fs.defaultFS", "hdfs://node1:8020");
  27.  
  28. try {
  29. FileSystem fs = FileSystem.get(conf);
  30. int i = 0;
  31. long num = 1;
  32. long tmp = 0;
  33. while (num > 0) {
  34. i++;
  35. conf.setInt("run.counter", i);
  36. Job job = Job.getInstance(conf);
  37. job.setJarByClass(RunJob.class);
  38. job.setMapperClass(ShortestPathMapper.class);
  39. job.setReducerClass(ShortestPathReducer.class);
  40. job.setMapOutputKeyClass(Text.class);
  41. job.setMapOutputValueClass(Text.class);
  42.  
  43. //key value 的格式 第一个item为key,后面的item为value
  44. job.setInputFormatClass(KeyValueTextInputFormat.class);
  45.  
  46. if (i == 1)
  47. FileInputFormat.addInputPath(job, new Path("/test/shortestpath/input/"));
  48. else
  49. FileInputFormat.addInputPath(job, new Path("/test/shortestpath/output/sp" + (i - 1)));
  50.  
  51. Path outPath = new Path("/test/shortestpath/output/sp" + i);
  52. if (fs.exists(outPath)) {
  53. fs.delete(outPath, true);
  54. }
  55. FileOutputFormat.setOutputPath(job, outPath);
  56.  
  57. boolean b = job.waitForCompletion(true);
  58.  
  59. if (b) {
  60. num = job.getCounters().findCounter(eInf.COUNTER).getValue();
  61. if (num == 0) {
  62. System.out.println("执行了" + i + "次,完成最短路径的计算");
  63. }
  64. }
  65. }
  66.  
  67. } catch (Exception e) {
  68.  
  69. e.printStackTrace();
  70. }
  71.  
  72. }
  73.  
  74.  
  75. public static class ShortestPathMapper extends Mapper<Text, Text, Text, Text> {
  76.  
  77. protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
  78. int conuter = context.getConfiguration().getInt("run.counter", 1);
  79.  
  80. Node node = new Node();
  81. String distance = null;
  82. String str = null;
  83.  
  84. // 第一次计算,填写默认距离 A:0 其他:inf
  85. if (conuter == 1) {
  86. if (key.toString().equals("A") || key.toString().equals("1")) {
  87. distance = "0";
  88. } else {
  89. distance = "inf";
  90. }
  91. str = distance + "\t" + value.toString();
  92. } else {
  93. str = value.toString();
  94. }
  95.  
  96. context.write(key, new Text(str));
  97.  
  98. node.FormatNode(str);
  99.  
  100. // 没走到此节点 退出
  101. if (node.getDistance().equals("inf"))
  102. return;
  103.  
  104. // 重新计算源点A到各点的距离
  105. for (int i = 0; i < node.getNodeNum(); i++) {
  106. String k = node.getNodeKey(i);
  107. String v = new String(
  108. Integer.parseInt(node.getNodeValue(i)) + Integer.parseInt(node.getDistance()) + "");
  109. context.write(new Text(k), new Text(v));
  110. }
  111. }
  112. }
  113.  
  114. public static class ShortestPathReducer extends Reducer<Text, Text, Text, Text> {
  115.  
  116. protected void reduce(Text arg0, Iterable<Text> arg1, Context arg2) throws IOException, InterruptedException {
  117. String min = null;
  118. int i = 0;
  119. String dis = "inf";
  120. Node node = new Node();
  121. for (Text t : arg1) {
  122. i++;
  123. dis = StringUtils.split(t.toString(), '\t')[0];
  124.  
  125. // 如果存在inf节点,表示存在没有计算距离的节点。
  126. // if(dis.equals("inf"))
  127. // arg2.getCounter(eInf.COUNTER).increment(1L);
  128.  
  129. // 判断是否存在相邻节点,如果是则需要保留信息,并找到最小距离进行更新。
  130. String[] strs = StringUtils.split(t.toString(), '\t');
  131. if (strs.length > 1) {
  132. node.FormatNode(t.toString());
  133. }
  134.  
  135. // 第一条数据默认是最小距离
  136. if (i == 1) {
  137. min = dis;
  138. } else {
  139. if (dis.equals("inf"))
  140. ;
  141. else if (min.equals("inf"))
  142. min = dis;
  143. else if (Integer.parseInt(min) > Integer.parseInt(dis)) {
  144. min = dis;
  145. }
  146. }
  147. }
  148.  
  149. // 有新的最小值,说明还在进行优化计算,需要继续循环计算
  150. if (!min.equals("inf")) {
  151. if (node.getDistance().equals("inf"))
  152. arg2.getCounter(eInf.COUNTER).increment(1L);
  153. else {
  154. if (Integer.parseInt(node.getDistance()) > Integer.parseInt(min))
  155. arg2.getCounter(eInf.COUNTER).increment(1L);
  156. }
  157. }
  158.  
  159. node.setDistance(min);
  160.  
  161. arg2.write(arg0, new Text(node.toString()));
  162. }
  163. }
  164. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement