Advertisement
Guest User

Untitled

a guest
Feb 19th, 2019
79
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.51 KB | None | 0 0
  1. package com.dataflow.samples;
  2.  
  import com.google.api.services.bigquery.model.TableFieldSchema;
  import com.google.api.services.bigquery.model.TableRow;
  import com.google.api.services.bigquery.model.TableSchema;
  import java.time.Instant;
  import java.util.ArrayList;
  import java.util.List;
  import java.util.regex.Pattern;
  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.io.TextIO;
  import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
  import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
  import org.apache.beam.sdk.options.Default;
  import org.apache.beam.sdk.options.Description;
  import org.apache.beam.sdk.options.PipelineOptions;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.ParDo;
  19.  
  20.  
  21. public class NonStandardDelimiters {
  22.  
  23. public static interface MyOptions extends PipelineOptions {
  24. @Description("Output BigQuery table <PROJECT_ID>:<DATASET>.<TABLE>")
  25. String getOutput();
  26. void setOutput(String s);
  27.  
  28. @Description("Input file, gs://path/to/file")
  29. String getInput();
  30. void setInput(String s);
  31. }
  32.  
  33. @SuppressWarnings("serial")
  34. public static void main(String[] args) {
  35. MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
  36.  
  37. Pipeline p = Pipeline.create(options);
  38.  
  39. String file = options.getInput();
  40. String output = options.getOutput();
  41.  
  42. // Build the table schema for the output table.
  43. List<TableFieldSchema> fields = new ArrayList<>();
  44. fields.add(new TableFieldSchema().setName("data").setType("STRING"));
  45. TableSchema schema = new TableSchema().setFields(fields);
  46.  
  47. p
  48. .apply("GetMessages", TextIO.read().from(file))
  49. .apply("ExtractRows", ParDo.of(new DoFn<String, String>() {
  50. @ProcessElement
  51. public void processElement(ProcessContext c) {
  52. for (String line : c.element().split("\\\\x01\\\\n")) {
  53. if (!line.isEmpty()) {
  54. c.output(line);
  55. }
  56. }
  57. }
  58. }))
  59. .apply("ToBQRow", ParDo.of(new DoFn<String, TableRow>() {
  60. @ProcessElement
  61. public void processElement(ProcessContext c) throws Exception {
  62. TableRow row = new TableRow();
  63. row.set("data", c.element());
  64. c.output(row);
  65. }
  66. }))
  67. .apply(BigQueryIO.writeTableRows().to(output)
  68. .withSchema(schema)
  69. .withMethod(Method.FILE_LOADS)
  70. .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
  71. .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
  72.  
  73. p.run();
  74. }
  75. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement