Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package org.apache.beam.examples;
- import org.apache.beam.sdk.Pipeline;
- import org.apache.beam.sdk.io.TextIO;
- import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
- import org.apache.beam.sdk.options.PipelineOptions;
- import org.apache.beam.sdk.options.PipelineOptionsFactory;
- import org.apache.beam.sdk.transforms.*;
- import org.apache.beam.sdk.transforms.windowing.FixedWindows;
- import org.apache.beam.sdk.transforms.windowing.Window;
- import org.apache.beam.sdk.values.KV;
- import org.apache.beam.sdk.values.PCollection;
- import org.apache.beam.sdk.values.TypeDescriptors;
- import org.joda.time.Duration;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.util.Arrays;
- /*
- export PIPELINE_NAME=test_withTimeStampAttribute
- export GOOGLE_APPLICATION_CREDENTIALS=/home/<USER>/current.json
- export YOUR_GCS_BUCKET=<MY_BUCKET>
- export YOUR_PROJECT=<MY_PROJ>
- export YOUR_NETWORK=<MY_NETWORK>
- export YOUR_SUBNETWORK=regions/europe-west2/subnetworks/<MY_SUBNET>
- export RUNNER=DataflowRunner
- export REGION=europe-west1
- export ZONE=europe-west2-c
- # create new job
- mvn compile exec:java \
- -Dexec.mainClass=org.apache.beam.examples.${PIPELINE_NAME} \
- -Dexec.args="--runner=${RUNNER} \
- --project=${YOUR_PROJECT} \
- --network=${YOUR_NETWORK} \
- --subnetwork=${YOUR_SUBNETWORK} \
- --region=${REGION} \
- --zone=${ZONE} \
- --stagingLocation=gs://${YOUR_GCS_BUCKET}/${PIPELINE_NAME}/staging/ \
- --tempLocation=gs://${YOUR_GCS_BUCKET}/${PIPELINE_NAME}/temp/" \
- -Pdataflow-runner
- # generate test data in pubsub
- export TOPIC=<MY_PUBSUB_TOPIC>
- gcloud pubsub topics publish ${TOPIC} --message '{"test": "string","attributes": {"message_timestamp": "1529516762551"}}' \
- --attribute message_timestamp=1529516762551,message_id=$(uuidgen)
- */
- public class test_withTimeStampAttribute {
- static Logger LOG = LoggerFactory.getLogger(test_withTimeStampAttribute.class);
- public static void main(String[] args) {
- PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
- .withValidation()
- .as(PipelineOptions.class);
- Pipeline pipeline = Pipeline.create(options);
- pipeline
- .apply("PubsubIO",PubsubIO.readStrings()
- .withTimestampAttribute("message_timestamp")
- .fromSubscription("projects/lcg-bi-nonprod/subscriptions/myOutputSub"))
- .apply(Window.<String>into(FixedWindows.of(Duration.millis(500L))))
- .apply(TextIO.write().to("gs://pythian-test-bucket-ladbrokes")
- .withWindowedWrites()
- .withNumShards(1)
- );
- pipeline.run().waitUntilFinish();
- }
- }
Add Comment
Please, Sign In to add comment