Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package pl.edu.agh.suu.spark.samples;
- import org.apache.commons.lang.StringUtils;
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.FlatMapFunction;
- import org.apache.spark.api.java.function.Function;
- import org.apache.spark.api.java.function.Function2;
- import org.apache.spark.api.java.function.PairFunction;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import scala.Serializable;
- import scala.Tuple2;
- import scala.reflect.internal.Trees;
- import java.util.Arrays;
- import java.util.Iterator;
- import java.util.List;
- public class SampleCVS implements Serializable {
- private static final Logger LOGGER = LoggerFactory.getLogger(SampleCVS.class);
- public static void main(String[] args) {
- new SampleCVS().run();
- }
- /**
- * The task body
- */
- public void run() {
- String master = "local[*]";
- /*
- * Initialises a Spark context.
- */
- SparkConf conf = new SparkConf()
- .setAppName(SampleCVS.class.getName())
- .setMaster(master);
- JavaSparkContext context = new JavaSparkContext(conf);
- /*
- * Performs a work count sequence of tasks and prints the output with a logger.
- */
- String input = " Department, Designation, costToCompany, State\n" +
- " Sales, Trainee, 12000, UP\n" +
- " Sales, Lead, 32000, AP\n" +
- " Sales, Lead, 32000, LA\n" +
- " Sales, Lead, 32000, TN\n" +
- " Sales, Lead, 32000, AP\n" +
- " Sales, Lead, 32000, TN \n" +
- " Sales, Lead, 32000, LA\n" +
- " Sales, Lead, 32000, LA\n" +
- " Marketing, Associate, 18000, TN\n" +
- " Marketing, Associate, 18000, TN\n" +
- " HR, Manager, 58000, TN";
- JavaRDD<String> inputFile = context.textFile("billing1.txt");
- JavaRDD<String> words = inputFile.flatMap(
- (FlatMapFunction<String, String>) x -> Arrays.asList(x.split("\n")).iterator());
- JavaRDD<NewRecord> rdd_records = words.map(
- (Function<String, NewRecord>) line -> {
- String[] fields = line.split(";");
- String number = fields[2];
- Double cost = Double.parseDouble(fields[8]);
- Boolean isRoaming = Boolean.parseBoolean(fields[6]);
- LOGGER.info(number + cost);
- return new NewRecord(number, cost, isRoaming);
- });
- // LOGGER.info(rdd_records.top(5).toString());
- Function<NewRecord, Boolean> filtered =
- (Function<NewRecord, Boolean>) a -> {
- return (a.getRoaming() && !a.getCost().isInfinite() && !a.getCost().isNaN());
- };
- JavaRDD<NewRecord> filtered_rdd_records = rdd_records.filter(filtered);
- // compute averge for entire set
- Function2<AvgCount, NewRecord, AvgCount> addAndCount =
- (Function2<AvgCount, NewRecord, AvgCount>) (a, x) -> {
- a.total += x.getCost();
- a.num += 1;
- return a;
- };
- Function2<AvgCount, AvgCount, AvgCount> combine =
- (Function2<AvgCount, AvgCount, AvgCount>) (a, b) -> {
- a.total += b.total;
- a.num += b.num;
- return a;
- };
- AvgCount initial = new AvgCount(0, 0);
- AvgCount result = filtered_rdd_records.aggregate(initial, addAndCount, combine);
- LOGGER.info(Double.toString(result.avg()));
- // compute average per department
- initial = new AvgCount(0, 0);
- JavaPairRDD<String, Double> rdd_pair_records = rdd_records.mapToPair(
- (PairFunction<NewRecord, String, Double>)
- x -> new Tuple2<>(x.getNumber(), x.getCost())
- );
- Function<Double, AvgCount> createAcc =
- (Function<Double, AvgCount>) x -> new AvgCount(x, 1);
- Function2<AvgCount, Double, AvgCount> addAndCount2 =
- (Function2<AvgCount, Double, AvgCount>) (a, x) -> {
- a.total += x;
- a.num += 1;
- return a;
- };
- JavaPairRDD<String, AvgCount> avgCounts =
- rdd_pair_records.combineByKey(createAcc, addAndCount2, combine);
- avgCounts.foreach(result1 -> LOGGER.info(
- String.format(" Number [%s] average [%f].", result1._1(), result1._2.avg())));
- }
- }
- // aggregate helper class
- class AvgCount implements Serializable {
- public double total;
- public int num;
- public AvgCount(double total, int num) {
- this.total = total;
- this.num = num;
- }
- public double avg() {
- return total / (double) num;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement