Guest User

Untitled

a guest
Dec 15th, 2018
79
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.51 KB | None | 0 0
  1. package com.sandboxws;
  2.  
  import java.util.Arrays;
  import java.util.Collections;
  import java.util.List;

  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.io.TextIO;
  import org.apache.beam.sdk.options.PipelineOptions;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;
  import org.apache.beam.sdk.transforms.Count;
  import org.apache.beam.sdk.transforms.FlatMapElements;
  import org.apache.beam.sdk.transforms.MapElements;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.beam.sdk.values.TypeDescriptors;
  13.  
  14. public class BeamBatchPipeline {
  15. public static void main(String[] args) {
  16. Pipeline pipeline = Pipeline.create();
  17.  
  18. // Step 1 - Read CSV file.
  19. PCollection<String> csvRows = pipeline.apply("Read from CSV",
  20. TextIO.read().from("./reviews.csv"));
  21.  
  22. // Step 2 - Extract ratings and count them.
  23. PCollection<KV<String, Long>> ratingsCounts = csvRows
  24. .apply("Extract Ratings",
  25. FlatMapElements.into(TypeDescriptors.strings())
  26. .via(csvRow -> Arrays.asList(csvRow.split(",")[1])))
  27. .apply("Count Ratings", Count.<String>perElement());
  28.  
  29. // Step 3 - Write results to CSV
  30. ratingsCounts
  31. .apply("FormatResults", MapElements.into(TypeDescriptors.strings())
  32. .via((KV<String, Long> ratingsCount) -> ratingsCount.getKey() + " " + ratingsCount.getValue()))
  33. .apply(TextIO.write().to("./ratings_results").withSuffix(".csv"));
  34.  
  35. // Run the pipeline and wait till it finishes before exiting
  36. pipeline.run().waitUntilFinish();
  37. }
  38. }
Add Comment
Please, Sign In to add comment