Guest User

Untitled

a guest
Dec 13th, 2017
85
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.76 KB | None | 0 0
  1. public static void aggregateActions(String[] args) {
  2. // Register the Options to be passed from command line argument
  3. PipelineOptionsFactory.register(Options.class);
  4. // Create the options from Args, execute the validation
  5. Options options = PipelineOptionsFactory.fromArgs(args)
  6. .withValidation()
  7. .as(Options.class);
  8.  
  9. // Setup values for setting an Streaming Job, with a FLink Runner
  10. options.setStreaming(true);
  11. //at tme moment runs on DirectRunner
  12. options.setRunner(FlinkRunner.class);
  13.  
  14. // Create the pipeline with the options
  15. Pipeline pipeline = Pipeline.create(options);
  16.  
  17.  
  18. PTransform<PBegin, PCollection<KV<String, query_request>>> kafkaRead = KafkaIO.<String, query_request>read()
  19. .withBootstrapServers(options.getBrokerPort())
  20. .withTopic(options.getKafkaTopic())
  21.  
  22. .withKeyDeserializer(StringDeserializer.class)
  23. .withValueDeserializerAndCoder((Class)ByteArrayDeserializer.class, AvroCoder.of(query_request.class))
  24. .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", (Object)"earliest"))
  25. .withMaxNumRecords(1)
  26. .withoutMetadata();
  27.  
  28. pipeline.apply(kafkaRead)
  29. .apply(Values.<query_request>create())
  30. .apply("PrintMessages", ParDo.of(new DoFn<query_request, String>() {
  31.  
  32. @ProcessElement
  33. public void processElement(ProcessContext c) {
  34.  
  35. query_request a = c.element();
  36.  
  37. System.out.println(" [Action]: " + a.getAction().toString());
  38. }
  39. }));
  40.  
  41. pipeline.run().waitUntilFinish();
  42.  
  43. }
Add Comment
Please, Sign In to add comment