Guest User

Untitled

a guest
Jun 22nd, 2018
69
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.64 KB | None | 0 0
  1. package org.apache.beam.examples;
  2.  
  3. import org.apache.beam.sdk.Pipeline;
  4. import org.apache.beam.sdk.io.TextIO;
  5. import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
  6. import org.apache.beam.sdk.options.PipelineOptions;
  7. import org.apache.beam.sdk.options.PipelineOptionsFactory;
  8. import org.apache.beam.sdk.transforms.*;
  9. import org.apache.beam.sdk.transforms.windowing.FixedWindows;
  10. import org.apache.beam.sdk.transforms.windowing.Window;
  11. import org.apache.beam.sdk.values.KV;
  12. import org.apache.beam.sdk.values.PCollection;
  13. import org.apache.beam.sdk.values.TypeDescriptors;
  14. import org.joda.time.Duration;
  15. import org.slf4j.Logger;
  16. import org.slf4j.LoggerFactory;
  17.  
  18. import java.util.Arrays;
  19.  
/*
Example: build and launch this pipeline on Dataflow, then publish a test message to it.

export PIPELINE_NAME=test_withTimeStampAttribute
export GOOGLE_APPLICATION_CREDENTIALS=/home/<USER>/current.json
export YOUR_GCS_BUCKET=<MY_BUCKET>
export YOUR_PROJECT=<MY_PROJ>
export YOUR_NETWORK=<MY_NETWORK>
export YOUR_SUBNETWORK=regions/europe-west2/subnetworks/<MY_SUBNET>
export RUNNER=DataflowRunner
# NOTE(review): REGION (europe-west1) differs from the subnetwork/ZONE region (europe-west2);
# confirm this is intended.
export REGION=europe-west1
export ZONE=europe-west2-c

# create new job
mvn compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.${PIPELINE_NAME} \
-Dexec.args="--runner=${RUNNER} \
--project=${YOUR_PROJECT} \
--network=${YOUR_NETWORK} \
--subnetwork=${YOUR_SUBNETWORK} \
--region=${REGION} \
--zone=${ZONE} \
--stagingLocation=gs://${YOUR_GCS_BUCKET}/${PIPELINE_NAME}/staging/ \
--tempLocation=gs://${YOUR_GCS_BUCKET}/${PIPELINE_NAME}/temp/" \
-Pdataflow-runner

# generate test data in pubsub
# The pipeline reads event time from the message_timestamp message *attribute* (epoch
# milliseconds), not from the JSON payload. Use the current time: a fixed, old timestamp can
# fall behind the watermark on later runs and be dropped as late data.
export TOPIC=<MY_PUBSUB_TOPIC>
export TS=$(date +%s%3N)  # epoch milliseconds (GNU date)
gcloud pubsub topics publish ${TOPIC} --message "{\"test\": \"string\",\"attributes\": {\"message_timestamp\": \"${TS}\"}}" \
--attribute message_timestamp=${TS},message_id=$(uuidgen)
*/
  50. public class test_withTimeStampAttribute {
  51.  
  52. static Logger LOG = LoggerFactory.getLogger(test_withTimeStampAttribute.class);
  53.  
  54. public static void main(String[] args) {
  55.  
  56. PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
  57. .withValidation()
  58. .as(PipelineOptions.class);
  59.  
  60. Pipeline pipeline = Pipeline.create(options);
  61.  
  62. pipeline
  63. .apply("PubsubIO",PubsubIO.readStrings()
  64. .withTimestampAttribute("message_timestamp")
  65. .fromSubscription("projects/lcg-bi-nonprod/subscriptions/myOutputSub"))
  66. .apply(Window.<String>into(FixedWindows.of(Duration.millis(500L))))
  67. .apply(TextIO.write().to("gs://pythian-test-bucket-ladbrokes")
  68. .withWindowedWrites()
  69. .withNumShards(1)
  70. );
  71.  
  72. pipeline.run().waitUntilFinish();
  73. }
  74.  
  75. }
Add Comment
Please, Sign In to add comment