Advertisement
Guest User

Datapoints go missing in Coflatmap

a guest
Jul 13th, 2016
74
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 8.04 KB | None | 0 0
  1. package org.biplob.streamReader;
  2.  
  3. import java.io.BufferedWriter;
  4. import java.io.File;
  5. import java.io.FileWriter;
  6. import java.util.ArrayList;
  7. import java.util.Arrays;
  8. import java.util.Comparator;
  9. import java.util.List;
  10. import java.util.Random;
  11.  
  12. import org.apache.flink.api.common.functions.FoldFunction;
  13. import org.apache.flink.api.common.functions.ReduceFunction;
  14. import org.apache.flink.api.common.functions.RichFlatMapFunction;
  15. import org.apache.flink.api.common.functions.RichMapFunction;
  16. import org.apache.flink.api.java.tuple.Tuple2;
  17. import org.apache.flink.api.java.tuple.Tuple3;
  18. import org.apache.flink.api.java.tuple.Tuple4;
  19. import org.apache.flink.api.java.tuple.Tuple6;
  20. import org.apache.flink.api.java.utils.ParameterTool;
  21. import org.apache.flink.configuration.Configuration;
  22. import org.apache.flink.core.fs.FileSystem.WriteMode;
  23. import org.apache.flink.streaming.api.TimeCharacteristic;
  24. import org.apache.flink.streaming.api.datastream.DataStream;
  25. import org.apache.flink.streaming.api.datastream.IterativeStream.ConnectedIterativeStreams;
  26. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  27. import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
  28. import org.apache.flink.streaming.api.watermark.Watermark;
  29. import org.apache.flink.util.Collector;
  30. import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
  31. import org.biplob.thesis.DataTypes.Centroids;
  32. import org.biplob.thesis.DataTypes.Point;
  33. import org.biplob.thesis.DataTypes.PointLabel;
  34. import org.biplob.thesis.DataTypes;
  35. import org.biplob.thesis.CentroidsCreator;
  36.  
  37.  
  38. public class ClustreamWithLabels {
  39.    
  40.     /*
  41.      * Run the code with the following parameters
  42.      * --path "D:\Class_Lectures\Master Thesis\Data Sets\flinkdata_10000.csv"
  43.      * --mc 200
  44.      * --k 2
  45.      * --outpath "D:\Class_Lectures\Master Thesis\Data Sets\outputFlink"
  46.      * --p 4
  47.      * Where mc -> Num of Centroids, k -> Number of macro clusters, p -> parallelism to use
  48.      */
  49.    
  50.     public static void main(String[] args) throws Exception {
  51.        
  52.         ParameterTool params = ParameterTool.fromArgs(args);
  53.         //float streamSpeed = Float.parseFloat(params.getRequired("speed"));
  54.         float streamSpeed = 1f;
  55.        
  56.        
  57.         long tw = 512; //timeWindow
  58.         String filePath = params.getRequired("path");
  59.         String outPath = params.getRequired("outpath");
  60.         int num_of_mc = params.getInt("mc");
  61.         int totalCluster = params.getInt("k");
  62.         int getP = params.getInt("p");
  63.        
  64.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  65.         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  66.        
  67.         env.setParallelism(getP);
  68.         int totalParallelism = env.getParallelism();
  69.  
  70.         DataStream<Point> tuples = env.addSource(new DataStreamGenerator(filePath, streamSpeed));
  71.        
  72.         Centroids[] initialMC = new Centroids[num_of_mc];
  73.         List<PointLabel> initialPointLabel = new ArrayList<PointLabel>();
  74.  
  75.        
  76.         //Main Loop where the datapoints are consumed and centroids are calculated.
  77.         ConnectedIterativeStreams<Point, Centroids[]> inputsAndCentroids = tuples.iterate()
  78.                                                         .withFeedbackType(Centroids[].class);
  79.  
  80.         DataStream<Tuple2<Centroids[], List<PointLabel>>> updatedCentroidsWithPoints =
  81.                 inputsAndCentroids
  82.                 .flatMap(new MyCoFlatmap(num_of_mc,tw))
  83.                 .keyBy(1)
  84.                 .countWindow(totalParallelism)
  85.                 .fold(new Tuple4<Centroids[], Integer, List<PointLabel>, Boolean>(initialMC, -1, initialPointLabel,true), new FoldMC())
  86.                 .flatMap(new ReturnMC());
  87.        
  88.         DataStream<Centroids[]> updatedCentroids = updatedCentroidsWithPoints.flatMap(new GetMC());
  89.         inputsAndCentroids.closeWith(updatedCentroids.broadcast());
  90.  
  91.         env.execute("Stream Reader");
  92.        
  93.     }  
  94.  
  95.    
  96.     public static final class MyCoFlatmap extends RichCoFlatMapFunction<Point, Centroids[],
  97.                                                     Tuple6<Centroids[], Integer, Long, Point, Integer, Long>>{
  98.  
  99.         private static final long serialVersionUID = 1L;
  100.         Centroids[] Centroids;
  101.         long timestamp = 0;
  102.         long id = 0;
  103.         int k =0;
  104.         int numofMC = 5;
  105.         boolean flag = true;
  106.         int subTaskId = 0;
  107.        
  108.         public MyCoFlatmap(int numofMC, long timeWindow)
  109.         {
  110.             this.numofMC = numofMC;
  111.             this.timeWindow = timeWindow;
  112.         }
  113.        
  114.         public void open(Configuration parameters) throws Exception {
  115.               super.open(parameters);
  116.               subTaskId = getRuntimeContext().getIndexOfThisSubtask();
  117.             }
  118.        
  119.         @Override
  120.         public void flatMap1(Point in, Collector<Tuple6<Centroids[], Integer, Long, Point, Integer, Long>> out) throws Exception {
  121.            
  122.             // Printing Here, how many points i am recieving over different parititons.
  123.             System.out.println(subTaskId + ": "+count++);
  124.  
  125.             if(flag)
  126.             {
  127.                 //Initialise the centroids and allocate memory for them
  128.                 Centroids = new Centroids[numofMC];
  129.                 flag = false;
  130.             }
  131.            
  132.             //If ccount of incoming datapoints is less than num of centroids, then don't collect just populate the centroids
  133.             if(id < numofMC)
  134.             {  
  135.                 Centroids generatedMC = CentroidsCreator.generateCentroids(id,in.timestamp,in, numofMC);
  136.                 Centroids[(int) id] = generatedMC;
  137.                 id++;
  138.                 return;
  139.             }
  140.             //if count of data point is more then assign the data point to one of the centroids and collect.
  141.             else
  142.             {
  143.  
  144.                 Centroids closestMC = null;
  145.                 double minDistance = Double.MAX_VALUE;
  146.                 for(Centroids mc : Centroids)
  147.                 {
  148.                     try
  149.                     {
  150.                         if(mc != null)
  151.                         {
  152.                             double distance = distance(in.pt, mc.getCenter());
  153.                             if (distance < minDistance) {
  154.                                 closestMC = mc;
  155.                                 minDistance = distance;
  156.                             }
  157.                         }
  158.                     }catch(NullPointerException e)
  159.                     {
  160.                         System.out.println("MyCoFlatMap: " + Arrays.toString(in.pt)+", "+
  161.                                             Arrays.toString(Centroids)+"," +e);
  162.                         System.exit(1);
  163.                     }
  164.                 }
  165.                
  166.                 double radius = getRadius(closestMC, Centroids);
  167.                 if (minDistance < radius)
  168.                 {
  169.                     closestMC.insert(in.pt, timestamp);
  170.                     //<Centroids>, <Index>, <timestamp>, <point>, <Partition_Id>, <ID of point in the partition (Not necessary)>
  171.                     out.collect(new Tuple6<Centroids[], Integer, Long, Point, Integer, Long>
  172.                                     (Centroids, -1, System.nanoTime(), in, subTaskId, Long.valueOf(closestMC.id)));
  173.                     return;
  174.                 }
  175.  
  176.  
  177.             }
  178.            
  179.         }
  180.  
  181.         @Override
  182.         public void flatMap2(Centroids[] in, Collector<Tuple6<Centroids[], Integer, Long, Point, Integer, Long>> out) throws Exception {
  183.            
  184.             //When you get list of updated centroids update the centroids array
  185.             for(int i = 0; i < Centroids.length;i++)
  186.             {
  187.                 Centroids[i] = in [i].div(in[i].n);
  188.             }
  189.         }  
  190.     }
  191.    
  192.    
  193.     //-------------------------------------------------------------------------------------------//
  194.    
  195.     private static double distance(double[] pointA, double[] pointB)
  196.     {
  197.         double distance = 0.0;
  198.         try
  199.         {
  200.             for (int i = 0; i < pointA.length; i++) {
  201.               double d = pointA[i] - pointB[i];
  202.               distance += d * d;
  203.             }
  204.         }catch(Exception e)
  205.         {
  206.             System.out.println(" In distance method: " + pointA.length + "," + pointB.length);
  207.             System.exit(0);
  208.         }
  209.         return Math.sqrt(distance);
  210.     }
  211.    
  212.     //-------------------------------------------------------------------------------------------//
  213.  
  214.     private static double getRadius(Centroids closestMC, Centroids[] Centroids )
  215.     {
  216.         // 2. Check whether instance fits into closestKernel
  217.         double radius;
  218.         if (closestMC != null && closestMC.n == 1) {
  219.           // Special case: estimate radius by determining the distance to the
  220.           // next closest cluster
  221.           radius = Double.MAX_VALUE;
  222.           double[] center = closestMC.getCenter();
  223.           for (Centroids kernel : Centroids) {
  224.             if (kernel == closestMC) {
  225.               continue;
  226.             }
  227.             try
  228.             {
  229.                 if(kernel != null)
  230.                 {
  231.                     double distance = distance(kernel.getCenter(), center);
  232.                     radius = Math.min(distance, radius);
  233.                 }
  234.                
  235.             }catch(NullPointerException e)
  236.             {
  237.                 System.out.println("Return MC: "+kernel+center+e);
  238.                 System.exit(1);
  239.             }
  240.           }
  241.         } else {
  242.           radius = closestMC.getRadius();
  243.         }
  244.         return radius;
  245.     }
  246.    
  247. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement