Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package whatever;
- import com.google.api.services.bigquery.model.TableRow;
- import org.apache.beam.sdk.Pipeline;
- import org.apache.beam.sdk.io.Compression;
- import org.apache.beam.sdk.io.TextIO;
- import org.apache.beam.sdk.options.PipelineOptionsFactory;
- import org.apache.beam.sdk.transforms.DoFn;
- import org.apache.beam.sdk.transforms.ParDo;
- import org.apache.beam.sdk.transforms.Watch;
- import org.joda.time.Duration;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public class Whatever {
- private static final Logger log = LoggerFactory.getLogger(LeakExtraction.class);
- public static void main(String[] args) {
- PipelineOptionsFactory.Builder options = PipelineOptionsFactory.fromArgs(args).withValidation();
- Pipeline pipeline = Pipeline.create(options.create());
- log.info("DFP data transfer from GS to BQ has started.");
- pipeline.apply("ReadFromStorage", TextIO.read()
- .from("gs://whatever-test/my-folder/*.gz")
- .withCompression(Compression.GZIP)
- .watchForNewFiles(
- // Check for new files every 30 seconds
- Duration.standardSeconds(30),
- // Never stop checking for new files
- Watch.Growth.never()))
- .apply("TransformToTableRow", ParDo.of(new TableRowConverterFn()));
- pipeline.run().waitUntilFinish();
- log.info("DFP data transfer from GS to BQ is finished in {} seconds.");
- }
- /**
- * Creates a TableRow from a CSV line
- */
- private static class TableRowConverterFn extends DoFn<String, TableRow> {
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- String[] split = c.element().split(",");
- //Ignore the header line
- //Since this is going to be run in parallel, we can't guarantee that the first line passed to this method will be the header
- if (split[0].equals("Time")) {
- log.info("Skipped header");
- return;
- }
- TableRow row = new TableRow();
- c.output(row);
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement