Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.pipeline;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.*;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.commons.io.IOUtils;
- public class BeamPipeline {
- public static void main(String[] args) {
- PipelineOptions options = PipelineOptionsFactory.create();
- Pipeline p = Pipeline.create(options);
- File dir = new File("../testdata/");
- for (File file : dir.listFiles()) {
- p
- .apply("ReadLines", FileIO.match())
- .apply("Read Files", FileIO.readMatches())
- .apply("Hash File",ParDo.of(new DoFn<FileIO.ReadableFile, KV<String, String>>() {
- @ProcessElement
- public void processElement(@Element File file, OutputReceiver<KV<String, String>> out) throws
- NoSuchAlgorithmException, IOException {
- // File -> Bytes
- byte[] byteFile = null;
- try(FileInputStream inputStream = new FileInputStream(file)) {
- byteFile = IOUtils.toByteArray(inputStream);
- }
- // SHA-256
- MessageDigest md = MessageDigest.getInstance("SHA-256");
- byte[] messageDigest = md.digest(byteFile);
- BigInteger no = new BigInteger(1, messageDigest);
- String hashtext = no.toString(16);
- while(hashtext.length() < 32) {
- hashtext = "0" + hashtext;
- }
- out.output(KV.of(file.getName(), hashtext));
- }
- }))
- .apply(FileIO.write());
- }
- p.run();
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement