Advertisement
Guest User

Untitled

a guest
Jun 1st, 2017
204
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
SPARK 5.07 KB | None | 0 0
  1. //
  2.  
  3. class Keeper implements Comparable<Keeper>{
  4.    
  5.     private int n_stations;
  6.     private int n_critic;
  7.     private String timestamp;
  8.     private Double criticalValue;
  9.     private String week, hour;
  10.  
  11.  
  12.     public Keeper(){
  13.         this.n_stations = 0;
  14.         this.n_critic = 0;
  15.         this.criticalValue = 0;
  16.     }
  17.  
  18.     public void incStations(){
  19.         this.n_stations++;
  20.     }
  21.  
  22.     public void incCritics(){
  23.         this.n_critic++;
  24.     }
  25.  
  26.  
  27.     publiv void setNCritic(int n){
  28.         this.n_critic = n;
  29.     }
  30.  
  31.     public int getNCritic(){
  32.         return this.n_critic;
  33.     }
  34.  
  35.     public int getNStations(){
  36.         return this.n_stations;
  37.  
  38.     }
  39.  
  40.     public void storeWeek(String week){
  41.     this.week = week;
  42.     }
  43.  
  44.     public void storeHour(String hour){
  45.     this.hour = hour;
  46.     }
  47.  
  48.     public int compareTo(Key k2){
  49.  
  50.         //first off return highest
  51.         int k2cv = k2.getCriticalValue();
  52.  
  53.         if(this.criticalValue > k2cv)
  54.             return 1;
  55.         elseif(this.criticalValue <k2cv)
  56.             return -1;
  57.         else{
  58.             //same critical value
  59.             int k2hour = k2.getHour();
  60.  
  61.             int thishour = Integer.parseInt(this.hour);
  62.  
  63.             if(k2hour > thishour)
  64.                 return 1;
  65.             else if(k2hour < thishour)
  66.                 return -1;
  67.             else{
  68.                 //same hour
  69.                 //lexicografic order
  70.                 return -1*this.week.compareTo(k2.getWeek()); //-1 since if first date<second date we want 1 which means better, to be chosen
  71.  
  72.  
  73.             }
  74.  
  75.  
  76.         }
  77.  
  78.  
  79.  
  80.     }
  81.  
  82.  
  83.  
  84.  
  85. }
  86.  
  87.  
  88. final int cnt = 0;
  89.  
  90. //FIRST OFF FILTER
  91.  
  92. JavaRDD<String> bikesData = sc.textFile(inputPath);
  93.  
  94. //first off filter out header and invalid lines
  95.  
  96. JavaRDD<String> bikesFiltered = bikesData.filter(
  97.  
  98.     new Function<String, Boolean>(){
  99.  
  100.         public Boolean call(String currentLine){
  101.             if(cnt == 0){
  102.                 cnt++;
  103.                 return false;
  104.             }
  105.            
  106.                 String [] fields = currentLine.split("\t");
  107.                 int freeslots = Integer.parseInt(fields[3]);
  108.                 int usedslots = Integer.parseInt(fields[2]);
  109.                 return (freeslots>0 && usedslots >0);
  110.            
  111.  
  112.         }
  113.  
  114.  
  115.     }
  116.  
  117.  
  118. );
  119.  
  120. JavaPairRDD<String, Keeper> bikeSlots = bikesData.mapToPair(
  121.  
  122.    
  123.     new PairFunction<String, String, Keeper>(){
  124.  
  125.  
  126.         public Tuple2<String, Keeper>(String currentLine){
  127.             //key will be sid+timestamp
  128.             //value will be a Keeper object
  129.             String[] fields = currentLine.split("\t");
  130.             String sid = fields[0];
  131.             String date = fields[1];
  132.             String time = fields[2];
  133.             int freeslot = Integer.parseInt(fields[3]);
  134.             String[] timeParts = time.split(":");
  135.             String weekDay = DateTool.DayOfTheWeek(date);
  136.  
  137.             String key = sid.concat(" ").concat(weekDay).concat(" ").concat(timeParts[0]); //Wed 12
  138.  
  139.             Keeper k = new Keeper();
  140.             k.incStations(); //station will be set to one, new station
  141.             //check if it's either a critic station or not
  142.             if(freeslot == 0)
  143.                 k.incCritics(); //critic station to 1
  144.  
  145.  
  146.             k.storeWeek(weedDay);
  147.             k.storeHour(timeParts[0]);
  148.  
  149.             return new Tuple2<String, Keeper>(key, k);
  150.  
  151.  
  152.  
  153.         }
  154.  
  155.  
  156.     }
  157.        
  158.  
  159.  
  160. );
  161.  
