package misc;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;

/**
 * Computes the transitive closure of a small random directed graph by
 * repeatedly joining the edge set with the paths discovered so far.
 */
public class apache2 {
  private static final int numEdges = 200;
  private static final int numVertices = 100;
  private static final Random rand = new Random(42);

  // Generates numEdges distinct random edges (no self-loops) over numVertices vertices.
  static List<Tuple2<Integer, Integer>> generateGraph() {
    Set<Tuple2<Integer, Integer>> edges = new HashSet<>(numEdges);
    while (edges.size() < numEdges) {
      int from = rand.nextInt(numVertices);
      int to = rand.nextInt(numVertices);
      Tuple2<Integer, Integer> e = new Tuple2<>(from, to);
      if (from != to) {
        edges.add(e);
      }
    }
    return new ArrayList<>(edges);
  }

  // Projects a joined triple (y, (z, x)) to the newly discovered path (x, z).
  static class ProjectFn implements PairFunction<Tuple2<Integer, Tuple2<Integer, Integer>>,
      Integer, Integer> {
    static final ProjectFn INSTANCE = new ProjectFn();

    @Override
    public Tuple2<Integer, Integer> call(Tuple2<Integer, Tuple2<Integer, Integer>> triple) {
      return new Tuple2<>(triple._2()._2(), triple._2()._1());
    }
  }

  public static void main(String[] args) {
    SparkSession spark = SparkSession
      .builder()
      .appName("JavaTC")
      .getOrCreate();

    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

    Integer slices = (args.length > 0) ? Integer.parseInt(args[0]) : 2;
    JavaPairRDD<Integer, Integer> tc = jsc.parallelizePairs(generateGraph(), slices).cache();

    // Linear transitive closure: each round grows paths by one edge,
    // by joining the graph's edges with the already-discovered paths.
    // e.g. join the path (y, z) from the TC with the edge (x, y) from
    // the graph to obtain the path (x, z).

    // Because join() joins on keys, the edges are stored in reversed order.
    JavaPairRDD<Integer, Integer> edges = tc.mapToPair(
      new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
        @Override
        public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> e) {
          return new Tuple2<>(e._2(), e._1());
        }
      });

    long oldCount;
    long nextCount = tc.count();
    do {
      oldCount = nextCount;
      // Perform the join, obtaining an RDD of (y, (z, x)) pairs,
      // then project the result to obtain the new (x, z) paths.
      tc = tc.union(tc.join(edges).mapToPair(ProjectFn.INSTANCE)).distinct().cache();
      nextCount = tc.count();
    } while (nextCount != oldCount);

    System.out.println("TC has " + tc.count() + " edges.");
    spark.stop();
  }
}
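
Side note: because PairFunction is a functional interface, the two transformations above can also be written with Java 8 lambdas under the same Spark 2.x Java API. The sketch below is only an illustration of that alternative; it assumes the same imports and the tc variable from main().

    // Edge reversal, (from, to) -> (to, from), as a lambda:
    JavaPairRDD<Integer, Integer> edges = tc.mapToPair(e -> new Tuple2<>(e._2(), e._1()));

    // Join-and-project step from the loop body, (y, (z, x)) -> (x, z), as a lambda:
    tc = tc.union(tc.join(edges).mapToPair(t -> new Tuple2<>(t._2()._2(), t._2()._1())))
        .distinct().cache();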
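
To run it, package the class into an application jar and submit it with spark-submit, for example: spark-submit --class misc.apache2 <application-jar> 4. The jar path is a placeholder for whatever your build produces, and the optional trailing argument is the number of slices parsed in main() (it defaults to 2 when omitted).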