Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public static void aggregateActions(String[] args) {
- // Register the Options to be passed from command line argument
- PipelineOptionsFactory.register(Options.class);
- // Create the options from Args, execute the validation
- Options options = PipelineOptionsFactory.fromArgs(args)
- .withValidation()
- .as(Options.class);
- // Setup values for setting an Streaming Job, with a FLink Runner
- options.setStreaming(true);
- //at tme moment runs on DirectRunner
- options.setRunner(FlinkRunner.class);
- // Create the pipeline with the options
- Pipeline pipeline = Pipeline.create(options);
- PTransform<PBegin, PCollection<KV<String, query_request>>> kafkaRead = KafkaIO.<String, query_request>read()
- .withBootstrapServers(options.getBrokerPort())
- .withTopic(options.getKafkaTopic())
- .withKeyDeserializer(StringDeserializer.class)
- .withValueDeserializerAndCoder((Class)ByteArrayDeserializer.class, AvroCoder.of(query_request.class))
- .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", (Object)"earliest"))
- .withMaxNumRecords(1)
- .withoutMetadata();
- pipeline.apply(kafkaRead)
- .apply(Values.<query_request>create())
- .apply("PrintMessages", ParDo.of(new DoFn<query_request, String>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- query_request a = c.element();
- System.out.println(" [Action]: " + a.getAction().toString());
- }
- }));
- pipeline.run().waitUntilFinish();
- }
Add Comment
Please, Sign In to add comment