SHARE
TWEET

Untitled

a guest Jun 1st, 2017 141 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. //
  2.  
  3. class Keeper implements Comparable<Keeper>{
  4.    
  5.     private int n_stations;
  6.     private int n_critic;
  7.     private String timestamp;
  8.     private Double criticalValue;
  9.     private String week, hour;
  10.  
  11.  
  12.     public Keeper(){
  13.         this.n_stations = 0;
  14.         this.n_critic = 0;
  15.         this.criticalValue = 0;
  16.     }
  17.  
  18.     public void incStations(){
  19.         this.n_stations++;
  20.     }
  21.  
  22.     public void incCritics(){
  23.         this.n_critic++;
  24.     }
  25.  
  26.  
  27.     publiv void setNCritic(int n){
  28.         this.n_critic = n;
  29.     }
  30.  
  31.     public int getNCritic(){
  32.         return this.n_critic;
  33.     }
  34.  
  35.     public int getNStations(){
  36.         return this.n_stations;
  37.  
  38.     }
  39.  
  40.     public void storeWeek(String week){
  41.     this.week = week;
  42.     }
  43.  
  44.     public void storeHour(String hour){
  45.     this.hour = hour;
  46.     }
  47.  
  48.     public int compareTo(Key k2){
  49.  
  50.         //first off return highest
  51.         int k2cv = k2.getCriticalValue();
  52.  
  53.         if(this.criticalValue > k2cv)
  54.             return 1;
  55.         elseif(this.criticalValue <k2cv)
  56.             return -1;
  57.         else{
  58.             //same critical value
  59.             int k2hour = k2.getHour();
  60.  
  61.             int thishour = Integer.parseInt(this.hour);
  62.  
  63.             if(k2hour > thishour)
  64.                 return 1;
  65.             else if(k2hour < thishour)
  66.                 return -1;
  67.             else{
  68.                 //same hour
  69.                 //lexicografic order
  70.                 return -1*this.week.compareTo(k2.getWeek()); //-1 since if first date<second date we want 1 which means better, to be chosen
  71.  
  72.  
  73.             }
  74.  
  75.  
  76.         }
  77.  
  78.  
  79.  
  80.     }
  81.  
  82.  
  83.  
  84.  
  85. }
  86.  
  87.  
  88. final int cnt = 0;
  89.  
  90. //FIRST OFF FILTER
  91.  
  92. JavaRDD<String> bikesData = sc.textFile(inputPath);
  93.  
  94. //first off filter out header and invalid lines
  95.  
  96. JavaRDD<String> bikesFiltered = bikesData.filter(
  97.  
  98.     new Function<String, Boolean>(){
  99.  
  100.         public Boolean call(String currentLine){
  101.             if(cnt == 0){
  102.                 cnt++;
  103.                 return false;
  104.             }
  105.            
  106.                 String [] fields = currentLine.split("\t");
  107.                 int freeslots = Integer.parseInt(fields[3]);
  108.                 int usedslots = Integer.parseInt(fields[2]);
  109.                 return (freeslots>0 && usedslots >0);
  110.            
  111.  
  112.         }
  113.  
  114.  
  115.     }
  116.  
  117.  
  118. );
  119.  
  120. JavaPairRDD<String, Keeper> bikeSlots = bikesData.mapToPair(
  121.  
  122.    
  123.     new PairFunction<String, String, Keeper>(){
  124.  
  125.  
  126.         public Tuple2<String, Keeper>(String currentLine){
  127.             //key will be sid+timestamp
  128.             //value will be a Keeper object
  129.             String[] fields = currentLine.split("\t");
  130.             String sid = fields[0];
  131.             String date = fields[1];
  132.             String time = fields[2];
  133.             int freeslot = Integer.parseInt(fields[3]);
  134.             String[] timeParts = time.split(":");
  135.             String weekDay = DateTool.DayOfTheWeek(date);
  136.  
  137.             String key = sid.concat(" ").concat(weekDay).concat(" ").concat(timeParts[0]); //Wed 12
  138.  
  139.             Keeper k = new Keeper();
  140.             k.incStations(); //station will be set to one, new station
  141.             //check if it's either a critic station or not
  142.             if(freeslot == 0)
  143.                 k.incCritics(); //critic station to 1
  144.  
  145.  
  146.             k.storeWeek(weedDay);
  147.             k.storeHour(timeParts[0]);
  148.  
  149.             return new Tuple2<String, Keeper>(key, k);
  150.  
  151.  
  152.  
  153.         }
  154.  
  155.  
  156.     }
  157.        
  158.  
  159.  
  160. );
  161.  
  162. //so now we have Wed S1 Thu S1 Mon S1 Wed S1 and so on
  163. //need to reduceByKey in order to merge the keepers into one
  164. //which will old the criticality value
  165.  
  166. JavaPairRDD<String, Keeper> resumedDate = bikesData.reduceByKey(
  167.  
  168.  
  169.     new Function2<Keeper, Keeper, Keeper>(){
  170.  
  171.  
  172.         public Keeper call(Keeper k1, Keeper k2){
  173.  
  174.             Keeper k = new Keeper();
  175.             int ns1  = k1.getNStations();
  176.             int ns2 = k2.getNStations();
  177.             int cs1 = k1.getNCritic();
  178.             int cs2 = k2.getNCritic();
  179.  
  180.             k.setCritics(cs1+cs2);
  181.             k.setStations(ns1+ns2);
  182.  
  183.             k.storeHour(k1.getHour());
  184.             k.storeWeek(k1.getWeek());
  185.  
  186.         }
  187.  
  188.     }
  189.  
  190.  
  191.  
  192. );
  193.  
  194. //now we have Wed s1 -> Keeper(10 stations 3 critics Hour Week Criticality ?)
  195.  
  196. //compute criticality
  197.  
  198. JavaPairRDD<String, Keeper> criticalSlots = resumedDate.mapValues(
  199.  
  200.  
  201.     new Function<Keeper, Keeper>(){
  202.  
  203.         public Keeper call(Keeper k){
  204.  
  205.                 k.setCriticality(k.getNCritics()/k.getNStations());
  206.                 return k;
  207.         }
  208.  
  209.     }
  210.  
  211.  
  212. );
  213.  
  214.  
  215. //filter under a threshold
  216.  
  217. final Double threshold = Double.parseDouble(args[3]);
  218.  
  219.  
  220. JavaPairRDD<String, Keeper> criticalSlotsFiltered = criticalSlots.filter(
  221.  
  222.  
  223.     new Function<Tuple2<String, Keeper>, Boolean>(){
  224.  
  225.  
  226.         public Boolean call(Tuple2<String, Keeper> k){
  227.  
  228.  
  229.             return k.getCriticality() > threshold;
  230.  
  231.  
  232.         }
  233.     }
  234. );
  235.  
  236.  
  237. //now find the highest critical value for each specific station, independently from the timeslot
  238.  
  239.  
  240. JavaPairRDD<String, Keeper> stationsCritical = criticalSlotsFiltered.mapToPair(
  241.  
  242.  
  243.  
  244.     new PairFunction<Tuple2<String, Keeper>, String, Keeper>(){
  245.  
  246.  
  247.         public Tuple2<String,Keeper> call(Tuple2<String, Keeper> k){
  248.  
  249.             String key = k._1();
  250.  
  251.             String[] fields = key.split(" ");
  252.             //fields[0] store sid
  253.  
  254.  
  255.             return new Tuple2<String, Keeper>(fields[0], k);
  256.  
  257.  
  258.         }
  259.     }
  260.  
  261. );
  262.  
  263.  
  264. //now we have (s1, k1) (s1, k2), (s1, k3) etcetera
  265.  
  266.  
  267. JavaPairRDD<String, Keeper> topCriticalTimeSlots = stationsCritical.reduceByKey(
  268.  
  269.  
  270.     new Function2<Keeper, Keeper, Keeper>(){
  271.  
  272.  
  273.         public Keeper call(Keeper k1, Keeper k2){
  274.  
  275.  
  276.             int ret = k1.compareTo(k2);
  277.  
  278.             if(k1 == 1)
  279.                 return k1;
  280.             else
  281.                 return k2; //it can't happen that compareTo returns 0
  282.  
  283.  
  284.         }
  285.  
  286.  
  287.  
  288.     }
  289.  
  290.  
  291.  
  292.  
  293. );
  294.  
  295.  
  296. //now, for each station we have the timeslot with the most critical value, need to join to get extra info
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top