Advertisement
Guest User

Untitled

a guest
Jul 20th, 2018
369
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 2.18 KB | None | 0 0
  1. package whatever;
  2.  
  3. import com.google.api.services.bigquery.model.TableRow;
  4. import org.apache.beam.sdk.Pipeline;
  5. import org.apache.beam.sdk.io.Compression;
  6. import org.apache.beam.sdk.io.TextIO;
  7. import org.apache.beam.sdk.options.PipelineOptionsFactory;
  8. import org.apache.beam.sdk.transforms.DoFn;
  9. import org.apache.beam.sdk.transforms.ParDo;
  10. import org.apache.beam.sdk.transforms.Watch;
  11. import org.joda.time.Duration;
  12. import org.slf4j.Logger;
  13. import org.slf4j.LoggerFactory;
  14.  
  15. public class Whatever {
  16.     private static final Logger log = LoggerFactory.getLogger(LeakExtraction.class);
  17.     public static void main(String[] args) {
  18.  
  19.         PipelineOptionsFactory.Builder options = PipelineOptionsFactory.fromArgs(args).withValidation();
  20.         Pipeline pipeline = Pipeline.create(options.create());
  21.  
  22.         log.info("DFP data transfer from GS to BQ has started.");
  23.  
  24.         pipeline.apply("ReadFromStorage", TextIO.read()
  25.                 .from("gs://whatever-test/my-folder/*.gz")
  26.                 .withCompression(Compression.GZIP)
  27.                 .watchForNewFiles(
  28.                         // Check for new files every 30 seconds
  29.                         Duration.standardSeconds(30),
  30.                         // Never stop checking for new files
  31.                         Watch.Growth.never()))
  32.                 .apply("TransformToTableRow", ParDo.of(new TableRowConverterFn()));
  33.  
  34.         pipeline.run().waitUntilFinish();
  35.         log.info("DFP data transfer from GS to BQ is finished in {} seconds.");
  36.     }
  37.  
  38.     /**
  39.      * Creates a TableRow from a CSV line
  40.      */
  41.     private static class TableRowConverterFn extends DoFn<String, TableRow> {
  42.         @ProcessElement
  43.         public void processElement(ProcessContext c) throws Exception {
  44.             String[] split = c.element().split(",");
  45.  
  46.             //Ignore the header line
  47.             //Since this is going to be run in parallel, we can't guarantee that the first line passed to this method will be the header
  48.             if (split[0].equals("Time")) {
  49.                 log.info("Skipped header");
  50.                 return;
  51.             }
  52.             TableRow row = new TableRow();
  53.             c.output(row);
  54.         }
  55.     }
  56. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement