Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.mux.blog.flinkpercentiles;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
- import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
- import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
- import java.util.Properties;
- public class AppleAssignmentMetricsApp {
- public static void main(String[] args) throws Exception {
- // create Kafka connection properties for stream source
- Properties kafkaConnectionProperties = new Properties();
- kafkaConnectionProperties.setProperty("bootstrap.servers", "kafka:9092");
- kafkaConnectionProperties.setProperty("group.id", "apple-assignment-flink-app");
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- SingleOutputStreamOperator appleAssignmentSource = env
- .addSource(new FlinkKafkaConsumer010("topic", new SimpleStringSchema(), kafkaConnectionProperties))
- .name("Kafka Source")
- .flatMap((o, collector) -> collector.collect(Integer.parseInt((String) o)))
- .name("Parse");
- calculatePercentilesOnWindow(appleAssignmentSource, 10);
- calculatePercentilesOnWindow(appleAssignmentSource, 50);
- calculatePercentilesOnWindow(appleAssignmentSource, 100);
- System.out.println(env.getExecutionPlan());
- env.execute();
- }
- private static void calculatePercentilesOnWindow(SingleOutputStreamOperator appleAssignmentSource, int windowSize) {
- appleAssignmentSource.countWindowAll(windowSize)
- .aggregate(new AppleAssignmentAggregateFunction())
- .name("Calculate Percentiles on apple window: " + windowSize)
- .addSink(new RichSinkFunction<AppleAssignmentStats>() {
- @Override
- public void invoke(AppleAssignmentStats stats) throws Exception {
- System.out.println(stats.toString());
- }
- })
- .name("Send results to Sink");
- }
- }
Add Comment
Please, Sign In to add comment