Guest User

Untitled

a guest
Jun 23rd, 2020
33
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.45 KB | None | 0 0
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. package org.apache.beam.examples;
  19.  
  20. import org.apache.beam.sdk.options.Description;
  21.  
  22. import org.apache.beam.sdk.Pipeline;
  23. import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
  24. import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
  25. import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
  26. import org.apache.beam.sdk.options.PipelineOptions;
  27. import org.apache.beam.sdk.options.PipelineOptionsFactory;
  28. import org.apache.beam.sdk.transforms.Combine;
  29. import org.apache.beam.sdk.transforms.Count;
  30. import org.apache.beam.sdk.transforms.MapElements;
  31. import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
  32. import org.apache.beam.sdk.transforms.windowing.FixedWindows;
  33. import org.apache.beam.sdk.transforms.windowing.Repeatedly;
  34. import org.apache.beam.sdk.transforms.windowing.Window;
  35. import org.apache.beam.sdk.values.PCollection;
  36. import org.apache.beam.sdk.values.TypeDescriptors;
  37. import org.joda.time.Duration;
  38. import org.apache.beam.sdk.options.Default;
  39. import org.apache.beam.sdk.options.Validation.Required;
  40. import java.util.Date;
  41.  
  42. public class Pubsub {
  43.  
  44. public interface TestOptions extends PipelineOptions {
  45.  
  46. /**
  47. * By default, this example reads from a public dataset containing the text of King Lear. Set
  48. * this option to choose a different input file or glob.
  49. */
  50. @Description("Path of the file to read from")
  51. @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
  52. String getInputFile();
  53.  
  54. void setInputFile(String value);
  55.  
  56. /** Set this required option to specify where to write the output. */
  57. @Description("Path of the file to write to")
  58. @Required
  59. String getOutput();
  60.  
  61. void setOutput(String value);
  62. }
  63.  
  64. /**
  65. * Send a Number to the PubSub topic one at a time... without quick updates. 2 mssages will fire ,
  66. * one wont.
  67. */
  68. public static void main(String[] args) {
  69.  
  70. //GcpOptions options = PipelineOptionsFactory.create().as(PubsubOptions.class);
  71. TestOptions options =
  72. PipelineOptionsFactory.fromArgs(args).withValidation().as(TestOptions.class);
  73. //options.setProject("amordkov-user-project-test");
  74.  
  75. Pipeline p = Pipeline.create(options);
  76.  
  77. // Read
  78. PCollection<String> blah =
  79. p.apply(
  80. PubsubIO.readStrings().fromSubscription("projects/amordkov-user-project-test/subscriptions/slow-input"));
  81.  
  82.  
  83. // Output what we read
  84. blah.apply(print("Received incoming message at "));
  85.  
  86. // Do an actual aggregation
  87. blah.apply(MapElements.into(TypeDescriptors.longs()).via(Long::valueOf))
  88. .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
  89. .apply(Combine.globally(Count.<Long>combineFn()).withoutDefaults())
  90. .apply(
  91. print(
  92. "Watermark trigger fired at "));
  93.  
  94. // Do an actual aggregation without watermark trigger but process trigger
  95. blah.apply(MapElements.into(TypeDescriptors.longs()).via(Long::valueOf))
  96. .apply(
  97. Window.<Long>into(FixedWindows.of(Duration.standardSeconds(10)))
  98. .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
  99. .withAllowedLateness(Duration.ZERO)
  100. .discardingFiredPanes())
  101. .apply(Combine.globally(Count.<Long>combineFn()).withoutDefaults())
  102. .apply(print("Processing time trigger fired at "));
  103.  
  104. p.run();
  105. }
  106.  
  107. public static <T> MapElements<T, String> print(String message) {
  108. return MapElements.into(TypeDescriptors.strings())
  109. .<T>via(
  110. x -> {
  111. System.out.println(message + (new Date()) + ", value = " + x);
  112. return "";
  113. });
  114. }
  115. }
Add Comment
Please, Sign In to add comment