Advertisement
Guest User

Untitled

a guest
Nov 27th, 2014
153
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.73 KB | None | 0 0
  1. public abstract class BlockNestedLoopTemplate implements SkylineAlgorithm, Serializable {
  2.  
  3. private final transient TextFileToPointRDD txtToPoints;
  4. private FlagPointPairProducer flagPointPairProducer;
  5.  
  6. public BlockNestedLoopTemplate(SparkContextWrapper sparkContext) {
  7. this.txtToPoints = new TextFileToPointRDD(sparkContext);
  8. }
  9.  
  10. @Override
  11. public final List<Point2DAdvanced> getSkylinePoints(String filePath) {
  12. JavaRDD<Point2DAdvanced> points = txtToPoints.getPointRDDFromTextFile(filePath, " ");
  13. flagPointPairProducer = createFlagPointPairProducer(points);
  14.  
  15. JavaPairRDD<PointFlag, Iterable<Point2DAdvanced>> localSkylinePointsByFlag = divide(points);
  16. JavaRDD<Point2DAdvanced> skylinePoints = merge(localSkylinePointsByFlag);
  17.  
  18. return skylinePoints.collect();
  19. }
  20.  
  21. private FlagPointPairProducer createFlagPointPairProducer(JavaRDD<Point2DAdvanced> points) {
  22. Point2DAdvanced medianPoint = MedianPointFinder.findMedianPoint(points);
  23. return new FlagPointPairProducer(medianPoint);
  24. }
  25.  
  26. private JavaPairRDD<PointFlag, Iterable<Point2DAdvanced>> divide(JavaRDD<Point2DAdvanced> points) {
  27. JavaPairRDD<PointFlag, Point2DAdvanced> flagPointPairs = points.mapToPair(p -> flagPointPairProducer.getFlagPointPair(p));
  28. JavaPairRDD<PointFlag, Iterable<Point2DAdvanced>> pointsGroupedByFlag = flagPointPairs.groupByKey();
  29. JavaPairRDD<PointFlag, Iterable<Point2DAdvanced>> flagsWithLocalSkylines
  30. = pointsGroupedByFlag.mapToPair(fp -> new Tuple2(fp._1, getLocalSkylinesWithBNL(fp._2)));
  31.  
  32. return flagsWithLocalSkylines;
  33. }
  34.  
  35. private Iterable<Point2DAdvanced> getLocalSkylinesWithBNL(Iterable<Point2DAdvanced> pointIterable) {
  36. List<Point2DAdvanced> localSkylines = new ArrayList<>();
  37. for (Point2DAdvanced point : pointIterable) {
  38. localAddDiscardOrDominate(localSkylines, point);
  39. }
  40. return localSkylines;
  41. }
  42.  
  43. private void localAddDiscardOrDominate(List<Point2DAdvanced> localSkylines, Point2DAdvanced candidateSkylinePoint) {
  44. for (Iterator it = localSkylines.iterator(); it.hasNext();) {
  45. Point2DAdvanced pointToCheckAgainst = (Point2DAdvanced) it.next();
  46. if (pointToCheckAgainst.dominates(candidateSkylinePoint)) {
  47. return;
  48. } else if (candidateSkylinePoint.dominates(pointToCheckAgainst)) {
  49. it.remove();
  50. }
  51. }
  52. localSkylines.add(candidateSkylinePoint);
  53. }
  54.  
  55. protected JavaRDD<Point2DAdvanced> merge(
  56. JavaPairRDD<PointFlag, Iterable<Point2DAdvanced>> localSkylinesGroupedByFlag) {
  57.  
  58. JavaPairRDD<PointFlag, Point2DAdvanced> ungroupedLocalSkylines
  59. = localSkylinesGroupedByFlag.flatMapValues(point -> point);
  60. JavaPairRDD<PointFlag, Point2DAdvanced> sortedLocalSkylines = sortRDD(ungroupedLocalSkylines);
  61.  
  62. JavaRDD<List<Tuple2<PointFlag, Point2DAdvanced>>> groupedByTheSameId = groupByTheSameId(sortedLocalSkylines);
  63. JavaRDD<Point2DAdvanced> globalSkylinePoints
  64. = groupedByTheSameId.flatMap(singleList -> getGlobalSkylineWithBNLAndPrecomparisson(singleList));
  65.  
  66. return globalSkylinePoints;
  67. }
  68.  
  69. protected abstract JavaPairRDD<PointFlag, Point2DAdvanced> sortRDD(JavaPairRDD<PointFlag, Point2DAdvanced> flagPointPairs);
  70.  
  71. private JavaRDD<List<Tuple2<PointFlag, Point2DAdvanced>>> groupByTheSameId(JavaPairRDD<PointFlag, Point2DAdvanced> ungroupedLocalSkylines) {
  72. JavaPairRDD<PointFlag, Point2DAdvanced> mergedInOnePartition = ungroupedLocalSkylines.coalesce(1);
  73. JavaRDD<List<Tuple2<PointFlag, Point2DAdvanced>>> groupedByTheSameId = mergedInOnePartition.glom();
  74. return groupedByTheSameId;
  75. }
  76.  
  77. private List<Point2DAdvanced> getGlobalSkylineWithBNLAndPrecomparisson(List<Tuple2<PointFlag, Point2DAdvanced>> flagPointPairs) {
  78. List<Point2DAdvanced> globalSkylines = new ArrayList<>();
  79. for (Tuple2<PointFlag, Point2DAdvanced> flagPointPair : flagPointPairs) {
  80. PointFlag flag = flagPointPair._1;
  81. if (!passesPreComparisson(flag)) {
  82. continue;
  83. }
  84.  
  85. Point2DAdvanced point = flagPointPair._2;
  86. globalAddDiscardOrDominate(globalSkylines, point);
  87. }
  88. return globalSkylines;
  89. }
  90.  
  91. private boolean passesPreComparisson(PointFlag flagToCheck) {
  92. PointFlag rejectedFlag = new PointFlag(1, 1);
  93. return !flagToCheck.equals(rejectedFlag);
  94. }
  95.  
  96. protected abstract void globalAddDiscardOrDominate(
  97. List<Point2DAdvanced> globalSkylines, Point2DAdvanced candidateGlobalSkylinePoint);
  98. }
  99.  
  100. public class BlockNestedLoop extends BlockNestedLoopTemplate {
  101.  
  102. public BlockNestedLoop(SparkContextWrapper sparkContext) {
  103. super(sparkContext);
  104. }
  105.  
  106. @Override
  107. protected JavaPairRDD<PointFlag, Point2DAdvanced> sortRDD(JavaPairRDD<PointFlag, Point2DAdvanced> flagPointPairs) {
  108. return flagPointPairs;
  109. }
  110.  
  111. @Override
  112. protected void globalAddDiscardOrDominate(List<Point2DAdvanced> globalSkylines, Point2DAdvanced candidateGlobalSkylinePoint) {
  113. for (Iterator it = globalSkylines.iterator(); it.hasNext();) {
  114. Point2DAdvanced skyline = (Point2DAdvanced) it.next();
  115. if (skyline.dominates(candidateGlobalSkylinePoint)) {
  116. return;
  117. } else if (candidateGlobalSkylinePoint.dominates(skyline)) {
  118. it.remove();
  119. }
  120. }
  121. globalSkylines.add(candidateGlobalSkylinePoint);
  122. }
  123. }
  124.  
  125. public class SortFilterSkyline extends BlockNestedLoopTemplate {
  126.  
  127. public SortFilterSkyline(SparkContextWrapper sparkContext) {
  128. super(sparkContext);
  129. }
  130.  
  131. @Override
  132. protected JavaPairRDD<PointFlag, Point2DAdvanced> sortRDD(
  133. JavaPairRDD<PointFlag, Point2DAdvanced> flagPointPairs) {
  134.  
  135. JavaPairRDD<Point2DAdvanced, PointFlag> swapped = flagPointPairs.mapToPair(fp -> fp.swap());
  136. JavaPairRDD<Point2DAdvanced, PointFlag> sorted = swapped.sortByKey(new DominationComparator());
  137. JavaPairRDD<PointFlag, Point2DAdvanced> unswapped = sorted.mapToPair(fp -> fp.swap());
  138. return unswapped;
  139. }
  140.  
  141. @Override
  142. protected void globalAddDiscardOrDominate(
  143. List<Point2DAdvanced> globalSkylines, Point2DAdvanced candidateGlobalSkylinePoint) {
  144.  
  145. if (!isDominatedBySkylines(globalSkylines, candidateGlobalSkylinePoint)) {
  146. globalSkylines.add(candidateGlobalSkylinePoint);
  147. }
  148. }
  149.  
  150. private boolean isDominatedBySkylines(List<Point2DAdvanced> skylines, Point2DAdvanced candidateSkylinePoint) {
  151. for (Point2DAdvanced skyline : skylines) {
  152. if (skyline.dominates(candidateSkylinePoint)) {
  153. return true;
  154. }
  155. }
  156. return false;
  157. }
  158. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement