Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public abstract class BlockNestedLoopTemplate implements SkylineAlgorithm, Serializable {
- private final transient TextFileToPointRDD txtToPoints;
- private FlagPointPairProducer flagPointPairProducer;
- public BlockNestedLoopTemplate(SparkContextWrapper sparkContext) {
- this.txtToPoints = new TextFileToPointRDD(sparkContext);
- }
- @Override
- public final List<Point2DAdvanced> getSkylinePoints(String filePath) {
- JavaRDD<Point2DAdvanced> points = txtToPoints.getPointRDDFromTextFile(filePath, " ");
- flagPointPairProducer = createFlagPointPairProducer(points);
- JavaPairRDD<PointFlag, Iterable<Point2DAdvanced>> localSkylinePointsByFlag = divide(points);
- JavaRDD<Point2DAdvanced> skylinePoints = merge(localSkylinePointsByFlag);
- return skylinePoints.collect();
- }
- private FlagPointPairProducer createFlagPointPairProducer(JavaRDD<Point2DAdvanced> points) {
- Point2DAdvanced medianPoint = MedianPointFinder.findMedianPoint(points);
- return new FlagPointPairProducer(medianPoint);
- }
- private JavaPairRDD<PointFlag, Iterable<Point2DAdvanced>> divide(JavaRDD<Point2DAdvanced> points) {
- JavaPairRDD<PointFlag, Point2DAdvanced> flagPointPairs = points.mapToPair(p -> flagPointPairProducer.getFlagPointPair(p));
- JavaPairRDD<PointFlag, Iterable<Point2DAdvanced>> pointsGroupedByFlag = flagPointPairs.groupByKey();
- JavaPairRDD<PointFlag, Iterable<Point2DAdvanced>> flagsWithLocalSkylines
- = pointsGroupedByFlag.mapToPair(fp -> new Tuple2(fp._1, getLocalSkylinesWithBNL(fp._2)));
- return flagsWithLocalSkylines;
- }
- private Iterable<Point2DAdvanced> getLocalSkylinesWithBNL(Iterable<Point2DAdvanced> pointIterable) {
- List<Point2DAdvanced> localSkylines = new ArrayList<>();
- for (Point2DAdvanced point : pointIterable) {
- localAddDiscardOrDominate(localSkylines, point);
- }
- return localSkylines;
- }
- private void localAddDiscardOrDominate(List<Point2DAdvanced> localSkylines, Point2DAdvanced candidateSkylinePoint) {
- for (Iterator it = localSkylines.iterator(); it.hasNext();) {
- Point2DAdvanced pointToCheckAgainst = (Point2DAdvanced) it.next();
- if (pointToCheckAgainst.dominates(candidateSkylinePoint)) {
- return;
- } else if (candidateSkylinePoint.dominates(pointToCheckAgainst)) {
- it.remove();
- }
- }
- localSkylines.add(candidateSkylinePoint);
- }
- protected JavaRDD<Point2DAdvanced> merge(
- JavaPairRDD<PointFlag, Iterable<Point2DAdvanced>> localSkylinesGroupedByFlag) {
- JavaPairRDD<PointFlag, Point2DAdvanced> ungroupedLocalSkylines
- = localSkylinesGroupedByFlag.flatMapValues(point -> point);
- JavaPairRDD<PointFlag, Point2DAdvanced> sortedLocalSkylines = sortRDD(ungroupedLocalSkylines);
- JavaRDD<List<Tuple2<PointFlag, Point2DAdvanced>>> groupedByTheSameId = groupByTheSameId(sortedLocalSkylines);
- JavaRDD<Point2DAdvanced> globalSkylinePoints
- = groupedByTheSameId.flatMap(singleList -> getGlobalSkylineWithBNLAndPrecomparisson(singleList));
- return globalSkylinePoints;
- }
- protected abstract JavaPairRDD<PointFlag, Point2DAdvanced> sortRDD(JavaPairRDD<PointFlag, Point2DAdvanced> flagPointPairs);
- private JavaRDD<List<Tuple2<PointFlag, Point2DAdvanced>>> groupByTheSameId(JavaPairRDD<PointFlag, Point2DAdvanced> ungroupedLocalSkylines) {
- JavaPairRDD<PointFlag, Point2DAdvanced> mergedInOnePartition = ungroupedLocalSkylines.coalesce(1);
- JavaRDD<List<Tuple2<PointFlag, Point2DAdvanced>>> groupedByTheSameId = mergedInOnePartition.glom();
- return groupedByTheSameId;
- }
- private List<Point2DAdvanced> getGlobalSkylineWithBNLAndPrecomparisson(List<Tuple2<PointFlag, Point2DAdvanced>> flagPointPairs) {
- List<Point2DAdvanced> globalSkylines = new ArrayList<>();
- for (Tuple2<PointFlag, Point2DAdvanced> flagPointPair : flagPointPairs) {
- PointFlag flag = flagPointPair._1;
- if (!passesPreComparisson(flag)) {
- continue;
- }
- Point2DAdvanced point = flagPointPair._2;
- globalAddDiscardOrDominate(globalSkylines, point);
- }
- return globalSkylines;
- }
- private boolean passesPreComparisson(PointFlag flagToCheck) {
- PointFlag rejectedFlag = new PointFlag(1, 1);
- return !flagToCheck.equals(rejectedFlag);
- }
- protected abstract void globalAddDiscardOrDominate(
- List<Point2DAdvanced> globalSkylines, Point2DAdvanced candidateGlobalSkylinePoint);
- }
- public class BlockNestedLoop extends BlockNestedLoopTemplate {
- public BlockNestedLoop(SparkContextWrapper sparkContext) {
- super(sparkContext);
- }
- @Override
- protected JavaPairRDD<PointFlag, Point2DAdvanced> sortRDD(JavaPairRDD<PointFlag, Point2DAdvanced> flagPointPairs) {
- return flagPointPairs;
- }
- @Override
- protected void globalAddDiscardOrDominate(List<Point2DAdvanced> globalSkylines, Point2DAdvanced candidateGlobalSkylinePoint) {
- for (Iterator it = globalSkylines.iterator(); it.hasNext();) {
- Point2DAdvanced skyline = (Point2DAdvanced) it.next();
- if (skyline.dominates(candidateGlobalSkylinePoint)) {
- return;
- } else if (candidateGlobalSkylinePoint.dominates(skyline)) {
- it.remove();
- }
- }
- globalSkylines.add(candidateGlobalSkylinePoint);
- }
- }
- public class SortFilterSkyline extends BlockNestedLoopTemplate {
- public SortFilterSkyline(SparkContextWrapper sparkContext) {
- super(sparkContext);
- }
- @Override
- protected JavaPairRDD<PointFlag, Point2DAdvanced> sortRDD(
- JavaPairRDD<PointFlag, Point2DAdvanced> flagPointPairs) {
- JavaPairRDD<Point2DAdvanced, PointFlag> swapped = flagPointPairs.mapToPair(fp -> fp.swap());
- JavaPairRDD<Point2DAdvanced, PointFlag> sorted = swapped.sortByKey(new DominationComparator());
- JavaPairRDD<PointFlag, Point2DAdvanced> unswapped = sorted.mapToPair(fp -> fp.swap());
- return unswapped;
- }
- @Override
- protected void globalAddDiscardOrDominate(
- List<Point2DAdvanced> globalSkylines, Point2DAdvanced candidateGlobalSkylinePoint) {
- if (!isDominatedBySkylines(globalSkylines, candidateGlobalSkylinePoint)) {
- globalSkylines.add(candidateGlobalSkylinePoint);
- }
- }
- private boolean isDominatedBySkylines(List<Point2DAdvanced> skylines, Point2DAdvanced candidateSkylinePoint) {
- for (Point2DAdvanced skyline : skylines) {
- if (skyline.dominates(candidateSkylinePoint)) {
- return true;
- }
- }
- return false;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement