daily pastebin goal
1%
SHARE
TWEET

Untitled

a guest Feb 19th, 2019 62 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. package com.dataflow.samples;
  2.  
  3. import java.time.Instant;
  4. import java.util.ArrayList;
  5. import java.util.List;
  6. import org.apache.beam.sdk.Pipeline;
  7. import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
  8. import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
  9. import org.apache.beam.sdk.io.TextIO;
  10. import org.apache.beam.sdk.options.Default;
  11. import org.apache.beam.sdk.options.Description;
  12. import org.apache.beam.sdk.options.PipelineOptions;
  13. import org.apache.beam.sdk.options.PipelineOptionsFactory;
  14. import org.apache.beam.sdk.transforms.DoFn;
  15. import org.apache.beam.sdk.transforms.ParDo;
  16. import com.google.api.services.bigquery.model.TableFieldSchema;
  17. import com.google.api.services.bigquery.model.TableRow;
  18. import com.google.api.services.bigquery.model.TableSchema;
  19.  
  20.  
  21. public class NonStandardDelimiters {
  22.  
  23.     public static interface MyOptions extends PipelineOptions {
  24.         @Description("Output BigQuery table <PROJECT_ID>:<DATASET>.<TABLE>")
  25.         String getOutput();
  26.         void setOutput(String s);
  27.  
  28.         @Description("Input file, gs://path/to/file")
  29.         String getInput();     
  30.         void setInput(String s);
  31.     }
  32.  
  33.     @SuppressWarnings("serial")
  34.     public static void main(String[] args) {
  35.         MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
  36.  
  37.         Pipeline p = Pipeline.create(options);
  38.  
  39.         String file = options.getInput();
  40.         String output = options.getOutput();
  41.  
  42.         // Build the table schema for the output table.
  43.         List<TableFieldSchema> fields = new ArrayList<>();
  44.         fields.add(new TableFieldSchema().setName("data").setType("STRING"));
  45.         TableSchema schema = new TableSchema().setFields(fields);
  46.  
  47.         p
  48.             .apply("GetMessages", TextIO.read().from(file))
  49.             .apply("ExtractRows", ParDo.of(new DoFn<String, String>() {
  50.                 @ProcessElement
  51.                 public void processElement(ProcessContext c) {
  52.                   for (String line : c.element().split("\\\\x01\\\\n")) {
  53.                     if (!line.isEmpty()) {
  54.                       c.output(line);
  55.                     }
  56.                   }
  57.                 }
  58.             }))
  59.             .apply("ToBQRow", ParDo.of(new DoFn<String, TableRow>() {
  60.                 @ProcessElement
  61.                 public void processElement(ProcessContext c) throws Exception {
  62.                     TableRow row = new TableRow();
  63.                     row.set("data", c.element());
  64.                     c.output(row);
  65.                 }
  66.             }))
  67.             .apply(BigQueryIO.writeTableRows().to(output)
  68.                     .withSchema(schema)
  69.                     .withMethod(Method.FILE_LOADS)
  70.                     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
  71.                     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
  72.  
  73.         p.run();
  74.     }
  75. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top