Guest User

Untitled

a guest
May 2nd, 2014
240
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 12.76 KB | None | 0 0
  1.    
  2.  
  3.     package onlab;
  4.      
  5.     import java.io.IOException;
  6.     import java.util.ArrayList;
  7.      
  8.     import org.apache.hadoop.fs.Path;
  9.     import org.apache.hadoop.io.IntWritable;
  10.     import org.apache.hadoop.io.LongWritable;
  11.     import org.apache.hadoop.io.Text;
  12.     import org.apache.hama.HamaConfiguration;
  13.     import org.apache.hama.bsp.HashPartitioner;
  14.     import org.apache.hama.bsp.TextInputFormat;
  15.     import org.apache.hama.bsp.TextOutputFormat;
  16.     import org.apache.hama.graph.Edge;
  17.     import org.apache.hama.graph.GraphJob;
  18.     import org.apache.hama.graph.Vertex;
  19.     import org.apache.hama.graph.VertexInputReader;
  20.      
  21.     public class Boruvka {
  22.      
  23.       public static final String START_VERTEX = "boruvka.start.vertex.name";
  24.      
  25.       public static class BoruvkaVertex extends
  26.             Vertex<Text, IntWritable, MyWritableClass> {
  27.              
  28.               private myHeap<MyWritableClass> outgoingSet = new myHeap<MyWritableClass>();
  29.               private ArrayList<Text> prohibitionList = new ArrayList<Text>();
  30.               private myHeap<MyWritableClass> bestEdges = new myHeap<MyWritableClass>();
  31.               private Text startVertex;
  32.               private int counter = 0;
  33.               private int tempCount = 0;
  34.               private String itemFrom = "nothing";
  35.      
  36.         public boolean isStartVertex() {
  37.           Text startVertex = new Text(getConf().get(START_VERTEX));
  38.           return (this.getVertexID().equals(startVertex)) ? true : false;
  39.         }
  40.      
  41.             @Override
  42.             public void compute(Iterable<MyWritableClass> messages) throws IOException {
  43.                    
  44.                     if (counter == 0) {
  45.                             for (Edge<Text, IntWritable> e : this.getEdges()) {
  46.                                     outgoingSet.insert(new MyWritableClass(this.getVertexID(),e.getValue(),e.getDestinationVertexID()));
  47.                             }
  48.                            
  49.                             counter = 2;
  50.                             this.getPeer().write(this.getVertexID(), new Text("elso futas"));
  51.                             if (!isStartVertex()) {
  52.                                     voteToHalt();
  53.                             }
  54.                     }
  55.                    
  56.                     if(isStartVertex()) {
  57.                             if (!(tempCount == 0)) {
  58.                                     for (MyWritableClass msg : messages) {
  59.                                             bestEdges.insert(msg);
  60.                                     }
  61.                             }
  62.                             if (counter == 2) {
  63.                                     tempCount++;
  64.                                     MyWritableClass tt = new MyWritableClass(new Text(":)"),new IntWritable(-11),new Text(":("));
  65.                                     if (!bestEdges.isEmpty()) {
  66.                                             if (!outgoingSet.isEmpty()) {
  67.                                                     while (prohibitionList.contains(outgoingSet.get(0).getDestination())) {
  68.                                                             outgoingSet.delete();
  69.                                                     }
  70.                                                     while (prohibitionList.contains(bestEdges.get(0).getDestination())) {
  71.                                                             bestEdges.delete();
  72.                                                     }
  73.                                                     if (outgoingSet.get(0).compareTo(bestEdges.get(0)) < 0) {
  74.                                                             tt = outgoingSet.delete();
  75.                                                             itemFrom = "outgoingSet";
  76.                                                     }
  77.                                                     if (bestEdges.get(0).compareTo(outgoingSet.get(0)) < 0) {
  78.                                                             tt = bestEdges.delete();
  79.                                                             itemFrom = "bestEdges";
  80.                                                     }
  81.                                                     prohibitionList.add(tt.getDestination());
  82.                                                     if (prohibitionList.size() == getNumVertices()-1) {     // EZ IGY NEM BIZTOS HOGY JO
  83.                                                             voteToHalt();
  84.                                                     } else {
  85.                                                             if (itemFrom == "bestEdges") {
  86.                                                                     sendMessage(tt.getSource(), new MyWritableClass(new Text("special"),new IntWritable(0),new Text("special")));
  87.                                                             }
  88.                                                             sendMessage(tt.getDestination(), new MyWritableClass(new Text("prohib"), new IntWritable(-1), new Text("prohib")));
  89.                                                             this.getPeer().write(tt.getDestination(), tt.getValue());
  90.                                                     }
  91.                                             } else {
  92.                                                     while (prohibitionList.contains(bestEdges.get(0).getDestination())) {
  93.                                                             bestEdges.delete();
  94.                                                     }
  95.                                                     tt = bestEdges.delete();
  96.                                                     prohibitionList.add(tt.getDestination());
  97.                                                     if (prohibitionList.size() == getNumVertices()-1) {     // EZ IGY NEM BIZTOS HOGY JO
  98.                                                             voteToHalt();
  99.                                                     } else {
  100.                                                             sendMessage(tt.getSource(), new MyWritableClass(new Text("special"),new IntWritable(0),new Text("special")));
  101.                                                             sendMessage(tt.getDestination(), new MyWritableClass(new Text("prohib"), new IntWritable(-1), new Text("prohib")));
  102.                                                             this.getPeer().write(tt.getDestination(), tt.getValue());
  103.                                                     }
  104.                                             }
  105.                                     } else {
  106.                                             if (!outgoingSet.isEmpty()) {
  107.                                                     while (prohibitionList.contains(outgoingSet.get(0).getDestination())) {
  108.                                                             outgoingSet.delete();
  109.                                                     }
  110.                                                     tt = outgoingSet.delete();
  111.                                                     prohibitionList.add(tt.getDestination());
  112.                                                     if (prohibitionList.size() == getNumVertices()-1) {     // EZ IGY NEM BIZTOS HOGY JO
  113.                                                             voteToHalt();
  114.                                                     } else {
  115.                                                             sendMessage(tt.getDestination(), new MyWritableClass(new Text("prohib"), new IntWritable(-1), new Text("prohib")));
  116.                                                             this.getPeer().write(tt.getDestination(), tt.getValue());
  117.                                                     }
  118.                                             } else {
  119.                                                     voteToHalt();
  120.                                             }
  121.                                     }
  122.                                     counter = 1;
  123.                             } else {
  124.                                     counter++;
  125.                             }
  126.                     } else if (counter != 2) {
  127.                             for (MyWritableClass msg : messages) {
  128.                                     if (msg.getSource() == new Text(getConf().get(START_VERTEX)) && msg.getValue().get() == -1) {
  129.                                             startVertex = new Text(getConf().get(START_VERTEX));
  130.                                             prohibitionList.add(startVertex);
  131.                                             while (prohibitionList.contains(outgoingSet.get(0).getDestination())) {
  132.                                                     outgoingSet.delete();
  133.                                             }
  134.                                             if (!outgoingSet.isEmpty()) {
  135.                                                     sendMessage(startVertex, outgoingSet.delete());
  136.                                                     for (Edge<Text,IntWritable> e : this.getEdges()) {
  137.                                                             if (e.getDestinationVertexID() != startVertex) {
  138.                                                                     sendMessage(e.getDestinationVertexID(), new MyWritableClass(new Text("prohib"), new IntWritable(-1), new Text("prohib")));
  139.                                                             }
  140.                                                     }
  141.                                             }
  142.                                     } else if (msg.getSource() == new Text(getConf().get(START_VERTEX)) && msg.getValue().get() == 0) {
  143.                                             while (prohibitionList.contains(outgoingSet.get(0).getDestination())) {
  144.                                                     outgoingSet.delete();
  145.                                             }
  146.                                             if (!outgoingSet.isEmpty()) {
  147.                                                     sendMessage(startVertex, outgoingSet.delete());
  148.                                             }
  149.                                     } else {
  150.                                             prohibitionList.add(msg.getSource());
  151.                                     }
  152.                             }
  153.                             voteToHalt();
  154.                     } else {
  155.                             counter++;
  156.                     }
  157.             }
  158.       }
  159.      
  160.       public static class BoruvkaTextReader extends
  161.           VertexInputReader<LongWritable, Text, Text, IntWritable, IntWritable> {
  162.      
  163.         @Override
  164.         public boolean parseVertex(LongWritable key, Text value,
  165.             Vertex<Text, IntWritable, IntWritable> vertex) throws Exception {
  166.           String[] split = value.toString().split("\t");
  167.           for (int i = 0; i < split.length; i++) {
  168.             if (i == 0) {
  169.               vertex.setVertexID(new Text(split[i]));
  170.             } else {
  171.               String[] split2 = split[i].split(":");
  172.               vertex.addEdge(new Edge<Text, IntWritable>(new Text(split2[0]),
  173.                   new IntWritable(Integer.parseInt(split2[1]))));
  174.             }
  175.           }
  176.           return true;
  177.         }
  178.       }
  179.      
  180.       public static void main(String[] args) throws IOException,
  181.           InterruptedException, ClassNotFoundException {
  182.      
  183.         // Graph job configuration
  184.         HamaConfiguration conf = new HamaConfiguration();
  185.         GraphJob BoruvkaJob = new GraphJob(conf, Boruvka.class);
  186.         // Set the job name
  187.         BoruvkaJob.setJobName("Boruvka");
  188.      
  189.         conf.set(START_VERTEX, "A");
  190.         BoruvkaJob.setInputPath(new Path("/home/cloudera/Downloads/hama-0.6.4/proba.txt"));
  191.         BoruvkaJob.setOutputPath(new Path("/home/cloudera/Downloads/hama-0.6.4"));
  192.      
  193.         BoruvkaJob.setVertexClass(BoruvkaVertex.class);
  194.         //BoruvkaJob.setCombinerClass(MinIntCombiner.class);
  195.         BoruvkaJob.setInputFormat(TextInputFormat.class);
  196.         BoruvkaJob.setInputKeyClass(LongWritable.class);
  197.         BoruvkaJob.setInputValueClass(Text.class);
  198.      
  199.         BoruvkaJob.setPartitioner(HashPartitioner.class);
  200.         BoruvkaJob.setOutputFormat(TextOutputFormat.class);
  201.         BoruvkaJob.setVertexInputReaderClass(BoruvkaTextReader.class);
  202.         BoruvkaJob.setOutputKeyClass(Text.class);
  203.         BoruvkaJob.setOutputValueClass(IntWritable.class);
  204.         // Iterate until all the nodes have been reached.
  205.         BoruvkaJob.setMaxIteration(Integer.MAX_VALUE);
  206.      
  207.         BoruvkaJob.setVertexIDClass(Text.class);
  208.         BoruvkaJob.setVertexValueClass(IntWritable.class);
  209.         BoruvkaJob.setEdgeValueClass(IntWritable.class);
  210.      
  211.         long startTime = System.currentTimeMillis();
  212.         if (BoruvkaJob.waitForCompletion(true)) {
  213.           System.out.println("Job Finished in "
  214.               + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  215.         }
  216.       }
  217.     }
Advertisement
Add Comment
Please, Sign In to add comment