Advertisement
Guest User

Untitled

a guest
Oct 22nd, 2014
178
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.53 KB | None | 0 0
  1. CassandraJavaRDD<CassandraRow> cassandraRowsRDD = CassandraJavaUtil.javaFunctions(sc).cassandraTable(KEYSPACE, TABLE).select("date", "type"); // Batch side: read the "date" and "type" columns of the table.
  2. // Count rows per date: map every row to a (date, 1) pair, then add up the ones for each key.
  3. JavaPairRDD<String, Integer> batchRDD = cassandraRowsRDD.mapToPair(new PairFunction<CassandraRow, String, Integer>() {
  4.     @Override
  5.     public Tuple2<String, Integer> call(CassandraRow row) {
  6.         return new Tuple2<String, Integer>(row.getString("date"), 1); // Only "date" is used as the key; "type" is selected above but not read here.
  7.     }
  8. }).reduceByKey(new Function2<Integer, Integer, Integer>() {
  9.     @Override
  10.     public Integer call(Integer count1, Integer count2) {
  11.         return count1 + count2; // Combine two partial counts for the same date.
  12.     }
  13. });
  14.  
  15. save(batchRDD); // Assume this saves the batch RDD somewhere
  16.  
  17. ...
  18.  
  19. // Assume we read a chunk of logs from the Kafka stream every x seconds.
  20. JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(...);
  21. JavaPairDStream<String, Integer> streamRDD = kafkaStream.flatMapToPair(new PairFlatMapFunction<Tuple2<String, String>, String, Integer>() { // flatMapToPair takes a PairFlatMapFunction (package org.apache.spark.api.java.function), which may emit zero or more pairs per record.
  22.     @Override
  23.     public Iterable<Tuple2<String, Integer>> call(Tuple2<String, String> data) { // NOTE(review): Spark 1.x expects Iterable here; on Spark 2.x the contract is Iterator (return pairs.iterator()) — confirm the Spark version.
  24.         String jsonString = data._2; // The second element of the Kafka pair is parsed as the JSON payload.
  25.         JSON jsonObj = JSON.parse(jsonString);
  26.         Date eventDate = ... // get date from json object
  27.         // Assume startTime is broadcast variable that is set to the time when the job started.
  28.         if (eventDate.after(startTime.value())) {
  29.             List<Tuple2<String, Integer>> pairs = new ArrayList<Tuple2<String, Integer>>();
  30.             pairs.add(new Tuple2<String, Integer>(jsonObj.get("date"), 1)); // NOTE(review): assumes get("date") yields a String — confirm against the JSON library in use.
  31.             return pairs;
  32.         } else {
  33.             return new ArrayList<Tuple2<String, Integer>>(0); // Return empty list when we ignore some logs
  34.         }
  35.     }
  36. }).reduceByKey(new Function2<Integer, Integer, Integer>() { // Sum the (date, 1) pairs within each micro-batch.
  37.     @Override
  38.     public Integer call(Integer count1, Integer count2) {
  39.         return count1 + count2;
  40.     }
  41. }).updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { // Carry a running total per date across micro-batches.
  42.     @Override
  43.     public Optional<Integer> call(List<Integer> counts, Optional<Integer> state) {
  44.         Integer previousValue = state.or(0); // No prior state for this key yet: start from zero.
  45.         Integer currentValue = ... // Sum of counts
  46.         return Optional.of(previousValue + currentValue);
  47.     }
  48. });
  49. save(streamRDD); // Assume this saves the stream RDD somewhere
  50.  
  51. sc.start(); // NOTE(review): start()/awaitTermination() are streaming-context methods — confirm sc refers to the JavaStreamingContext here.
  52. sc.awaitTermination();
  53.  
  54. ("2014-10-15", 1000000)
  55. ("2014-10-16", 2000000)
  56.  
  57. |------------------------|-------------|--------------|--------->
  58. tBatchStart              tStreamStart  streamBatch1   streamBatch2
  59.  
  60. ("2014-10-19", 1000)
  61.  
  62. ("2014-10-19", 2001000)
  63.  
  64. ("2014-10-19", 4000)
  65.  
  66. ("2014-10-19", 2005000)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement