Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import java.io.IOException;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.util.StringUtils;
- public class RunJob {
- static enum eInf {
- COUNTER
- }
- public static void main(String[] args) {
- Configuration conf = new Configuration();
- conf.set("fs.defaultFS", "hdfs://node1:8020");
- try {
- FileSystem fs = FileSystem.get(conf);
- int i = 0;
- long num = 1;
- long tmp = 0;
- while (num > 0) {
- i++;
- conf.setInt("run.counter", i);
- Job job = Job.getInstance(conf);
- job.setJarByClass(RunJob.class);
- job.setMapperClass(ShortestPathMapper.class);
- job.setReducerClass(ShortestPathReducer.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(Text.class);
- //key value 的格式 第一个item为key,后面的item为value
- job.setInputFormatClass(KeyValueTextInputFormat.class);
- if (i == 1)
- FileInputFormat.addInputPath(job, new Path("/test/shortestpath/input/"));
- else
- FileInputFormat.addInputPath(job, new Path("/test/shortestpath/output/sp" + (i - 1)));
- Path outPath = new Path("/test/shortestpath/output/sp" + i);
- if (fs.exists(outPath)) {
- fs.delete(outPath, true);
- }
- FileOutputFormat.setOutputPath(job, outPath);
- boolean b = job.waitForCompletion(true);
- if (b) {
- num = job.getCounters().findCounter(eInf.COUNTER).getValue();
- if (num == 0) {
- System.out.println("执行了" + i + "次,完成最短路径的计算");
- }
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- public static class ShortestPathMapper extends Mapper<Text, Text, Text, Text> {
- protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
- int conuter = context.getConfiguration().getInt("run.counter", 1);
- Node node = new Node();
- String distance = null;
- String str = null;
- // 第一次计算,填写默认距离 A:0 其他:inf
- if (conuter == 1) {
- if (key.toString().equals("A") || key.toString().equals("1")) {
- distance = "0";
- } else {
- distance = "inf";
- }
- str = distance + "\t" + value.toString();
- } else {
- str = value.toString();
- }
- context.write(key, new Text(str));
- node.FormatNode(str);
- // 没走到此节点 退出
- if (node.getDistance().equals("inf"))
- return;
- // 重新计算源点A到各点的距离
- for (int i = 0; i < node.getNodeNum(); i++) {
- String k = node.getNodeKey(i);
- String v = new String(
- Integer.parseInt(node.getNodeValue(i)) + Integer.parseInt(node.getDistance()) + "");
- context.write(new Text(k), new Text(v));
- }
- }
- }
- public static class ShortestPathReducer extends Reducer<Text, Text, Text, Text> {
- protected void reduce(Text arg0, Iterable<Text> arg1, Context arg2) throws IOException, InterruptedException {
- String min = null;
- int i = 0;
- String dis = "inf";
- Node node = new Node();
- for (Text t : arg1) {
- i++;
- dis = StringUtils.split(t.toString(), '\t')[0];
- // 如果存在inf节点,表示存在没有计算距离的节点。
- // if(dis.equals("inf"))
- // arg2.getCounter(eInf.COUNTER).increment(1L);
- // 判断是否存在相邻节点,如果是则需要保留信息,并找到最小距离进行更新。
- String[] strs = StringUtils.split(t.toString(), '\t');
- if (strs.length > 1) {
- node.FormatNode(t.toString());
- }
- // 第一条数据默认是最小距离
- if (i == 1) {
- min = dis;
- } else {
- if (dis.equals("inf"))
- ;
- else if (min.equals("inf"))
- min = dis;
- else if (Integer.parseInt(min) > Integer.parseInt(dis)) {
- min = dis;
- }
- }
- }
- // 有新的最小值,说明还在进行优化计算,需要继续循环计算
- if (!min.equals("inf")) {
- if (node.getDistance().equals("inf"))
- arg2.getCounter(eInf.COUNTER).increment(1L);
- else {
- if (Integer.parseInt(node.getDistance()) > Integer.parseInt(min))
- arg2.getCounter(eInf.COUNTER).increment(1L);
- }
- }
- node.setDistance(min);
- arg2.write(arg0, new Text(node.toString()));
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement