Guest User

Untitled

a guest
May 12th, 2014
266
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 17.11 KB | None | 0 0
  1. package main;
  2.  
  3. import java.util.ArrayList;
  4. import java.util.Iterator;
  5. import java.util.List;
  6. import java.util.Map;
  7. import java.util.regex.Pattern;
  8.  
  9. import org.apache.commons.math3.util.Pair;
  10. import org.apache.spark.Partitioner;
  11. import org.apache.spark.api.java.JavaPairRDD;
  12. import org.apache.spark.api.java.JavaRDD;
  13. import org.apache.spark.api.java.JavaSparkContext;
  14. import org.apache.spark.api.java.function.Function;
  15. import org.apache.spark.api.java.function.Function2;
  16. import org.apache.spark.api.java.function.PairFlatMapFunction;
  17. import org.apache.spark.api.java.function.PairFunction;
  18.  
  19. import scala.Tuple2;
  20.  
  21.  
  22.  
  23. /**
  24.  * Created by ale on 07/04/14.
  25.  */
  26. public class SCD {
  27.     private static final Pattern SPACE = Pattern.compile(" ");
  28.  
  29.     public static void main(String[] args) throws Exception {
  30.         if (args.length < 2) {
  31.             System.err.println("Usage: WordCount <master> <file>");
  32.             System.exit(1);
  33.         }
  34.  
  35.         JavaSparkContext ctx = new JavaSparkContext(args[0], "SCD",
  36.                 System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(SCD.class));
  37.  
  38.  
  39.         JavaRDD<String> timeSeries = ctx.textFile(args[1], 1);
  40.         // Assumption, file in this format:
  41.         // columnIndex rowIndex value
  42.         // 0 0 1.3
  43.         // 0 1 1.1
  44.         // 1 0 2.2
  45.         // 1 1 3.4
  46.         JavaPairRDD<Tuple2<Integer, Integer>, Double> timeSeriesObservationMatrix = timeSeries.mapToPair(new PairFunction<String, Tuple2<Integer, Integer>, Double>() {
  47.             @Override
  48.             public Tuple2<Tuple2<Integer, Integer>, Double> call(String s) {
  49.                 return new Tuple2<Tuple2<Integer, Integer>, Double>(new Tuple2<Integer, Integer>(Integer.parseInt(SPACE.split(s)[0]), Integer.parseInt(SPACE.split(s)[1])), Double.parseDouble(SPACE.split(s)[2]));
  50.             }
  51.         });
  52.  
  53.  
  54.         JavaRDD<Integer> columnIndices = timeSeriesObservationMatrix.map(new Function<Tuple2<Tuple2<Integer, Integer>, Double>, Integer>() {
  55.             @Override
  56.             public Integer call(Tuple2<Tuple2<Integer, Integer>, Double> tuple2DoubleTuple2) throws Exception {
  57.                 return tuple2DoubleTuple2._1._1;
  58.             }
  59.         });
  60.  
  61.         JavaRDD<Integer> rowIndices = timeSeriesObservationMatrix.map(new Function<Tuple2<Tuple2<Integer, Integer>, Double>, Integer>() {
  62.             @Override
  63.             public Integer call(Tuple2<Tuple2<Integer, Integer>, Double> tuple2DoubleTuple2) throws Exception {
  64.                 return tuple2DoubleTuple2._1._2;
  65.             }
  66.         });
  67.  
  68.         final int maxColumnIndex = columnIndices.reduce(new Function2<Integer, Integer, Integer>() {
  69.             @Override
  70.             public Integer call(Integer integer, Integer integer2) throws Exception {
  71.  
  72.                 return Math.max(integer, integer2);
  73.             }
  74.         });
  75.  
  76.         final int maxRowIndex = rowIndices.reduce(new Function2<Integer, Integer, Integer>() {
  77.             @Override
  78.             public Integer call(Integer integer, Integer integer2) throws Exception {
  79.  
  80.                 return Math.max(integer, integer2);
  81.             }
  82.         });
  83.   //      System.out.println(maxColumnIndex + "x" + maxRowIndex + " matrix size:" + timeSeriesObservationMatrix.count());
  84.  
  85.  
  86.         JavaPairRDD<Integer, List<Double>> partitionedByColumnTimeSeries = timeSeriesObservationMatrix.partitionBy(new Partitioner() {
  87.             @Override
  88.             public int numPartitions() {
  89.                 return maxColumnIndex + 1;
  90.             }
  91.  
  92.             @Override
  93.             public int getPartition(Object o) {
  94. //                System.out.println("My object:" + o);
  95.                 return ((Tuple2<Integer, Integer>) o)._1;
  96.             }
  97.         }).mapPartitionsToPair(new PairFlatMapFunction<Iterator<Tuple2<Tuple2<Integer, Integer>, Double>>, Integer, List<Double>>() {
  98.             @Override
  99.             public Iterable<Tuple2<Integer, List<Double>>> call(Iterator<Tuple2<Tuple2<Integer, Integer>, Double>> tuple2Iterator) throws Exception {
  100.                 List<Tuple2<Integer, List<Double>>> list = new ArrayList<Tuple2<Integer, List<Double>>>();
  101.                 List<Double> singleList = new ArrayList<Double>();
  102.  
  103.                 Integer index = 0;
  104.                 while (tuple2Iterator.hasNext()) {
  105.                     Tuple2<Tuple2<Integer, Integer>, Double> partitionTuple = tuple2Iterator.next();
  106.                     singleList.add(partitionTuple._2);
  107.                     index = partitionTuple._1._1;
  108.                 }
  109.                 list.add(new Tuple2<Integer, List<Double>>(index, singleList));
  110.                 return list;
  111.             }
  112.         });
  113.  
  114.         //System.out.println("Number of column partitions: " + partitionedByColumnTimeSeries.count());
  115.  
  116.         JavaPairRDD<Integer, List<Double>> partitionedByRowTimeSeries = timeSeriesObservationMatrix.partitionBy(new Partitioner() {
  117.             @Override
  118.             public int numPartitions() {
  119.                 return maxRowIndex + 1;
  120.             }
  121.  
  122.             @Override
  123.             public int getPartition(Object o) {
  124.                 return ((Tuple2<Integer, Integer>) o)._2;
  125.             }
  126.         }).mapPartitionsToPair(new PairFlatMapFunction<Iterator<Tuple2<Tuple2<Integer, Integer>, Double>>, Integer, List<Double>>() {
  127.             @Override
  128.             public Iterable<Tuple2<Integer, List<Double>>> call(Iterator<Tuple2<Tuple2<Integer, Integer>, Double>> tuple2Iterator) throws Exception {
  129.                 List<Tuple2<Integer, List<Double>>> list = new ArrayList<Tuple2<Integer, List<Double>>>();
  130.                 List<Double> singleList = new ArrayList<Double>();
  131.  
  132.                 Integer index = 0;
  133.                 while (tuple2Iterator.hasNext()) {
  134.                     Tuple2<Tuple2<Integer, Integer>, Double> partitionTuple = tuple2Iterator.next();
  135.                     singleList.add(partitionTuple._2);
  136.                     index = partitionTuple._1._2;
  137.                 }
  138.                 list.add(new Tuple2<Integer, List<Double>>(index, singleList));
  139.                 System.out.println(""+list);
  140.                 return list;
  141.             }
  142.         });
  143. //
  144. //        //System.out.println("Number of row partitions: " + partitionedByRowTimeSeries.count());
  145. //
  146. //        // Starting iterations
  147. //
  148. //
  149. //        JavaPairRDD<Integer, Double> sVector = partitionedByColumnTimeSeries.map(new PairFunction<Tuple2<Integer, List<Double>>, Integer, Double>() {
  150. //            @Override
  151. //            public Tuple2<Integer, Double> call(Tuple2<Integer, List<Double>> integerListTuple2) throws Exception {
  152. //                Double sum = 0d;
  153. //                for (Double value : integerListTuple2._2) {
  154. //                    sum += value;
  155. //                }
  156. //                System.out.println("sVector Value" + (sum));
  157. //                return new Tuple2<Integer, Double>(integerListTuple2._1, sum);
  158. //            }
  159. //        });
  160. //        //System.out.println("My s vector has size: " + sVector.count());
  161. //
  162. //        JavaPairRDD<Integer, Double> vVectorRaw = partitionedByRowTimeSeries.cartesian(sVector).map(new PairFunction<Tuple2<Tuple2<Integer, List<Double>>, Tuple2<Integer, Double>>, Integer, Double>() {
  163. //            @Override
  164. //            public Tuple2<Integer, Double> call(Tuple2<Tuple2<Integer, List<Double>>, Tuple2<Integer, Double>> tuple2Tuple2Tuple2) throws Exception {
  165. //                int sVectorIndex = tuple2Tuple2Tuple2._2._1;
  166. //                Double sum = (tuple2Tuple2Tuple2._1._2.get(sVectorIndex) * tuple2Tuple2Tuple2._2._2 - tuple2Tuple2Tuple2._1._2.get(sVectorIndex) * tuple2Tuple2Tuple2._1._2.get(sVectorIndex));
  167. //                return new Tuple2<Integer, Double>(tuple2Tuple2Tuple2._1._1, sum);
  168. //            }
  169. //        });
  170. //
  171. //        JavaPairRDD<Integer, Double> vVector = vVectorRaw.reduceByKey(new Function2<Double, Double, Double>() {
  172. //            @Override
  173. //            public Double call(Double aDouble, Double aDouble2) throws Exception {
  174. //                System.out.println("vVector Value" + (aDouble + aDouble2));
  175. //                return aDouble + aDouble2;
  176. //            }
  177. //        });
  178. //        //System.out.println("My v vector has size: " + vVector.count());
  179.  
  180.  
  181.         JavaPairRDD<Integer, Double> zVector = partitionedByRowTimeSeries.mapToPair(new PairFunction<Tuple2<Integer, List<Double>>, Integer, Double>() {
  182.             @Override
  183.             public Tuple2<Integer, Double> call(Tuple2<Integer, List<Double>> integerListTuple2) throws Exception {
  184.                 return new Tuple2<Integer, Double>(integerListTuple2._1,1d);
  185.             }
  186.         });
  187.  
  188.         JavaPairRDD<Integer,Double> sVector = calculateSVector(partitionedByColumnTimeSeries, zVector);
  189.         //System.out.println("My s vector has size: " + sVector.count());
  190.  
  191.         JavaPairRDD<Integer,Double> vVector = calculateVVector(partitionedByRowTimeSeries, sVector, zVector);
  192.         //System.out.println("My v vector has size: " + vVector.count());
  193.         zVector = calculateZVector(vVector,zVector);
  194.  
  195.         sVector = calculateSVector(partitionedByColumnTimeSeries, zVector);
  196.  
  197.         System.out.println("My s vector has size: " + sVector.count());
  198.         System.exit(0);
  199.         while (true)
  200.         {
  201.             sVector = calculateSVector(partitionedByColumnTimeSeries, zVector);
  202.             //System.out.println("My s vector has size: " + sVector.count());
  203.  
  204.             vVector = calculateVVector(partitionedByRowTimeSeries, sVector, zVector);
  205.             //System.out.println("My v vector has size: " + vVector.count());
  206.  
  207.             JavaPairRDD<Integer, Double> newZVector = calculateZVector(vVector, zVector);
  208.             if (zVector==newZVector){
  209.                 break;
  210.             } else {
  211.                 zVector=newZVector;
  212.             }
  213.             //System.out.println("My z vector has size: " + vVector.count());
  214.         }
  215.  
  216.  
  217.  
  218. //        JavaPairRDD<Integer,Integer> newZVector = vVector.map(new PairFunction<Tuple2<Integer, Double>, Integer, Integer>() {
  219. //            @Override
  220. //            public Tuple2<Integer, Integer> call(Tuple2<Integer, Double> integerDoubleTuple2) throws Exception {
  221. //                return new Tuple2<Integer, Integer>(integerDoubleTuple2._1,);
  222. //            }
  223. //        });
  224.  
  225.  
  226. //
  227. //        JavaPairRDD<Integer,Double> zVector = timeSeries.map(new PairFunction<String, Integer, Double>() {
  228. //            @Override
  229. //            public Tuple2<Integer, Double> call(String s) {
  230. //                return new Tuple2<Integer, Double>(1, 1d);
  231. //            }
  232. //        });
  233. //
  234. //        JavaPairRDD<Integer,List<Double>> rowGroupedTimeSeries =timeSeriesObservationMatrix.flatMap(new PairFlatMapFunction<Tuple2<Tuple2<Integer, Integer>, Double>, Integer, List<Double>>() {
  235. //            @Override
  236. //            public Iterable<Tuple2<Integer, List<Double>>> call(Tuple2<Tuple2<Integer, Integer>, Double> tuple2DoubleTuple2) throws Exception {
  237. //                return null;
  238. //            }
  239. //        })
  240.  
  241.  
  242. //
  243. //
  244. //        JavaPairRDD<Integer, Integer> vVector = timeSeriesObservationMatrix.join(sVector).map(new PairFunction<Tuple2<Integer,Tuple2<List<Integer>,Integer>>,Integer,Integer>(){
  245. //            @Override
  246. //            public Tuple2<Integer, Integer> call(Tuple2<Integer, Tuple2<List<Integer>, Integer>> integerTuple2Tuple2) throws Exception {
  247. //                Integer sum = 0;
  248. //                for (Integer value :integerTuple2Tuple2._2._1){
  249. //                    sum += value*integerTuple2Tuple2._1;
  250. //                }
  251. //                return new Tuple2<Integer, Integer>(integerTuple2Tuple2._1,sum);
  252. //            }
  253. //        });
  254.  
  255.  
  256.         System.exit(0);
  257.     }
  258.  
  259.     public static JavaPairRDD<Integer, Double> calculateVVector(JavaPairRDD<Integer, List<Double>> partitionedByRowTimeSeries, JavaPairRDD<Integer, Double> sVector, JavaPairRDD<Integer, Double> zVector) {
  260.  
  261.         JavaPairRDD<Integer, Double> vVectorRaw = partitionedByRowTimeSeries.cartesian(sVector).mapToPair(new PairFunction<Tuple2<Tuple2<Integer, List<Double>>, Tuple2<Integer, Double>>, Integer, Double>() {
  262.             @Override
  263.             public Tuple2<Integer, Double> call(Tuple2<Tuple2<Integer, List<Double>>, Tuple2<Integer, Double>> tuple2Tuple2Tuple2) throws Exception {
  264.                 int sVectorIndex = tuple2Tuple2Tuple2._2._1;
  265.                 Double sum = (tuple2Tuple2Tuple2._1._2.get(sVectorIndex) * tuple2Tuple2Tuple2._2._2 - tuple2Tuple2Tuple2._1._2.get(sVectorIndex) * tuple2Tuple2Tuple2._1._2.get(sVectorIndex));
  266.                 return new Tuple2<Integer, Double>(tuple2Tuple2Tuple2._1._1, sum);
  267.             }
  268.         });
  269.  
  270.         JavaPairRDD<Integer, Double> vVector = vVectorRaw.reduceByKey(new Function2<Double, Double, Double>() {
  271.             @Override
  272.             public Double call(Double aDouble, Double aDouble2) throws Exception {
  273.                 return aDouble + aDouble2;
  274.             }
  275.         });
  276.  
  277.         return vVector.join(zVector).mapToPair(new PairFunction<Tuple2<Integer, Tuple2<Double, Double>>, Integer, Double>() {
  278.             @Override
  279.             public Tuple2<Integer, Double> call(Tuple2<Integer, Tuple2<Double, Double>> integerTuple2Tuple2) throws Exception {
  280.                 System.out.println("V vector sum:"+(integerTuple2Tuple2._2._1 * integerTuple2Tuple2._2._2));
  281.                 return new Tuple2<Integer, Double>(integerTuple2Tuple2._1, integerTuple2Tuple2._2._1 * integerTuple2Tuple2._2._2);
  282.             }
  283.         });
  284.     }
  285.  
  286.     public static JavaPairRDD<Integer, Double> calculateSVector(JavaPairRDD<Integer, List<Double>> partitionedByColumnTimeSeries, JavaPairRDD<Integer, Double> zVector) {
  287.         final Map<Integer,Double> rawZ = zVector.collectAsMap();
  288.         return partitionedByColumnTimeSeries.mapToPair(new PairFunction<Tuple2<Integer, List<Double>>, Integer, Double>() {
  289.             @Override
  290.             public Tuple2<Integer, Double> call(Tuple2<Integer, List<Double>> integerListTuple2) throws Exception {
  291.                 Double sum = 0d;
  292.                 Integer counter = 0;
  293.  
  294.                 for (Double value : integerListTuple2._2) {
  295.                     System.out.println("S vector calc:"+value+"x"+rawZ.get(counter++));
  296.                     sum += value * rawZ.get(counter++);
  297.                 }
  298.  
  299.                 System.out.println("S vector sum:"+sum);
  300.                 return new Tuple2<Integer, Double>(integerListTuple2._1, sum);
  301.             }
  302.         });
  303.     }
  304.  
  305.  
  306.     public static JavaPairRDD<Integer, Double> calculateZVector(JavaPairRDD<Integer, Double> vVector,JavaPairRDD<Integer, Double> zVector){
  307.         JavaPairRDD<Integer, Double> zVectorRaw = zVector.join(vVector).mapToPair(new PairFunction<Tuple2<Integer, Tuple2<Double, Double>>, Integer, Double>() {
  308.             @Override
  309.             public Tuple2<Integer, Double> call(Tuple2<Integer, Tuple2<Double, Double>> integerTuple2Tuple2) throws Exception {
  310.                 return new Tuple2<Integer, Double>(integerTuple2Tuple2._1, integerTuple2Tuple2._2._1 * integerTuple2Tuple2._2._2);
  311.  
  312.             }
  313.         });
  314.  
  315.         final JavaRDD<Pair<Integer,Double>>rawV = zVectorRaw.map(new Function<Tuple2<Integer, Double>, Pair<Integer, Double>>() {
  316.             @Override
  317.             public Pair<Integer, Double> call(Tuple2<Integer, Double> integerDoubleTuple2) throws Exception {
  318.                 return new Pair<Integer,Double>(integerDoubleTuple2._1,integerDoubleTuple2._2);
  319.             }
  320.         });
  321.  
  322.         zVectorRaw.reduce(new Function2<Tuple2<Integer, Double>, Tuple2<Integer, Double>, Tuple2<Integer, Double>>() {
  323.             @Override
  324.             public Tuple2<Integer, Double> call(Tuple2<Integer, Double> integerDoubleTuple2, Tuple2<Integer, Double> integerDoubleTuple22) throws Exception {
  325.                 return null;
  326.             }
  327.         });
  328.  
  329.         final Pair<Integer,Double> minimalV =  rawV.reduce(new Function2<Pair<Integer, Double>, Pair<Integer, Double>, Pair<Integer, Double>>() {
  330.             @Override
  331.             public Pair<Integer, Double> call(Pair<Integer, Double> integerDoubleTuple2, Pair<Integer, Double> integerDoubleTuple22) throws Exception {
  332.                 if (integerDoubleTuple2.getSecond()>integerDoubleTuple22.getSecond()){
  333.                     return integerDoubleTuple22;
  334.                 } else {
  335.                     return integerDoubleTuple2;
  336.                 }
  337.             }
  338.         });
  339.  
  340.         System.out.println("Minimal: "+minimalV.getSecond());
  341.  
  342.         if (minimalV.getSecond()>=0){
  343.             return zVector;
  344.         } else {
  345.             return zVector.mapToPair(new PairFunction<Tuple2<Integer, Double>, Integer, Double>() {
  346.                 @Override
  347.                 public Tuple2<Integer, Double> call(Tuple2<Integer, Double> integerDoubleTuple2) throws Exception {
  348.                     if (!integerDoubleTuple2._1.equals(minimalV.getFirst())) {
  349.                         System.out.println("ZVector: "+integerDoubleTuple2);
  350.                         return integerDoubleTuple2;
  351.                     } else {
  352.                         System.out.println("ZVector: "+new Tuple2<Integer, Double>(integerDoubleTuple2._1, integerDoubleTuple2._2 * -1));
  353.                         return new Tuple2<Integer, Double>(integerDoubleTuple2._1, integerDoubleTuple2._2 * -1);
  354.                     }
  355.                 }
  356.             });
  357.         }
  358.     }
  359. }
Advertisement
Add Comment
Please, Sign In to add comment