Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package org.biplob.streamReader;
- import java.io.BufferedWriter;
- import java.io.File;
- import java.io.FileWriter;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Comparator;
- import java.util.List;
- import java.util.Random;
- import org.apache.flink.api.common.functions.FoldFunction;
- import org.apache.flink.api.common.functions.ReduceFunction;
- import org.apache.flink.api.common.functions.RichFlatMapFunction;
- import org.apache.flink.api.common.functions.RichMapFunction;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.api.java.tuple.Tuple3;
- import org.apache.flink.api.java.tuple.Tuple4;
- import org.apache.flink.api.java.tuple.Tuple6;
- import org.apache.flink.api.java.utils.ParameterTool;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.core.fs.FileSystem.WriteMode;
- import org.apache.flink.streaming.api.TimeCharacteristic;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.datastream.IterativeStream.ConnectedIterativeStreams;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
- import org.apache.flink.streaming.api.watermark.Watermark;
- import org.apache.flink.util.Collector;
- import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
- import org.biplob.thesis.DataTypes.Centroids;
- import org.biplob.thesis.DataTypes.Point;
- import org.biplob.thesis.DataTypes.PointLabel;
- import org.biplob.thesis.DataTypes;
- import org.biplob.thesis.CentroidsCreator;
- public class ClustreamWithLabels {
- /*
- * Run the code with the following parameters
- * --path "D:\Class_Lectures\Master Thesis\Data Sets\flinkdata_10000.csv"
- * --mc 200
- * --k 2
- * --outpath "D:\Class_Lectures\Master Thesis\Data Sets\outputFlink"
- * --p 4
- * Where mc -> Num of Centroids, k -> Number of macro clusters, p -> parallelism to use
- */
- public static void main(String[] args) throws Exception {
- ParameterTool params = ParameterTool.fromArgs(args);
- //float streamSpeed = Float.parseFloat(params.getRequired("speed"));
- float streamSpeed = 1f;
- long tw = 512; //timeWindow
- String filePath = params.getRequired("path");
- String outPath = params.getRequired("outpath");
- int num_of_mc = params.getInt("mc");
- int totalCluster = params.getInt("k");
- int getP = params.getInt("p");
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.setParallelism(getP);
- int totalParallelism = env.getParallelism();
- DataStream<Point> tuples = env.addSource(new DataStreamGenerator(filePath, streamSpeed));
- Centroids[] initialMC = new Centroids[num_of_mc];
- List<PointLabel> initialPointLabel = new ArrayList<PointLabel>();
- //Main Loop where the datapoints are consumed and centroids are calculated.
- ConnectedIterativeStreams<Point, Centroids[]> inputsAndCentroids = tuples.iterate()
- .withFeedbackType(Centroids[].class);
- DataStream<Tuple2<Centroids[], List<PointLabel>>> updatedCentroidsWithPoints =
- inputsAndCentroids
- .flatMap(new MyCoFlatmap(num_of_mc,tw))
- .keyBy(1)
- .countWindow(totalParallelism)
- .fold(new Tuple4<Centroids[], Integer, List<PointLabel>, Boolean>(initialMC, -1, initialPointLabel,true), new FoldMC())
- .flatMap(new ReturnMC());
- DataStream<Centroids[]> updatedCentroids = updatedCentroidsWithPoints.flatMap(new GetMC());
- inputsAndCentroids.closeWith(updatedCentroids.broadcast());
- env.execute("Stream Reader");
- }
- public static final class MyCoFlatmap extends RichCoFlatMapFunction<Point, Centroids[],
- Tuple6<Centroids[], Integer, Long, Point, Integer, Long>>{
- private static final long serialVersionUID = 1L;
- Centroids[] Centroids;
- long timestamp = 0;
- long id = 0;
- int k =0;
- int numofMC = 5;
- boolean flag = true;
- int subTaskId = 0;
- public MyCoFlatmap(int numofMC, long timeWindow)
- {
- this.numofMC = numofMC;
- this.timeWindow = timeWindow;
- }
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- subTaskId = getRuntimeContext().getIndexOfThisSubtask();
- }
- @Override
- public void flatMap1(Point in, Collector<Tuple6<Centroids[], Integer, Long, Point, Integer, Long>> out) throws Exception {
- // Printing Here, how many points i am recieving over different parititons.
- System.out.println(subTaskId + ": "+count++);
- if(flag)
- {
- //Initialise the centroids and allocate memory for them
- Centroids = new Centroids[numofMC];
- flag = false;
- }
- //If ccount of incoming datapoints is less than num of centroids, then don't collect just populate the centroids
- if(id < numofMC)
- {
- Centroids generatedMC = CentroidsCreator.generateCentroids(id,in.timestamp,in, numofMC);
- Centroids[(int) id] = generatedMC;
- id++;
- return;
- }
- //if count of data point is more then assign the data point to one of the centroids and collect.
- else
- {
- Centroids closestMC = null;
- double minDistance = Double.MAX_VALUE;
- for(Centroids mc : Centroids)
- {
- try
- {
- if(mc != null)
- {
- double distance = distance(in.pt, mc.getCenter());
- if (distance < minDistance) {
- closestMC = mc;
- minDistance = distance;
- }
- }
- }catch(NullPointerException e)
- {
- System.out.println("MyCoFlatMap: " + Arrays.toString(in.pt)+", "+
- Arrays.toString(Centroids)+"," +e);
- System.exit(1);
- }
- }
- double radius = getRadius(closestMC, Centroids);
- if (minDistance < radius)
- {
- closestMC.insert(in.pt, timestamp);
- //<Centroids>, <Index>, <timestamp>, <point>, <Partition_Id>, <ID of point in the partition (Not necessary)>
- out.collect(new Tuple6<Centroids[], Integer, Long, Point, Integer, Long>
- (Centroids, -1, System.nanoTime(), in, subTaskId, Long.valueOf(closestMC.id)));
- return;
- }
- }
- }
- @Override
- public void flatMap2(Centroids[] in, Collector<Tuple6<Centroids[], Integer, Long, Point, Integer, Long>> out) throws Exception {
- //When you get list of updated centroids update the centroids array
- for(int i = 0; i < Centroids.length;i++)
- {
- Centroids[i] = in [i].div(in[i].n);
- }
- }
- }
- //-------------------------------------------------------------------------------------------//
- private static double distance(double[] pointA, double[] pointB)
- {
- double distance = 0.0;
- try
- {
- for (int i = 0; i < pointA.length; i++) {
- double d = pointA[i] - pointB[i];
- distance += d * d;
- }
- }catch(Exception e)
- {
- System.out.println(" In distance method: " + pointA.length + "," + pointB.length);
- System.exit(0);
- }
- return Math.sqrt(distance);
- }
- //-------------------------------------------------------------------------------------------//
- private static double getRadius(Centroids closestMC, Centroids[] Centroids )
- {
- // 2. Check whether instance fits into closestKernel
- double radius;
- if (closestMC != null && closestMC.n == 1) {
- // Special case: estimate radius by determining the distance to the
- // next closest cluster
- radius = Double.MAX_VALUE;
- double[] center = closestMC.getCenter();
- for (Centroids kernel : Centroids) {
- if (kernel == closestMC) {
- continue;
- }
- try
- {
- if(kernel != null)
- {
- double distance = distance(kernel.getCenter(), center);
- radius = Math.min(distance, radius);
- }
- }catch(NullPointerException e)
- {
- System.out.println("Return MC: "+kernel+center+e);
- System.exit(1);
- }
- }
- } else {
- radius = closestMC.getRadius();
- }
- return radius;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement