Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import com.bouncex.ingestion.ProductFeedOptions;
import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.data.v2.models.Row;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
- class Scratch {
- private class Foo {
- private String data;
- public String getKey() {
- return data + "#" + data;
- }
- }
- private static class ToKey extends DoFn<Foo, String> {
- @ProcessElement
- public void processElement(ProcessContext ctx) {
- ctx.output(ctx.element().getKey());
- }
- }
- private static class ToScan extends DoFn<String, Scan> {
- @ProcessElement
- public void processElement(ProcessContext ctx) {
- ctx.output(new Scan(ctx.element().getBytes(), ctx.element().getBytes()));
- }
- }
- private static class Read extends PTransform<PCollection<Scan>, PCollection<Row>> {
- private CloudBigtableTableConfiguration config;
- private PipelineOptions options;
- public Read(CloudBigtableTableConfiguration config, PipelineOptions options) {
- this.config = config;
- this.options = options;
- }
- private static class ReadSource extends DoFn<BoundedSource<Result>, PCollection<Result>> {
- @ProcessElement
- public void processElement(ProcessContext ctx) {
- BoundedSource.BoundedReader<Result> reader = ctx.element().createReader(options);
- try {
- for (boolean available = reader.start(); available; available = reader.advance()) {
- Result item = reader.getCurrent();
- }
- } finally {
- reader.close();
- }
- }
- }
- @Override
- public PCollection<Row> expand(PCollection<Scan> input) {
- input.apply("GetSource", ParDo.of(new DoFn<Scan, BoundedSource<Result>>() {
- @ProcessElement
- public void processElement(ProcessContext ctx) {
- ctx.output(CloudBigtableIO.read(CloudBigtableScanConfiguration.fromConfig(config, ctx.element())));
- }
- }));
- }
- }
- public static void main(String[] args) {
- PipelineOptionsFactory.register(ProductFeedOptions.class);
- final PipelineOptions options = PipelineOptionsFactory
- .fromArgs(args)
- .withValidation()
- .as(PipelineOptions.class);
- FileSystems.setDefaultPipelineOptions(options);
- final Pipeline pipeline = Pipeline.create(options);
- CloudBigtableTableConfiguration config = new CloudBigtableTableConfiguration.Builder()
- .withProjectId("")
- .withInstanceId("")
- .withTableId("")
- .build();
- pipeline
- .apply("emptyfoo", Create.empty(new TypeDescriptor<Foo>() {}))
- .apply("toKeys", ParDo.of(new ToKey()))
- .apply("toScan", ParDo.of(new ToScan()))
- .apply("Read", new Read(config, options));
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement