Advertisement
Guest User

Untitled

a guest
Jul 22nd, 2019
87
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.82 KB | None | 0 0
import com.bouncex.ingestion.ProductFeedOptions;
import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.data.v2.models.Row;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
  21.  
  22. class Scratch {
  23. private class Foo {
  24. private String data;
  25. public String getKey() {
  26. return data + "#" + data;
  27. }
  28. }
  29.  
  30. private static class ToKey extends DoFn<Foo, String> {
  31. @ProcessElement
  32. public void processElement(ProcessContext ctx) {
  33. ctx.output(ctx.element().getKey());
  34. }
  35. }
  36.  
  37. private static class ToScan extends DoFn<String, Scan> {
  38. @ProcessElement
  39. public void processElement(ProcessContext ctx) {
  40. ctx.output(new Scan(ctx.element().getBytes(), ctx.element().getBytes()));
  41. }
  42. }
  43.  
  44. private static class Read extends PTransform<PCollection<Scan>, PCollection<Row>> {
  45. private CloudBigtableTableConfiguration config;
  46. private PipelineOptions options;
  47.  
  48. public Read(CloudBigtableTableConfiguration config, PipelineOptions options) {
  49. this.config = config;
  50. this.options = options;
  51. }
  52.  
  53. private static class ReadSource extends DoFn<BoundedSource<Result>, PCollection<Result>> {
  54. @ProcessElement
  55. public void processElement(ProcessContext ctx) {
  56. BoundedSource.BoundedReader<Result> reader = ctx.element().createReader(options);
  57. try {
  58. for (boolean available = reader.start(); available; available = reader.advance()) {
  59. Result item = reader.getCurrent();
  60. }
  61. } finally {
  62. reader.close();
  63. }
  64. }
  65. }
  66.  
  67. @Override
  68. public PCollection<Row> expand(PCollection<Scan> input) {
  69. input.apply("GetSource", ParDo.of(new DoFn<Scan, BoundedSource<Result>>() {
  70. @ProcessElement
  71. public void processElement(ProcessContext ctx) {
  72. ctx.output(CloudBigtableIO.read(CloudBigtableScanConfiguration.fromConfig(config, ctx.element())));
  73. }
  74. }));
  75. }
  76. }
  77.  
  78. public static void main(String[] args) {
  79. PipelineOptionsFactory.register(ProductFeedOptions.class);
  80. final PipelineOptions options = PipelineOptionsFactory
  81. .fromArgs(args)
  82. .withValidation()
  83. .as(PipelineOptions.class);
  84.  
  85. FileSystems.setDefaultPipelineOptions(options);
  86.  
  87. final Pipeline pipeline = Pipeline.create(options);
  88.  
  89. CloudBigtableTableConfiguration config = new CloudBigtableTableConfiguration.Builder()
  90. .withProjectId("")
  91. .withInstanceId("")
  92. .withTableId("")
  93. .build();
  94.  
  95. pipeline
  96. .apply("emptyfoo", Create.empty(new TypeDescriptor<Foo>() {}))
  97. .apply("toKeys", ParDo.of(new ToKey()))
  98. .apply("toScan", ParDo.of(new ToScan()))
  99. .apply("Read", new Read(config, options));
  100. }
  101. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement