Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.Function;
- import org.apache.spark.api.java.function.Function2;
- import org.apache.spark.api.java.function.PairFunction;
- import org.apache.spark.mllib.clustering.KMeans;
- import org.apache.spark.mllib.clustering.KMeansModel;
- import org.apache.spark.mllib.clustering.StreamingKMeans;
- import org.apache.spark.mllib.linalg.Vector;
- import org.apache.spark.mllib.linalg.Vectors;
- import org.apache.spark.mllib.regression.LabeledPoint;
- import org.apache.spark.streaming.StreamingContext;
- import org.apache.spark.streaming.api.java.JavaDStream;
- import org.apache.spark.streaming.api.java.JavaStreamingContext;
- import scala.Tuple2;
- import org.apache.spark.streaming.Durations;
- public class SparkStreamingKmeans {
- public static void main(String[] args) {
- if (args.length != 5) {
- System.err.println(
- "Usage: StreamingKMeansExample " +
- "<trainingDir> <testDir> <batchDuration> <numClusters> <numDimensions>");
- System.exit(1);
- }
- //System.out.println(args[0]+", "+ args[1]);
- SparkConf conf = new SparkConf().setMaster("local").setAppName("StreamingKMeansExample");
- JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(Long.parseLong(args[2])));
- JavaDStream<Vector> trainingData = ssc.textFileStream(args[0]).map(
- new Function<String, Vector>() {
- @Override public Vector call(String s) throws Exception {
- System.out.println(s);
- return Vectors.parse(s);
- }
- });
- JavaDStream<LabeledPoint> testData = ssc.textFileStream(args[1]).map(
- new Function<String, LabeledPoint>() {
- @Override public LabeledPoint call(String s) throws Exception {
- System.out.println(s);
- return LabeledPoint.parse(s);
- }
- });
- StreamingKMeans model = new StreamingKMeans()
- .setK(Integer.parseInt(args[3]))
- .setDecayFactor(1.0)
- .setRandomCenters(Integer.parseInt(args[4]),0.0, 0);
- model.trainOn(trainingData);
- model.predictOnValues(testData.mapToPair(
- new PairFunction<LabeledPoint,Double, Vector>() {
- @Override
- public Tuple2<Double, Vector> call(LabeledPoint lp) throws Exception {
- // TODO Auto-generated method stub
- return new Tuple2<Double, Vector>(lp.label(), lp.features());
- }
- })).print();
- ssc.start();
- ssc.awaitTermination();
- }
- }
Add Comment
Please, Sign In to add comment