Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package main;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.commons.math3.util.Pair;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;
- /**
- * Created by ale on 07/04/14.
- */
- public class SCD {
- private static final Pattern SPACE = Pattern.compile(" ");
- public static void main(String[] args) throws Exception {
- if (args.length < 2) {
- System.err.println("Usage: WordCount <master> <file>");
- System.exit(1);
- }
- JavaSparkContext ctx = new JavaSparkContext(args[0], "SCD",
- System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(SCD.class));
- JavaRDD<String> timeSeries = ctx.textFile(args[1], 1);
- // Assumption, file in this format:
- // columnIndex rowIndex value
- // 0 0 1.3
- // 0 1 1.1
- // 1 0 2.2
- // 1 1 3.4
- JavaPairRDD<Tuple2<Integer, Integer>, Double> timeSeriesObservationMatrix = timeSeries.mapToPair(new PairFunction<String, Tuple2<Integer, Integer>, Double>() {
- @Override
- public Tuple2<Tuple2<Integer, Integer>, Double> call(String s) {
- return new Tuple2<Tuple2<Integer, Integer>, Double>(new Tuple2<Integer, Integer>(Integer.parseInt(SPACE.split(s)[0]), Integer.parseInt(SPACE.split(s)[1])), Double.parseDouble(SPACE.split(s)[2]));
- }
- });
- JavaRDD<Integer> columnIndices = timeSeriesObservationMatrix.map(new Function<Tuple2<Tuple2<Integer, Integer>, Double>, Integer>() {
- @Override
- public Integer call(Tuple2<Tuple2<Integer, Integer>, Double> tuple2DoubleTuple2) throws Exception {
- return tuple2DoubleTuple2._1._1;
- }
- });
- JavaRDD<Integer> rowIndices = timeSeriesObservationMatrix.map(new Function<Tuple2<Tuple2<Integer, Integer>, Double>, Integer>() {
- @Override
- public Integer call(Tuple2<Tuple2<Integer, Integer>, Double> tuple2DoubleTuple2) throws Exception {
- return tuple2DoubleTuple2._1._2;
- }
- });
- final int maxColumnIndex = columnIndices.reduce(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer integer, Integer integer2) throws Exception {
- return Math.max(integer, integer2);
- }
- });
- final int maxRowIndex = rowIndices.reduce(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer integer, Integer integer2) throws Exception {
- return Math.max(integer, integer2);
- }
- });
- // System.out.println(maxColumnIndex + "x" + maxRowIndex + " matrix size:" + timeSeriesObservationMatrix.count());
- JavaPairRDD<Integer, List<Double>> partitionedByColumnTimeSeries = timeSeriesObservationMatrix.partitionBy(new Partitioner() {
- @Override
- public int numPartitions() {
- return maxColumnIndex + 1;
- }
- @Override
- public int getPartition(Object o) {
- // System.out.println("My object:" + o);
- return ((Tuple2<Integer, Integer>) o)._1;
- }
- }).mapPartitionsToPair(new PairFlatMapFunction<Iterator<Tuple2<Tuple2<Integer, Integer>, Double>>, Integer, List<Double>>() {
- @Override
- public Iterable<Tuple2<Integer, List<Double>>> call(Iterator<Tuple2<Tuple2<Integer, Integer>, Double>> tuple2Iterator) throws Exception {
- List<Tuple2<Integer, List<Double>>> list = new ArrayList<Tuple2<Integer, List<Double>>>();
- List<Double> singleList = new ArrayList<Double>();
- Integer index = 0;
- while (tuple2Iterator.hasNext()) {
- Tuple2<Tuple2<Integer, Integer>, Double> partitionTuple = tuple2Iterator.next();
- singleList.add(partitionTuple._2);
- index = partitionTuple._1._1;
- }
- list.add(new Tuple2<Integer, List<Double>>(index, singleList));
- return list;
- }
- });
- //System.out.println("Number of column partitions: " + partitionedByColumnTimeSeries.count());
- JavaPairRDD<Integer, List<Double>> partitionedByRowTimeSeries = timeSeriesObservationMatrix.partitionBy(new Partitioner() {
- @Override
- public int numPartitions() {
- return maxRowIndex + 1;
- }
- @Override
- public int getPartition(Object o) {
- return ((Tuple2<Integer, Integer>) o)._2;
- }
- }).mapPartitionsToPair(new PairFlatMapFunction<Iterator<Tuple2<Tuple2<Integer, Integer>, Double>>, Integer, List<Double>>() {
- @Override
- public Iterable<Tuple2<Integer, List<Double>>> call(Iterator<Tuple2<Tuple2<Integer, Integer>, Double>> tuple2Iterator) throws Exception {
- List<Tuple2<Integer, List<Double>>> list = new ArrayList<Tuple2<Integer, List<Double>>>();
- List<Double> singleList = new ArrayList<Double>();
- Integer index = 0;
- while (tuple2Iterator.hasNext()) {
- Tuple2<Tuple2<Integer, Integer>, Double> partitionTuple = tuple2Iterator.next();
- singleList.add(partitionTuple._2);
- index = partitionTuple._1._2;
- }
- list.add(new Tuple2<Integer, List<Double>>(index, singleList));
- System.out.println(""+list);
- return list;
- }
- });
- //
- // //System.out.println("Number of row partitions: " + partitionedByRowTimeSeries.count());
- //
- // // Starting iterations
- //
- //
- // JavaPairRDD<Integer, Double> sVector = partitionedByColumnTimeSeries.map(new PairFunction<Tuple2<Integer, List<Double>>, Integer, Double>() {
- // @Override
- // public Tuple2<Integer, Double> call(Tuple2<Integer, List<Double>> integerListTuple2) throws Exception {
- // Double sum = 0d;
- // for (Double value : integerListTuple2._2) {
- // sum += value;
- // }
- // System.out.println("sVector Value" + (sum));
- // return new Tuple2<Integer, Double>(integerListTuple2._1, sum);
- // }
- // });
- // //System.out.println("My s vector has size: " + sVector.count());
- //
- // JavaPairRDD<Integer, Double> vVectorRaw = partitionedByRowTimeSeries.cartesian(sVector).map(new PairFunction<Tuple2<Tuple2<Integer, List<Double>>, Tuple2<Integer, Double>>, Integer, Double>() {
- // @Override
- // public Tuple2<Integer, Double> call(Tuple2<Tuple2<Integer, List<Double>>, Tuple2<Integer, Double>> tuple2Tuple2Tuple2) throws Exception {
- // int sVectorIndex = tuple2Tuple2Tuple2._2._1;
- // Double sum = (tuple2Tuple2Tuple2._1._2.get(sVectorIndex) * tuple2Tuple2Tuple2._2._2 - tuple2Tuple2Tuple2._1._2.get(sVectorIndex) * tuple2Tuple2Tuple2._1._2.get(sVectorIndex));
- // return new Tuple2<Integer, Double>(tuple2Tuple2Tuple2._1._1, sum);
- // }
- // });
- //
- // JavaPairRDD<Integer, Double> vVector = vVectorRaw.reduceByKey(new Function2<Double, Double, Double>() {
- // @Override
- // public Double call(Double aDouble, Double aDouble2) throws Exception {
- // System.out.println("vVector Value" + (aDouble + aDouble2));
- // return aDouble + aDouble2;
- // }
- // });
- // //System.out.println("My v vector has size: " + vVector.count());
- JavaPairRDD<Integer, Double> zVector = partitionedByRowTimeSeries.mapToPair(new PairFunction<Tuple2<Integer, List<Double>>, Integer, Double>() {
- @Override
- public Tuple2<Integer, Double> call(Tuple2<Integer, List<Double>> integerListTuple2) throws Exception {
- return new Tuple2<Integer, Double>(integerListTuple2._1,1d);
- }
- });
- JavaPairRDD<Integer,Double> sVector = calculateSVector(partitionedByColumnTimeSeries, zVector);
- //System.out.println("My s vector has size: " + sVector.count());
- JavaPairRDD<Integer,Double> vVector = calculateVVector(partitionedByRowTimeSeries, sVector, zVector);
- //System.out.println("My v vector has size: " + vVector.count());
- zVector = calculateZVector(vVector,zVector);
- sVector = calculateSVector(partitionedByColumnTimeSeries, zVector);
- System.out.println("My s vector has size: " + sVector.count());
- System.exit(0);
- while (true)
- {
- sVector = calculateSVector(partitionedByColumnTimeSeries, zVector);
- //System.out.println("My s vector has size: " + sVector.count());
- vVector = calculateVVector(partitionedByRowTimeSeries, sVector, zVector);
- //System.out.println("My v vector has size: " + vVector.count());
- JavaPairRDD<Integer, Double> newZVector = calculateZVector(vVector, zVector);
- if (zVector==newZVector){
- break;
- } else {
- zVector=newZVector;
- }
- //System.out.println("My z vector has size: " + vVector.count());
- }
- // JavaPairRDD<Integer,Integer> newZVector = vVector.map(new PairFunction<Tuple2<Integer, Double>, Integer, Integer>() {
- // @Override
- // public Tuple2<Integer, Integer> call(Tuple2<Integer, Double> integerDoubleTuple2) throws Exception {
- // return new Tuple2<Integer, Integer>(integerDoubleTuple2._1,);
- // }
- // });
- //
- // JavaPairRDD<Integer,Double> zVector = timeSeries.map(new PairFunction<String, Integer, Double>() {
- // @Override
- // public Tuple2<Integer, Double> call(String s) {
- // return new Tuple2<Integer, Double>(1, 1d);
- // }
- // });
- //
- // JavaPairRDD<Integer,List<Double>> rowGroupedTimeSeries =timeSeriesObservationMatrix.flatMap(new PairFlatMapFunction<Tuple2<Tuple2<Integer, Integer>, Double>, Integer, List<Double>>() {
- // @Override
- // public Iterable<Tuple2<Integer, List<Double>>> call(Tuple2<Tuple2<Integer, Integer>, Double> tuple2DoubleTuple2) throws Exception {
- // return null;
- // }
- // })
- //
- //
- // JavaPairRDD<Integer, Integer> vVector = timeSeriesObservationMatrix.join(sVector).map(new PairFunction<Tuple2<Integer,Tuple2<List<Integer>,Integer>>,Integer,Integer>(){
- // @Override
- // public Tuple2<Integer, Integer> call(Tuple2<Integer, Tuple2<List<Integer>, Integer>> integerTuple2Tuple2) throws Exception {
- // Integer sum = 0;
- // for (Integer value :integerTuple2Tuple2._2._1){
- // sum += value*integerTuple2Tuple2._1;
- // }
- // return new Tuple2<Integer, Integer>(integerTuple2Tuple2._1,sum);
- // }
- // });
- System.exit(0);
- }
- public static JavaPairRDD<Integer, Double> calculateVVector(JavaPairRDD<Integer, List<Double>> partitionedByRowTimeSeries, JavaPairRDD<Integer, Double> sVector, JavaPairRDD<Integer, Double> zVector) {
- JavaPairRDD<Integer, Double> vVectorRaw = partitionedByRowTimeSeries.cartesian(sVector).mapToPair(new PairFunction<Tuple2<Tuple2<Integer, List<Double>>, Tuple2<Integer, Double>>, Integer, Double>() {
- @Override
- public Tuple2<Integer, Double> call(Tuple2<Tuple2<Integer, List<Double>>, Tuple2<Integer, Double>> tuple2Tuple2Tuple2) throws Exception {
- int sVectorIndex = tuple2Tuple2Tuple2._2._1;
- Double sum = (tuple2Tuple2Tuple2._1._2.get(sVectorIndex) * tuple2Tuple2Tuple2._2._2 - tuple2Tuple2Tuple2._1._2.get(sVectorIndex) * tuple2Tuple2Tuple2._1._2.get(sVectorIndex));
- return new Tuple2<Integer, Double>(tuple2Tuple2Tuple2._1._1, sum);
- }
- });
- JavaPairRDD<Integer, Double> vVector = vVectorRaw.reduceByKey(new Function2<Double, Double, Double>() {
- @Override
- public Double call(Double aDouble, Double aDouble2) throws Exception {
- return aDouble + aDouble2;
- }
- });
- return vVector.join(zVector).mapToPair(new PairFunction<Tuple2<Integer, Tuple2<Double, Double>>, Integer, Double>() {
- @Override
- public Tuple2<Integer, Double> call(Tuple2<Integer, Tuple2<Double, Double>> integerTuple2Tuple2) throws Exception {
- System.out.println("V vector sum:"+(integerTuple2Tuple2._2._1 * integerTuple2Tuple2._2._2));
- return new Tuple2<Integer, Double>(integerTuple2Tuple2._1, integerTuple2Tuple2._2._1 * integerTuple2Tuple2._2._2);
- }
- });
- }
- public static JavaPairRDD<Integer, Double> calculateSVector(JavaPairRDD<Integer, List<Double>> partitionedByColumnTimeSeries, JavaPairRDD<Integer, Double> zVector) {
- final Map<Integer,Double> rawZ = zVector.collectAsMap();
- return partitionedByColumnTimeSeries.mapToPair(new PairFunction<Tuple2<Integer, List<Double>>, Integer, Double>() {
- @Override
- public Tuple2<Integer, Double> call(Tuple2<Integer, List<Double>> integerListTuple2) throws Exception {
- Double sum = 0d;
- Integer counter = 0;
- for (Double value : integerListTuple2._2) {
- System.out.println("S vector calc:"+value+"x"+rawZ.get(counter++));
- sum += value * rawZ.get(counter++);
- }
- System.out.println("S vector sum:"+sum);
- return new Tuple2<Integer, Double>(integerListTuple2._1, sum);
- }
- });
- }
- public static JavaPairRDD<Integer, Double> calculateZVector(JavaPairRDD<Integer, Double> vVector,JavaPairRDD<Integer, Double> zVector){
- JavaPairRDD<Integer, Double> zVectorRaw = zVector.join(vVector).mapToPair(new PairFunction<Tuple2<Integer, Tuple2<Double, Double>>, Integer, Double>() {
- @Override
- public Tuple2<Integer, Double> call(Tuple2<Integer, Tuple2<Double, Double>> integerTuple2Tuple2) throws Exception {
- return new Tuple2<Integer, Double>(integerTuple2Tuple2._1, integerTuple2Tuple2._2._1 * integerTuple2Tuple2._2._2);
- }
- });
- final JavaRDD<Pair<Integer,Double>>rawV = zVectorRaw.map(new Function<Tuple2<Integer, Double>, Pair<Integer, Double>>() {
- @Override
- public Pair<Integer, Double> call(Tuple2<Integer, Double> integerDoubleTuple2) throws Exception {
- return new Pair<Integer,Double>(integerDoubleTuple2._1,integerDoubleTuple2._2);
- }
- });
- zVectorRaw.reduce(new Function2<Tuple2<Integer, Double>, Tuple2<Integer, Double>, Tuple2<Integer, Double>>() {
- @Override
- public Tuple2<Integer, Double> call(Tuple2<Integer, Double> integerDoubleTuple2, Tuple2<Integer, Double> integerDoubleTuple22) throws Exception {
- return null;
- }
- });
- final Pair<Integer,Double> minimalV = rawV.reduce(new Function2<Pair<Integer, Double>, Pair<Integer, Double>, Pair<Integer, Double>>() {
- @Override
- public Pair<Integer, Double> call(Pair<Integer, Double> integerDoubleTuple2, Pair<Integer, Double> integerDoubleTuple22) throws Exception {
- if (integerDoubleTuple2.getSecond()>integerDoubleTuple22.getSecond()){
- return integerDoubleTuple22;
- } else {
- return integerDoubleTuple2;
- }
- }
- });
- System.out.println("Minimal: "+minimalV.getSecond());
- if (minimalV.getSecond()>=0){
- return zVector;
- } else {
- return zVector.mapToPair(new PairFunction<Tuple2<Integer, Double>, Integer, Double>() {
- @Override
- public Tuple2<Integer, Double> call(Tuple2<Integer, Double> integerDoubleTuple2) throws Exception {
- if (!integerDoubleTuple2._1.equals(minimalV.getFirst())) {
- System.out.println("ZVector: "+integerDoubleTuple2);
- return integerDoubleTuple2;
- } else {
- System.out.println("ZVector: "+new Tuple2<Integer, Double>(integerDoubleTuple2._1, integerDoubleTuple2._2 * -1));
- return new Tuple2<Integer, Double>(integerDoubleTuple2._1, integerDoubleTuple2._2 * -1);
- }
- }
- });
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment