Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- //
- class Keeper implements Comparable<Keeper>{
- private int n_stations;
- private int n_critic;
- private String timestamp;
- private Double criticalValue;
- private String week, hour;
- public Keeper(){
- this.n_stations = 0;
- this.n_critic = 0;
- this.criticalValue = 0;
- }
- public void incStations(){
- this.n_stations++;
- }
- public void incCritics(){
- this.n_critic++;
- }
- publiv void setNCritic(int n){
- this.n_critic = n;
- }
- public int getNCritic(){
- return this.n_critic;
- }
- public int getNStations(){
- return this.n_stations;
- }
- public void storeWeek(String week){
- this.week = week;
- }
- public void storeHour(String hour){
- this.hour = hour;
- }
- public int compareTo(Key k2){
- //first off return highest
- int k2cv = k2.getCriticalValue();
- if(this.criticalValue > k2cv)
- return 1;
- elseif(this.criticalValue <k2cv)
- return -1;
- else{
- //same critical value
- int k2hour = k2.getHour();
- int thishour = Integer.parseInt(this.hour);
- if(k2hour > thishour)
- return 1;
- else if(k2hour < thishour)
- return -1;
- else{
- //same hour
- //lexicografic order
- return -1*this.week.compareTo(k2.getWeek()); //-1 since if first date<second date we want 1 which means better, to be chosen
- }
- }
- }
- }
- final int cnt = 0;
- //FIRST OFF FILTER
- JavaRDD<String> bikesData = sc.textFile(inputPath);
- //first off filter out header and invalid lines
- JavaRDD<String> bikesFiltered = bikesData.filter(
- new Function<String, Boolean>(){
- public Boolean call(String currentLine){
- if(cnt == 0){
- cnt++;
- return false;
- }
- String [] fields = currentLine.split("\t");
- int freeslots = Integer.parseInt(fields[3]);
- int usedslots = Integer.parseInt(fields[2]);
- return (freeslots>0 && usedslots >0);
- }
- }
- );
- JavaPairRDD<String, Keeper> bikeSlots = bikesData.mapToPair(
- new PairFunction<String, String, Keeper>(){
- public Tuple2<String, Keeper>(String currentLine){
- //key will be sid+timestamp
- //value will be a Keeper object
- String[] fields = currentLine.split("\t");
- String sid = fields[0];
- String date = fields[1];
- String time = fields[2];
- int freeslot = Integer.parseInt(fields[3]);
- String[] timeParts = time.split(":");
- String weekDay = DateTool.DayOfTheWeek(date);
- String key = sid.concat(" ").concat(weekDay).concat(" ").concat(timeParts[0]); //Wed 12
- Keeper k = new Keeper();
- k.incStations(); //station will be set to one, new station
- //check if it's either a critic station or not
- if(freeslot == 0)
- k.incCritics(); //critic station to 1
- k.storeWeek(weedDay);
- k.storeHour(timeParts[0]);
- return new Tuple2<String, Keeper>(key, k);
- }
- }
- );
- //so now we have Wed S1 Thu S1 Mon S1 Wed S1 and so on
- //need to reduceByKey in order to merge the keepers into one
- //which will old the criticality value
- JavaPairRDD<String, Keeper> resumedDate = bikesData.reduceByKey(
- new Function2<Keeper, Keeper, Keeper>(){
- public Keeper call(Keeper k1, Keeper k2){
- Keeper k = new Keeper();
- int ns1 = k1.getNStations();
- int ns2 = k2.getNStations();
- int cs1 = k1.getNCritic();
- int cs2 = k2.getNCritic();
- k.setCritics(cs1+cs2);
- k.setStations(ns1+ns2);
- k.storeHour(k1.getHour());
- k.storeWeek(k1.getWeek());
- }
- }
- );
- //now we have Wed s1 -> Keeper(10 stations 3 critics Hour Week Criticality ?)
- //compute criticality
- JavaPairRDD<String, Keeper> criticalSlots = resumedDate.mapValues(
- new Function<Keeper, Keeper>(){
- public Keeper call(Keeper k){
- k.setCriticality(k.getNCritics()/k.getNStations());
- return k;
- }
- }
- );
- //filter under a threshold
- final Double threshold = Double.parseDouble(args[3]);
- JavaPairRDD<String, Keeper> criticalSlotsFiltered = criticalSlots.filter(
- new Function<Tuple2<String, Keeper>, Boolean>(){
- public Boolean call(Tuple2<String, Keeper> k){
- return k.getCriticality() > threshold;
- }
- }
- );
- //now find the highest critical value for each specific station, independently from the timeslot
- JavaPairRDD<String, Keeper> stationsCritical = criticalSlotsFiltered.mapToPair(
- new PairFunction<Tuple2<String, Keeper>, String, Keeper>(){
- public Tuple2<String,Keeper> call(Tuple2<String, Keeper> k){
- String key = k._1();
- String[] fields = key.split(" ");
- //fields[0] store sid
- return new Tuple2<String, Keeper>(fields[0], k);
- }
- }
- );
- //now we have (s1, k1) (s1, k2), (s1, k3) etcetera
- JavaPairRDD<String, Keeper> topCriticalTimeSlots = stationsCritical.reduceByKey(
- new Function2<Keeper, Keeper, Keeper>(){
- public Keeper call(Keeper k1, Keeper k2){
- int ret = k1.compareTo(k2);
- if(k1 == 1)
- return k1;
- else
- return k2; //it can't happen that compareTo returns 0
- }
- }
- );
- //now, for each station we have the timeslot with the most critical value, need to join to get extra info
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement