Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package analysis;
import com.google.common.base.Optional;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import org.apache.commons.collections4.bidimap.TreeBidiMap;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
- public class Functions {
- // CLEAN UP PROCESS:
- // TODO: add a java concurrent? counter
- // TODO: option1: time based the time of last cleanup
- // option2: size based how many lines have been processed.
- // TODO: check the memory threshold
- private static final Long RECENCY = 10000L;
- private static final Long MAXSIZE = 1000L;
- // convets the input to a key/value pair with the regular key as the key and
- // Operation as the value
- public static PairFunction<String, String, Operation> clusterOperations = new PairFunction<String, String, Operation>() {
- @Override
- public Tuple2<String, Operation> call(String record) throws Exception {
- // Everytime processing one line, the atomic counter will increment
- String[] temp = record.split("\t");
- return new Tuple2<String, Operation>(temp[0], new Operation(
- temp[0], temp[1], Long.parseLong(temp[2]),
- Long.parseLong(temp[3]), Integer.parseInt(temp[4])));
- }
- };
- /*
- * Takes the operations which are grouped by their keys and adds them to
- * their respective zones a TreeBidiMap is used to store the zones which map
- * to their values. This Maps's keys are the zones and the values are the
- * value for that zone. This structure allows us to search for the zone in
- * the structure by doing a reverse look-up with the value.
- *
- * Since we can look up the values, this structure has more expensive add
- * and remove operations than a regular TreeMap. We use an additional
- * HashMap to store the zones that are being continuously updated so we
- * don't have to keep looking into the TreeBidiMap.
- */
- public static Function2<List<Operation>, Optional<TreeBidiMap<Zone, String>>, Optional<TreeBidiMap<Zone, String>>> updateZones = new Function2<List<Operation>, Optional<TreeBidiMap<Zone, String>>, Optional<TreeBidiMap<Zone, String>>>() {
- @Override
- public Optional<TreeBidiMap<Zone, String>> call(List<Operation> newOps,
- Optional<TreeBidiMap<Zone, String>> prevTree) throws Exception {
- TreeBidiMap<Zone, String> updatedTree = prevTree.orNull();
- if (updatedTree == null) {
- updatedTree = new TreeBidiMap();
- }
- updateZoneSet(newOps, updatedTree);
- return Optional.of(updatedTree);
- }
- };
- public static void updateZoneSet(List<Operation> newOps,
- TreeBidiMap<Zone, String> updatedTree) {
- HashMap<String, Zone> updatedZones = new HashMap<>();
- // add the operations into their respective zones
- for (Operation op : newOps) {
- String value = op.getValue();
- Zone z = updatedZones.get(value);
- if (z == null) {
- z = updatedTree.removeValue(value);
- if (z == null) {
- z = new Zone(op.getKey(), value);
- }
- updatedZones.put(value, z);
- }
- z.addOperation(op);
- }
- //................Removing zones based on recency........................
- //While adding we check for the recency of the zone and
- //add if they are recent based on a threshold
- for (Zone z : updatedZones.values()) {
- if (z != null && (Math.abs(z.getStart() - z.getFinish()) < RECENCY)) {
- updatedTree.put(z, z.getValue());
- }
- }
- //................Removing based on size................................
- // APPROACHES-
- //1) If (size is greater than a particular value),
- // iterate over the map removing the least recent zones until size is allowable
- //2) We modify the CompareTo function for zones such that it is based on recency.
- // That will ensure that our TreeBidiMap is sorted based on recency
- // whenever we go over the threshold size, we delete the required number of least recent zones
- /*Implementing approach 1.
- create an array of the number of zones that need to be removed.
- To find the zones that are least recent, iterate over the map and keep track of least recent zones
- This will give you an array of least recent zones which you can then remove from the map
- */
- int zoneTreeSize = updatedTree.size();
- if (zoneTreeSize > MAXSIZE){
- Zone[] leastRecentZones = new Zone[zoneTreeSize-MAXSIZE];
- int insertCounter = 0;
- for (Zone z:updatedTree.keySet()){
- //Add first n (zoneTreeSize-MAXSIZE) zones in the array - Initializing the leastRecentZones array
- if(insertCounter < zoneTreeSize-MAXSIZE){
- leastRecentZones[insertCounter] = z;
- insertCounter++;
- }
- //iterate and find if current zone is one of least recent zones
- for (int j=0; j < zoneTreeSize-MAXSIZE; j++)
- {
- if (Math.abs(z.getFinish() - z.getStart()) >
- Math.abs(leastRecentZones[i].getFinish() - leastRecentZones[i].getStart())){
- leastRecentZones[i] = z;
- break;
- }
- }
- }
- //Remove least recent zones from the map.
- for (int j=0; j<zoneTreeSize-MAXSIZE;j++)
- updatedTree.remove(leastRecentZones[j];
- }
- Long maxRight = 0L;
- for (Zone z : updatedTree.keySet()) {
- if (z != null) {
- z.setMaxRightEarlierZones(maxRight);
- maxRight = Math.max(maxRight, z.getRight());
- }
- }
- }
- // takes the scores for each zone and groups them by their keys
- public static PairFunction<Tuple2<Zone, Long[]>, String, Long> groupScoresByKey = new PairFunction<Tuple2<Zone, Long[]>, String, Long>() {
- @Override
- public Tuple2<String, Long> call(Tuple2<Zone, Long[]> scoreTuple)
- throws Exception {
- return new Tuple2<String, Long>(scoreTuple._1().getKey(),
- scoreTuple._2()[1]);
- }
- };
- // reduce function that returns max score
- public static Function2<Long, Long, Long> getMaxScore = new Function2<Long, Long, Long>() {
- @Override
- public Long call(Long score1, Long score2) throws Exception {
- return Math.max(score1, score2);
- }
- };
- // iterates through the set of zones and computes gamma score for each zone
- public static PairFlatMapFunction<Tuple2<String, TreeBidiMap<Zone, String>>, Zone, Long[]> calculateGammaPerZone = new PairFlatMapFunction<Tuple2<String, TreeBidiMap<Zone, String>>, Zone, Long[]>() {
- @Override
- public Iterable<Tuple2<Zone, Long[]>> call(
- Tuple2<String, TreeBidiMap<Zone, String>> zoneTree)
- throws Exception {
- List<Tuple2<Zone, Long[]>> scoreList = new ArrayList<Tuple2<Zone, Long[]>>();
- TreeSet<Zone> zones = new TreeSet(zoneTree._2().keySet());
- for (Zone zone : zones) {
- Long[] score = new Long[2];
- score = ZoneAnalysis.compareWithZones(zone, zones, false, 0L);
- if (score.length > 0) {
- scoreList.add(new Tuple2<Zone, Long[]>(zone, score));
- }
- }
- return scoreList;
- }
- };
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement