Advertisement
Guest User

Untitled

a guest
May 24th, 2017
65
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.05 KB | None | 0 0
  1. package pl.edu.agh.suu.spark.samples;
  2.  
  3. import org.apache.commons.lang.StringUtils;
  4. import org.apache.spark.SparkConf;
  5. import org.apache.spark.api.java.JavaPairRDD;
  6. import org.apache.spark.api.java.JavaRDD;
  7. import org.apache.spark.api.java.JavaSparkContext;
  8. import org.apache.spark.api.java.function.FlatMapFunction;
  9. import org.apache.spark.api.java.function.Function;
  10. import org.apache.spark.api.java.function.Function2;
  11. import org.apache.spark.api.java.function.PairFunction;
  12. import org.slf4j.Logger;
  13. import org.slf4j.LoggerFactory;
  14. import scala.Serializable;
  15. import scala.Tuple2;
  16. import scala.reflect.internal.Trees;
  17.  
  18. import java.util.Arrays;
  19. import java.util.Iterator;
  20. import java.util.List;
  21.  
  22.  
  23. public class SampleCVS implements Serializable {
  24.  
  25. private static final Logger LOGGER = LoggerFactory.getLogger(SampleCVS.class);
  26.  
  27. public static void main(String[] args) {
  28. new SampleCVS().run();
  29. }
  30.  
  31. /**
  32. * The task body
  33. */
  34. public void run() {
  35. String master = "local[*]";
  36.  
  37. /*
  38. * Initialises a Spark context.
  39. */
  40. SparkConf conf = new SparkConf()
  41. .setAppName(SampleCVS.class.getName())
  42. .setMaster(master);
  43. JavaSparkContext context = new JavaSparkContext(conf);
  44.  
  45. /*
  46. * Performs a work count sequence of tasks and prints the output with a logger.
  47. */
  48. String input = " Department, Designation, costToCompany, State\n" +
  49. " Sales, Trainee, 12000, UP\n" +
  50. " Sales, Lead, 32000, AP\n" +
  51. " Sales, Lead, 32000, LA\n" +
  52. " Sales, Lead, 32000, TN\n" +
  53. " Sales, Lead, 32000, AP\n" +
  54. " Sales, Lead, 32000, TN \n" +
  55. " Sales, Lead, 32000, LA\n" +
  56. " Sales, Lead, 32000, LA\n" +
  57. " Marketing, Associate, 18000, TN\n" +
  58. " Marketing, Associate, 18000, TN\n" +
  59. " HR, Manager, 58000, TN";
  60.  
  61. JavaRDD<String> inputFile = context.textFile("billing1.txt");
  62.  
  63.  
  64. JavaRDD<String> words = inputFile.flatMap(
  65. (FlatMapFunction<String, String>) x -> Arrays.asList(x.split("\n")).iterator());
  66.  
  67. JavaRDD<NewRecord> rdd_records = words.map(
  68. (Function<String, NewRecord>) line -> {
  69. String[] fields = line.split(";");
  70. String number = fields[2];
  71. Double cost = Double.parseDouble(fields[8]);
  72. Boolean isRoaming = Boolean.parseBoolean(fields[6]);
  73. LOGGER.info(number + cost);
  74. return new NewRecord(number, cost, isRoaming);
  75. });
  76.  
  77. // LOGGER.info(rdd_records.top(5).toString());
  78.  
  79. Function<NewRecord, Boolean> filtered =
  80. (Function<NewRecord, Boolean>) a -> {
  81. return (a.getRoaming() && !a.getCost().isInfinite() && !a.getCost().isNaN());
  82. };
  83.  
  84. JavaRDD<NewRecord> filtered_rdd_records = rdd_records.filter(filtered);
  85.  
  86. // compute averge for entire set
  87. Function2<AvgCount, NewRecord, AvgCount> addAndCount =
  88. (Function2<AvgCount, NewRecord, AvgCount>) (a, x) -> {
  89. a.total += x.getCost();
  90. a.num += 1;
  91. return a;
  92. };
  93. Function2<AvgCount, AvgCount, AvgCount> combine =
  94. (Function2<AvgCount, AvgCount, AvgCount>) (a, b) -> {
  95. a.total += b.total;
  96. a.num += b.num;
  97. return a;
  98. };
  99.  
  100.  
  101. AvgCount initial = new AvgCount(0, 0);
  102. AvgCount result = filtered_rdd_records.aggregate(initial, addAndCount, combine);
  103. LOGGER.info(Double.toString(result.avg()));
  104.  
  105.  
  106. // compute average per department
  107. initial = new AvgCount(0, 0);
  108. JavaPairRDD<String, Double> rdd_pair_records = rdd_records.mapToPair(
  109. (PairFunction<NewRecord, String, Double>)
  110. x -> new Tuple2<>(x.getNumber(), x.getCost())
  111. );
  112.  
  113. Function<Double, AvgCount> createAcc =
  114. (Function<Double, AvgCount>) x -> new AvgCount(x, 1);
  115.  
  116. Function2<AvgCount, Double, AvgCount> addAndCount2 =
  117. (Function2<AvgCount, Double, AvgCount>) (a, x) -> {
  118. a.total += x;
  119. a.num += 1;
  120. return a;
  121. };
  122.  
  123.  
  124. JavaPairRDD<String, AvgCount> avgCounts =
  125. rdd_pair_records.combineByKey(createAcc, addAndCount2, combine);
  126.  
  127. avgCounts.foreach(result1 -> LOGGER.info(
  128. String.format(" Number [%s] average [%f].", result1._1(), result1._2.avg())));
  129.  
  130.  
  131.  
  132.  
  133.  
  134. }
  135. }
  136.  
  137.  
  /**
   * Aggregate helper: a mutable running sum and element count from which an average is
   * derived. The fields are public and non-final because the Spark fold functions in this
   * file update them in place.
   */
  class AvgCount implements Serializable {

      private static final long serialVersionUID = 1L;

      /** Sum of all values added so far. */
      public double total;

      /** Number of values added so far. */
      public int num;

      /**
       * @param total initial sum
       * @param num   initial number of values represented by {@code total}
       */
      public AvgCount(double total, int num) {
          this.total = total;
          this.num = num;
      }

      /**
       * Returns {@code total / num} using floating-point division.
       *
       * @return the average; when {@code num} is zero the result is {@code NaN} for a zero
       *         total and an infinity otherwise, since no exception is thrown
       */
      public double avg() {
          return total / num;
      }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement