Guest User

Untitled

a guest
Nov 17th, 2017
79
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.18 KB | None | 0 0
  1. package com.mux.blog.flinkpercentiles;
  2.  
  3. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
  6. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
  7. import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
  8.  
  9. import java.util.Properties;
  10.  
  11. public class AppleAssignmentMetricsApp {
  12.  
  13. public static void main(String[] args) throws Exception {
  14. // create Kafka connection properties for stream source
  15. Properties kafkaConnectionProperties = new Properties();
  16. kafkaConnectionProperties.setProperty("bootstrap.servers", "kafka:9092");
  17. kafkaConnectionProperties.setProperty("group.id", "apple-assignment-flink-app");
  18.  
  19. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  20. SingleOutputStreamOperator appleAssignmentSource = env
  21. .addSource(new FlinkKafkaConsumer010("topic", new SimpleStringSchema(), kafkaConnectionProperties))
  22. .name("Kafka Source")
  23. .flatMap((o, collector) -> collector.collect(Integer.parseInt((String) o)))
  24. .name("Parse");
  25.  
  26. calculatePercentilesOnWindow(appleAssignmentSource, 10);
  27. calculatePercentilesOnWindow(appleAssignmentSource, 50);
  28. calculatePercentilesOnWindow(appleAssignmentSource, 100);
  29.  
  30. System.out.println(env.getExecutionPlan());
  31. env.execute();
  32. }
  33.  
  34. private static void calculatePercentilesOnWindow(SingleOutputStreamOperator appleAssignmentSource, int windowSize) {
  35. appleAssignmentSource.countWindowAll(windowSize)
  36. .aggregate(new AppleAssignmentAggregateFunction())
  37. .name("Calculate Percentiles on apple window: " + windowSize)
  38. .addSink(new RichSinkFunction<AppleAssignmentStats>() {
  39. @Override
  40. public void invoke(AppleAssignmentStats stats) throws Exception {
  41. System.out.println(stats.toString());
  42. }
  43. })
  44. .name("Send results to Sink");
  45. }
  46. }
Add Comment
Please, Sign In to add comment