//so now we have one pair per reading, keyed by "sid weekday hour" (e.g. "S1 Wed 12")
//reduceByKey merges all the Keepers of the same key into one,
//which will hold the reading/critical counts needed to compute the criticality value
  165.  
  166. JavaPairRDD<String, Keeper> resumedDate = bikesData.reduceByKey(
  167.  
  168.  
  169.     new Function2<Keeper, Keeper, Keeper>(){
  170.  
  171.  
  172.         public Keeper call(Keeper k1, Keeper k2){
  173.  
  174.             Keeper k = new Keeper();
  175.             int ns1  = k1.getNStations();
  176.             int ns2 = k2.getNStations();
  177.             int cs1 = k1.getNCritic();
  178.             int cs2 = k2.getNCritic();
  179.  
  180.             k.setCritics(cs1+cs2);
  181.             k.setStations(ns1+ns2);
  182.  
  183.             k.storeHour(k1.getHour());
  184.             k.storeWeek(k1.getWeek());
  185.  
  186.         }
  187.  
  188.     }
  189.  
  190.  
  191.  
  192. );
  193.  
//now we have e.g. "s1 Wed 12" -> Keeper(10 readings, 3 critical readings, hour, weekday; criticality not computed yet)
  195.  
  196. //compute criticality
  197.  
  198. JavaPairRDD<String, Keeper> criticalSlots = resumedDate.mapValues(
  199.  
  200.  
  201.     new Function<Keeper, Keeper>(){
  202.  
  203.         public Keeper call(Keeper k){
  204.  
  205.                 k.setCriticality(k.getNCritics()/k.getNStations());
  206.                 return k;
  207.         }
  208.  
  209.     }
  210.  
  211.  
  212. );
  213.  
  214.  
//keep only the timeslots whose criticality is above the threshold
  216.  
  217. final Double threshold = Double.parseDouble(args[3]);
  218.  
  219.  
  220. JavaPairRDD<String, Keeper> criticalSlotsFiltered = criticalSlots.filter(
  221.  
  222.  
  223.     new Function<Tuple2<String, Keeper>, Boolean>(){
  224.  
  225.  
  226.         public Boolean call(Tuple2<String, Keeper> k){
  227.  
  228.  
  229.             return k.getCriticality() > threshold;
  230.  
  231.  
  232.         }
  233.     }
  234. );
  235.  
  236.  
  237. //now find the highest critical value for each specific station, independently from the timeslot
  238.  
  239.  
  240. JavaPairRDD<String, Keeper> stationsCritical = criticalSlotsFiltered.mapToPair(
  241.  
  242.  
  243.  
  244.     new PairFunction<Tuple2<String, Keeper>, String, Keeper>(){
  245.  
  246.  
  247.         public Tuple2<String,Keeper> call(Tuple2<String, Keeper> k){
  248.  
  249.             String key = k._1();
  250.  
  251.             String[] fields = key.split(" ");
  252.             //fields[0] store sid
  253.  
  254.  
  255.             return new Tuple2<String, Keeper>(fields[0], k);
  256.  
  257.  
  258.         }
  259.     }
  260.  
  261. );
  262.  
  263.  
  264. //now we have (s1, k1) (s1, k2), (s1, k3) etcetera
  265.  
  266.  
  267. JavaPairRDD<String, Keeper> topCriticalTimeSlots = stationsCritical.reduceByKey(
  268.  
  269.  
  270.     new Function2<Keeper, Keeper, Keeper>(){
  271.  
  272.  
  273.         public Keeper call(Keeper k1, Keeper k2){
  274.  
  275.  
  276.             int ret = k1.compareTo(k2);
  277.  
  278.             if(k1 == 1)
  279.                 return k1;
  280.             else
  281.                 return k2; //it can't happen that compareTo returns 0
  282.  
  283.  
  284.         }
  285.  
  286.  
  287.  
  288.     }
  289.  
  290.  
  291.  
  292.  
  293. );
  294.  
  295.  
  296. //now, for each station we have the timeslot with the most critical value, need to join to get extra info
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement