Guest User

Streaming Kmeans Spark Java

a guest
Jun 17th, 2016
283
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 2.84 KB | None | 0 0
  1.  
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import org.apache.spark.api.java.function.Function;
  6. import org.apache.spark.api.java.function.Function2;
  7. import org.apache.spark.api.java.function.PairFunction;
  8. import org.apache.spark.mllib.clustering.KMeans;
  9. import org.apache.spark.mllib.clustering.KMeansModel;
  10. import org.apache.spark.mllib.clustering.StreamingKMeans;
  11. import org.apache.spark.mllib.linalg.Vector;
  12. import org.apache.spark.mllib.linalg.Vectors;
  13. import org.apache.spark.mllib.regression.LabeledPoint;
  14. import org.apache.spark.streaming.StreamingContext;
  15. import org.apache.spark.streaming.api.java.JavaDStream;
  16. import org.apache.spark.streaming.api.java.JavaStreamingContext;
  17.  
  18. import scala.Tuple2;
  19.  
  20. import org.apache.spark.streaming.Durations;
  21.  
  22.  
  23. public class SparkStreamingKmeans {
  24.    
  25.    
  26.    
  27.         public static void main(String[] args) {
  28.            
  29.  
  30.                     if (args.length != 5) {
  31.                       System.err.println(
  32.                         "Usage: StreamingKMeansExample " +
  33.                           "<trainingDir> <testDir> <batchDuration> <numClusters> <numDimensions>");
  34.                       System.exit(1);
  35.                     }
  36.                     //System.out.println(args[0]+", "+ args[1]);
  37.  
  38.                     SparkConf conf = new SparkConf().setMaster("local").setAppName("StreamingKMeansExample");
  39.                     JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(Long.parseLong(args[2])));
  40.  
  41.                      JavaDStream<Vector> trainingData = ssc.textFileStream(args[0]).map(
  42.                             new Function<String, Vector>() {
  43.                                 @Override public Vector call(String s) throws Exception {
  44.                                   System.out.println(s);
  45.                                   return  Vectors.parse(s);
  46.                                 }
  47.                               });
  48.                      
  49.                      JavaDStream<LabeledPoint> testData = ssc.textFileStream(args[1]).map(
  50.                             new Function<String, LabeledPoint>() {
  51.                                 @Override public LabeledPoint call(String s) throws Exception {
  52.                                       System.out.println(s);
  53.                                   return LabeledPoint.parse(s);
  54.                                 }
  55.                               });
  56.  
  57.                     StreamingKMeans model = new StreamingKMeans()
  58.                       .setK(Integer.parseInt(args[3]))
  59.                       .setDecayFactor(1.0)
  60.                       .setRandomCenters(Integer.parseInt(args[4]),0.0, 0);
  61.  
  62.                     model.trainOn(trainingData);
  63.                     model.predictOnValues(testData.mapToPair(
  64.                             new PairFunction<LabeledPoint,Double, Vector>() {
  65.                                 @Override
  66.                                 public Tuple2<Double, Vector> call(LabeledPoint lp) throws Exception {
  67.                                     // TODO Auto-generated method stub
  68.                                     return new Tuple2<Double, Vector>(lp.label(), lp.features());
  69.                                 }
  70.                               })).print();
  71.  
  72.                     ssc.start();
  73.                     ssc.awaitTermination();
  74.             }
  75.         }
Add Comment
Please, Sign In to add comment