Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.beam.examples;
- import org.apache.beam.sdk.options.Description;
- import org.apache.beam.sdk.Pipeline;
- import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
- import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
- import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
- import org.apache.beam.sdk.options.PipelineOptions;
- import org.apache.beam.sdk.options.PipelineOptionsFactory;
- import org.apache.beam.sdk.transforms.Combine;
- import org.apache.beam.sdk.transforms.Count;
- import org.apache.beam.sdk.transforms.MapElements;
- import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
- import org.apache.beam.sdk.transforms.windowing.FixedWindows;
- import org.apache.beam.sdk.transforms.windowing.Repeatedly;
- import org.apache.beam.sdk.transforms.windowing.Window;
- import org.apache.beam.sdk.values.PCollection;
- import org.apache.beam.sdk.values.TypeDescriptors;
- import org.joda.time.Duration;
- import org.apache.beam.sdk.options.Default;
- import org.apache.beam.sdk.options.Validation.Required;
- import java.util.Date;
- public class Pubsub {
- public interface TestOptions extends PipelineOptions {
- /**
- * By default, this example reads from a public dataset containing the text of King Lear. Set
- * this option to choose a different input file or glob.
- */
- @Description("Path of the file to read from")
- @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
- String getInputFile();
- void setInputFile(String value);
- /** Set this required option to specify where to write the output. */
- @Description("Path of the file to write to")
- @Required
- String getOutput();
- void setOutput(String value);
- }
- /**
- * Send a Number to the PubSub topic one at a time... without quick updates. 2 mssages will fire ,
- * one wont.
- */
- public static void main(String[] args) {
- //GcpOptions options = PipelineOptionsFactory.create().as(PubsubOptions.class);
- TestOptions options =
- PipelineOptionsFactory.fromArgs(args).withValidation().as(TestOptions.class);
- //options.setProject("amordkov-user-project-test");
- Pipeline p = Pipeline.create(options);
- // Read
- PCollection<String> blah =
- p.apply(
- PubsubIO.readStrings().fromSubscription("projects/amordkov-user-project-test/subscriptions/slow-input"));
- // Output what we read
- blah.apply(print("Received incoming message at "));
- // Do an actual aggregation
- blah.apply(MapElements.into(TypeDescriptors.longs()).via(Long::valueOf))
- .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
- .apply(Combine.globally(Count.<Long>combineFn()).withoutDefaults())
- .apply(
- print(
- "Watermark trigger fired at "));
- // Do an actual aggregation without watermark trigger but process trigger
- blah.apply(MapElements.into(TypeDescriptors.longs()).via(Long::valueOf))
- .apply(
- Window.<Long>into(FixedWindows.of(Duration.standardSeconds(10)))
- .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
- .withAllowedLateness(Duration.ZERO)
- .discardingFiredPanes())
- .apply(Combine.globally(Count.<Long>combineFn()).withoutDefaults())
- .apply(print("Processing time trigger fired at "));
- p.run();
- }
- public static <T> MapElements<T, String> print(String message) {
- return MapElements.into(TypeDescriptors.strings())
- .<T>via(
- x -> {
- System.out.println(message + (new Date()) + ", value = " + x);
- return "";
- });
- }
- }
Add Comment
Please, Sign In to add comment