Guest User

Untitled

a guest
Apr 21st, 2019
96
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. package com.pipeline;
  2.  
  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.io.Compression;
  import org.apache.beam.sdk.io.FileIO;
  import org.apache.beam.sdk.io.TextIO;
  import org.apache.beam.sdk.options.*;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.values.KV;
  import org.apache.commons.io.IOUtils;
  import java.io.File;
  import java.io.FileInputStream;
  import java.io.IOException;
  import java.math.BigInteger;
  import java.security.MessageDigest;
  import java.security.NoSuchAlgorithmException;
  16.  
  17. public class BeamPipeline {
  18.  
  19. public static void main(String[] args) {
  20.  
  21. PipelineOptions options = PipelineOptionsFactory.create();
  22. Pipeline p = Pipeline.create(options);
  23.  
  24. File dir = new File("../testdata/");
  25. for (File file : dir.listFiles()) {
  26. p
  27. .apply("ReadLines", FileIO.match())
  28. .apply("Read Files", FileIO.readMatches())
  29. .apply("Hash File",ParDo.of(new DoFn<FileIO.ReadableFile, KV<String, String>>() {
  30. @ProcessElement
  31. public void processElement(@Element File file, OutputReceiver<KV<String, String>> out) throws
  32. NoSuchAlgorithmException, IOException {
  33. // File -> Bytes
  34. byte[] byteFile = null;
  35. try(FileInputStream inputStream = new FileInputStream(file)) {
  36. byteFile = IOUtils.toByteArray(inputStream);
  37. }
  38.  
  39. // SHA-256
  40. MessageDigest md = MessageDigest.getInstance("SHA-256");
  41. byte[] messageDigest = md.digest(byteFile);
  42. BigInteger no = new BigInteger(1, messageDigest);
  43. String hashtext = no.toString(16);
  44. while(hashtext.length() < 32) {
  45. hashtext = "0" + hashtext;
  46. }
  47. out.output(KV.of(file.getName(), hashtext));
  48. }
  49. }))
  50. .apply(FileIO.write());
  51. }
  52. p.run();
  53. }
  54. }
RAW Paste Data