Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.dataflow.samples;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
- public class NonStandardDelimiters {
- public static interface MyOptions extends PipelineOptions {
- @Description("Output BigQuery table <PROJECT_ID>:<DATASET>.<TABLE>")
- String getOutput();
- void setOutput(String s);
- @Description("Input file, gs://path/to/file")
- String getInput();
- void setInput(String s);
- }
- @SuppressWarnings("serial")
- public static void main(String[] args) {
- MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
- Pipeline p = Pipeline.create(options);
- String file = options.getInput();
- String output = options.getOutput();
- // Build the table schema for the output table.
- List<TableFieldSchema> fields = new ArrayList<>();
- fields.add(new TableFieldSchema().setName("data").setType("STRING"));
- TableSchema schema = new TableSchema().setFields(fields);
- p
- .apply("GetMessages", TextIO.read().from(file))
- .apply("ExtractRows", ParDo.of(new DoFn<String, String>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- for (String line : c.element().split("\\\\x01\\\\n")) {
- if (!line.isEmpty()) {
- c.output(line);
- }
- }
- }
- }))
- .apply("ToBQRow", ParDo.of(new DoFn<String, TableRow>() {
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- TableRow row = new TableRow();
- row.set("data", c.element());
- c.output(row);
- }
- }))
- .apply(BigQueryIO.writeTableRows().to(output)
- .withSchema(schema)
- .withMethod(Method.FILE_LOADS)
- .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
- .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
- p.run();
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement