Advertisement
Guest User

BasedOnSizeAndRecency

a guest
Mar 26th, 2015
245
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.99 KB | None | 0 0
  1. package analysis;
  2.  
  import com.google.common.base.Optional;

  import java.util.ArrayList;
  import java.util.Collections;
  import java.util.Comparator;
  import java.util.HashMap;
  import java.util.Iterator;
  import java.util.LinkedList;
  import java.util.List;
  import java.util.Map;
  import java.util.TreeSet;

  import org.apache.commons.collections4.bidimap.TreeBidiMap;
  import org.apache.hadoop.io.Text;
  import org.apache.spark.api.java.function.Function;
  import org.apache.spark.api.java.function.Function2;
  import org.apache.spark.api.java.function.PairFlatMapFunction;
  import org.apache.spark.api.java.function.PairFunction;

  import scala.Tuple2;
  20.  
  21. public class Functions {
  22.  
  23. // CLEAN UP PROCESS:
  24. // TODO: add a java concurrent? counter
  25. // TODO: option1: time based the time of last cleanup
  26. // option2: size based how many lines have been processed.
  27. // TODO: check the memory threshold
  28. private static final Long RECENCY = 10000L;
  29. private static final Long MAXSIZE = 1000L;
  30. // convets the input to a key/value pair with the regular key as the key and
  31. // Operation as the value
  32. public static PairFunction<String, String, Operation> clusterOperations = new PairFunction<String, String, Operation>() {
  33. @Override
  34. public Tuple2<String, Operation> call(String record) throws Exception {
  35. // Everytime processing one line, the atomic counter will increment
  36. String[] temp = record.split("\t");
  37. return new Tuple2<String, Operation>(temp[0], new Operation(
  38. temp[0], temp[1], Long.parseLong(temp[2]),
  39. Long.parseLong(temp[3]), Integer.parseInt(temp[4])));
  40. }
  41. };
  42.  
  43. /*
  44. * Takes the operations which are grouped by their keys and adds them to
  45. * their respective zones a TreeBidiMap is used to store the zones which map
  46. * to their values. This Maps's keys are the zones and the values are the
  47. * value for that zone. This structure allows us to search for the zone in
  48. * the structure by doing a reverse look-up with the value.
  49. *
  50. * Since we can look up the values, this structure has more expensive add
  51. * and remove operations than a regular TreeMap. We use an additional
  52. * HashMap to store the zones that are being continuously updated so we
  53. * don't have to keep looking into the TreeBidiMap.
  54. */
  55. public static Function2<List<Operation>, Optional<TreeBidiMap<Zone, String>>, Optional<TreeBidiMap<Zone, String>>> updateZones = new Function2<List<Operation>, Optional<TreeBidiMap<Zone, String>>, Optional<TreeBidiMap<Zone, String>>>() {
  56. @Override
  57. public Optional<TreeBidiMap<Zone, String>> call(List<Operation> newOps,
  58. Optional<TreeBidiMap<Zone, String>> prevTree) throws Exception {
  59. TreeBidiMap<Zone, String> updatedTree = prevTree.orNull();
  60. if (updatedTree == null) {
  61. updatedTree = new TreeBidiMap();
  62. }
  63.  
  64. updateZoneSet(newOps, updatedTree);
  65. return Optional.of(updatedTree);
  66.  
  67. }
  68. };
  69.  
  70. public static void updateZoneSet(List<Operation> newOps,
  71. TreeBidiMap<Zone, String> updatedTree) {
  72.  
  73. HashMap<String, Zone> updatedZones = new HashMap<>();
  74.  
  75. // add the operations into their respective zones
  76. for (Operation op : newOps) {
  77. String value = op.getValue();
  78. Zone z = updatedZones.get(value);
  79. if (z == null) {
  80. z = updatedTree.removeValue(value);
  81. if (z == null) {
  82. z = new Zone(op.getKey(), value);
  83. }
  84. updatedZones.put(value, z);
  85. }
  86. z.addOperation(op);
  87. }
  88.  
  89. //................Removing zones based on recency........................
  90. //While adding we check for the recency of the zone and
  91. //add if they are recent based on a threshold
  92. for (Zone z : updatedZones.values()) {
  93. if (z != null && (Math.abs(z.getStart() - z.getFinish()) < RECENCY)) {
  94. updatedTree.put(z, z.getValue());
  95. }
  96. }
  97.  
  98. //................Removing based on size................................
  99. // APPROACHES-
  100. //1) If (size is greater than a particular value),
  101. // iterate over the map removing the least recent zones until size is allowable
  102. //2) We modify the CompareTo function for zones such that it is based on recency.
  103. // That will ensure that our TreeBidiMap is sorted based on recency
  104. // whenever we go over the threshold size, we delete the required number of least recent zones
  105.  
  106. /*Implementing approach 1.
  107. create an array of the number of zones that need to be removed.
  108. To find the zones that are least recent, iterate over the map and keep track of least recent zones
  109. This will give you an array of least recent zones which you can then remove from the map
  110. */
  111. int zoneTreeSize = updatedTree.size();
  112.  
  113. if (zoneTreeSize > MAXSIZE){
  114. Zone[] leastRecentZones = new Zone[zoneTreeSize-MAXSIZE];
  115. int insertCounter = 0;
  116. for (Zone z:updatedTree.keySet()){
  117.  
  118. //Add first n (zoneTreeSize-MAXSIZE) zones in the array - Initializing the leastRecentZones array
  119. if(insertCounter < zoneTreeSize-MAXSIZE){
  120. leastRecentZones[insertCounter] = z;
  121. insertCounter++;
  122. }
  123.  
  124. //iterate and find if current zone is one of least recent zones
  125. for (int j=0; j < zoneTreeSize-MAXSIZE; j++)
  126. {
  127. if (Math.abs(z.getFinish() - z.getStart()) >
  128. Math.abs(leastRecentZones[i].getFinish() - leastRecentZones[i].getStart())){
  129. leastRecentZones[i] = z;
  130. break;
  131. }
  132. }
  133. }
  134.  
  135. //Remove least recent zones from the map.
  136. for (int j=0; j<zoneTreeSize-MAXSIZE;j++)
  137. updatedTree.remove(leastRecentZones[j];
  138. }
  139.  
  140. Long maxRight = 0L;
  141. for (Zone z : updatedTree.keySet()) {
  142. if (z != null) {
  143. z.setMaxRightEarlierZones(maxRight);
  144. maxRight = Math.max(maxRight, z.getRight());
  145. }
  146. }
  147. }
  148.  
  149. // takes the scores for each zone and groups them by their keys
  150. public static PairFunction<Tuple2<Zone, Long[]>, String, Long> groupScoresByKey = new PairFunction<Tuple2<Zone, Long[]>, String, Long>() {
  151. @Override
  152. public Tuple2<String, Long> call(Tuple2<Zone, Long[]> scoreTuple)
  153. throws Exception {
  154. return new Tuple2<String, Long>(scoreTuple._1().getKey(),
  155. scoreTuple._2()[1]);
  156. }
  157. };
  158.  
  159. // reduce function that returns max score
  160. public static Function2<Long, Long, Long> getMaxScore = new Function2<Long, Long, Long>() {
  161. @Override
  162. public Long call(Long score1, Long score2) throws Exception {
  163. return Math.max(score1, score2);
  164. }
  165. };
  166.  
  167. // iterates through the set of zones and computes gamma score for each zone
  168. public static PairFlatMapFunction<Tuple2<String, TreeBidiMap<Zone, String>>, Zone, Long[]> calculateGammaPerZone = new PairFlatMapFunction<Tuple2<String, TreeBidiMap<Zone, String>>, Zone, Long[]>() {
  169. @Override
  170. public Iterable<Tuple2<Zone, Long[]>> call(
  171. Tuple2<String, TreeBidiMap<Zone, String>> zoneTree)
  172. throws Exception {
  173.  
  174. List<Tuple2<Zone, Long[]>> scoreList = new ArrayList<Tuple2<Zone, Long[]>>();
  175. TreeSet<Zone> zones = new TreeSet(zoneTree._2().keySet());
  176. for (Zone zone : zones) {
  177. Long[] score = new Long[2];
  178.  
  179. score = ZoneAnalysis.compareWithZones(zone, zones, false, 0L);
  180.  
  181. if (score.length > 0) {
  182. scoreList.add(new Tuple2<Zone, Long[]>(zone, score));
  183. }
  184.  
  185. }
  186.  
  187. return scoreList;
  188. }
  189. };
  190.  
  191. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement