Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
// Batch part: count Cassandra rows per "date" value.
// Each row is mapped to a (date, 1) pair and reduceByKey sums the 1s per date,
// yielding (date, rowCount) pairs.
// NOTE(review): the "type" column is selected but never read in this block; drop it
// from select(...) if nothing else needs it, to avoid fetching unused data.
CassandraJavaRDD<CassandraRow> cassandraRowsRDD =
    CassandraJavaUtil.javaFunctions(sc).cassandraTable(KEYSPACE, TABLE).select("date", "type");
JavaPairRDD<String, Integer> batchRDD = cassandraRowsRDD
    .mapToPair(new PairFunction<CassandraRow, String, Integer>() {
      @Override
      public Tuple2<String, Integer> call(CassandraRow row) {
        return new Tuple2<String, Integer>(row.getString("date"), 1);
      }
    })
    .reduceByKey(new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer count1, Integer count2) {
        return count1 + count2;
      }
    });
save(batchRDD); // Assume this saves the batch RDD somewhere
// ... (code elided)

// Streaming part: assume we read a chunk of logs from the Kafka stream every x seconds.
JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(...);

// Emit (date, 1) for every log event that occurred after startTime and drop the rest.
// flatMapToPair requires a PairFlatMapFunction; in the Spark 1.x Java API its call(...)
// returns an Iterable of pairs (here: one pair, or none when the event is ignored).
JavaPairDStream<String, Integer> streamRDD = kafkaStream
    .flatMapToPair(new org.apache.spark.api.java.function.PairFlatMapFunction<
        Tuple2<String, String>, String, Integer>() {
      @Override
      public Iterable<Tuple2<String, Integer>> call(Tuple2<String, String> data) {
        // data._2 is the Kafka message value, assumed to be a JSON document — TODO confirm.
        String jsonString = data._2;
        JSON jsonObj = JSON.parse(jsonString);
        Date eventDate = ... // get date from json object
        // Assume startTime is a broadcast variable set to the time when the job started.
        if (eventDate.after(startTime.value())) {
          List<Tuple2<String, Integer>> pairs = new ArrayList<Tuple2<String, Integer>>(1);
          // String.valueOf guarantees a String key whatever type get(...) returns.
          // NOTE(review): assumes the JSON "date" field is formatted like the Cassandra
          // "date" column so stream and batch keys line up — confirm.
          pairs.add(new Tuple2<String, Integer>(String.valueOf(jsonObj.get("date")), 1));
          return pairs;
        }
        // Event at or before startTime: contribute nothing.
        return new ArrayList<Tuple2<String, Integer>>(0);
      }
    })
    // Sum the per-event 1s within each streaming batch.
    .reduceByKey(new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer count1, Integer count2) {
        return count1 + count2;
      }
    })
    // Keep a running total per date across batches: previous state plus this batch's counts.
    // NOTE(review): Spark requires a checkpoint directory for updateStateByKey — make sure
    // the streaming context's checkpoint(...) is configured before start().
    .updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
      @Override
      public Optional<Integer> call(List<Integer> counts, Optional<Integer> state) {
        Integer previousValue = state.or(0);
        int currentValue = 0;
        for (Integer count : counts) {
          currentValue += count;
        }
        return Optional.of(previousValue + currentValue);
      }
    });
save(streamRDD); // Assume this saves the stream RDD somewhere
// NOTE(review): start()/awaitTermination() exist on the streaming context, so `sc` is
// presumably a JavaStreamingContext here — confirm.
sc.start();
sc.awaitTermination();
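// Sample (date, count) results, presumably produced by the batch job:
//   ("2014-10-15", 1000000)
//   ("2014-10-16", 2000000)
//
// Timeline:
//   |------------------------|-------------|--------------|--------->
//   tBatchStart              tStreamStart  streamBatch1   streamBatch2
//
// Sample (date, count) results, presumably emitted by successive streaming batches:
//   ("2014-10-19", 1000)
//   ("2014-10-19", 2001000)
//   ("2014-10-19", 4000)
//   ("2014-10-19", 2005000)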
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement