Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package onlab;
- import java.io.IOException;
- import java.util.ArrayList;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hama.HamaConfiguration;
- import org.apache.hama.bsp.HashPartitioner;
- import org.apache.hama.bsp.TextInputFormat;
- import org.apache.hama.bsp.TextOutputFormat;
- import org.apache.hama.graph.Edge;
- import org.apache.hama.graph.GraphJob;
- import org.apache.hama.graph.Vertex;
- import org.apache.hama.graph.VertexInputReader;
- public class Boruvka {
- public static final String START_VERTEX = "boruvka.start.vertex.name";
- public static class BoruvkaVertex extends
- Vertex<Text, IntWritable, MyWritableClass> {
- private myHeap<MyWritableClass> outgoingSet = new myHeap<MyWritableClass>();
- private ArrayList<Text> prohibitionList = new ArrayList<Text>();
- private myHeap<MyWritableClass> bestEdges = new myHeap<MyWritableClass>();
- private Text startVertex;
- private int counter = 0;
- private int tempCount = 0;
- private String itemFrom = "nothing";
- public boolean isStartVertex() {
- Text startVertex = new Text(getConf().get(START_VERTEX));
- return (this.getVertexID().equals(startVertex)) ? true : false;
- }
- @Override
- public void compute(Iterable<MyWritableClass> messages) throws IOException {
- if (counter == 0) {
- for (Edge<Text, IntWritable> e : this.getEdges()) {
- outgoingSet.insert(new MyWritableClass(this.getVertexID(),e.getValue(),e.getDestinationVertexID()));
- }
- counter = 2;
- this.getPeer().write(this.getVertexID(), new Text("elso futas"));
- if (!isStartVertex()) {
- voteToHalt();
- }
- }
- if(isStartVertex()) {
- if (!(tempCount == 0)) {
- for (MyWritableClass msg : messages) {
- bestEdges.insert(msg);
- }
- }
- if (counter == 2) {
- tempCount++;
- MyWritableClass tt = new MyWritableClass(new Text(":)"),new IntWritable(-11),new Text(":("));
- if (!bestEdges.isEmpty()) {
- if (!outgoingSet.isEmpty()) {
- while (prohibitionList.contains(outgoingSet.get(0).getDestination())) {
- outgoingSet.delete();
- }
- while (prohibitionList.contains(bestEdges.get(0).getDestination())) {
- bestEdges.delete();
- }
- if (outgoingSet.get(0).compareTo(bestEdges.get(0)) < 0) {
- tt = outgoingSet.delete();
- itemFrom = "outgoingSet";
- }
- if (bestEdges.get(0).compareTo(outgoingSet.get(0)) < 0) {
- tt = bestEdges.delete();
- itemFrom = "bestEdges";
- }
- prohibitionList.add(tt.getDestination());
- if (prohibitionList.size() == getNumVertices()-1) { // EZ IGY NEM BIZTOS HOGY JO
- voteToHalt();
- } else {
- if (itemFrom == "bestEdges") {
- sendMessage(tt.getSource(), new MyWritableClass(new Text("special"),new IntWritable(0),new Text("special")));
- }
- sendMessage(tt.getDestination(), new MyWritableClass(new Text("prohib"), new IntWritable(-1), new Text("prohib")));
- this.getPeer().write(tt.getDestination(), tt.getValue());
- }
- } else {
- while (prohibitionList.contains(bestEdges.get(0).getDestination())) {
- bestEdges.delete();
- }
- tt = bestEdges.delete();
- prohibitionList.add(tt.getDestination());
- if (prohibitionList.size() == getNumVertices()-1) { // EZ IGY NEM BIZTOS HOGY JO
- voteToHalt();
- } else {
- sendMessage(tt.getSource(), new MyWritableClass(new Text("special"),new IntWritable(0),new Text("special")));
- sendMessage(tt.getDestination(), new MyWritableClass(new Text("prohib"), new IntWritable(-1), new Text("prohib")));
- this.getPeer().write(tt.getDestination(), tt.getValue());
- }
- }
- } else {
- if (!outgoingSet.isEmpty()) {
- while (prohibitionList.contains(outgoingSet.get(0).getDestination())) {
- outgoingSet.delete();
- }
- tt = outgoingSet.delete();
- prohibitionList.add(tt.getDestination());
- if (prohibitionList.size() == getNumVertices()-1) { // EZ IGY NEM BIZTOS HOGY JO
- voteToHalt();
- } else {
- sendMessage(tt.getDestination(), new MyWritableClass(new Text("prohib"), new IntWritable(-1), new Text("prohib")));
- this.getPeer().write(tt.getDestination(), tt.getValue());
- }
- } else {
- voteToHalt();
- }
- }
- counter = 1;
- } else {
- counter++;
- }
- } else if (counter != 2) {
- for (MyWritableClass msg : messages) {
- if (msg.getSource() == new Text(getConf().get(START_VERTEX)) && msg.getValue().get() == -1) {
- startVertex = new Text(getConf().get(START_VERTEX));
- prohibitionList.add(startVertex);
- while (prohibitionList.contains(outgoingSet.get(0).getDestination())) {
- outgoingSet.delete();
- }
- if (!outgoingSet.isEmpty()) {
- sendMessage(startVertex, outgoingSet.delete());
- for (Edge<Text,IntWritable> e : this.getEdges()) {
- if (e.getDestinationVertexID() != startVertex) {
- sendMessage(e.getDestinationVertexID(), new MyWritableClass(new Text("prohib"), new IntWritable(-1), new Text("prohib")));
- }
- }
- }
- } else if (msg.getSource() == new Text(getConf().get(START_VERTEX)) && msg.getValue().get() == 0) {
- while (prohibitionList.contains(outgoingSet.get(0).getDestination())) {
- outgoingSet.delete();
- }
- if (!outgoingSet.isEmpty()) {
- sendMessage(startVertex, outgoingSet.delete());
- }
- } else {
- prohibitionList.add(msg.getSource());
- }
- }
- voteToHalt();
- } else {
- counter++;
- }
- }
- }
- public static class BoruvkaTextReader extends
- VertexInputReader<LongWritable, Text, Text, IntWritable, IntWritable> {
- @Override
- public boolean parseVertex(LongWritable key, Text value,
- Vertex<Text, IntWritable, IntWritable> vertex) throws Exception {
- String[] split = value.toString().split("\t");
- for (int i = 0; i < split.length; i++) {
- if (i == 0) {
- vertex.setVertexID(new Text(split[i]));
- } else {
- String[] split2 = split[i].split(":");
- vertex.addEdge(new Edge<Text, IntWritable>(new Text(split2[0]),
- new IntWritable(Integer.parseInt(split2[1]))));
- }
- }
- return true;
- }
- }
- public static void main(String[] args) throws IOException,
- InterruptedException, ClassNotFoundException {
- // Graph job configuration
- HamaConfiguration conf = new HamaConfiguration();
- GraphJob BoruvkaJob = new GraphJob(conf, Boruvka.class);
- // Set the job name
- BoruvkaJob.setJobName("Boruvka");
- conf.set(START_VERTEX, "A");
- BoruvkaJob.setInputPath(new Path("/home/cloudera/Downloads/hama-0.6.4/proba.txt"));
- BoruvkaJob.setOutputPath(new Path("/home/cloudera/Downloads/hama-0.6.4"));
- BoruvkaJob.setVertexClass(BoruvkaVertex.class);
- //BoruvkaJob.setCombinerClass(MinIntCombiner.class);
- BoruvkaJob.setInputFormat(TextInputFormat.class);
- BoruvkaJob.setInputKeyClass(LongWritable.class);
- BoruvkaJob.setInputValueClass(Text.class);
- BoruvkaJob.setPartitioner(HashPartitioner.class);
- BoruvkaJob.setOutputFormat(TextOutputFormat.class);
- BoruvkaJob.setVertexInputReaderClass(BoruvkaTextReader.class);
- BoruvkaJob.setOutputKeyClass(Text.class);
- BoruvkaJob.setOutputValueClass(IntWritable.class);
- // Iterate until all the nodes have been reached.
- BoruvkaJob.setMaxIteration(Integer.MAX_VALUE);
- BoruvkaJob.setVertexIDClass(Text.class);
- BoruvkaJob.setVertexValueClass(IntWritable.class);
- BoruvkaJob.setEdgeValueClass(IntWritable.class);
- long startTime = System.currentTimeMillis();
- if (BoruvkaJob.waitForCompletion(true)) {
- System.out.println("Job Finished in "
- + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment