Guest User

Untitled

a guest
Feb 25th, 2018
95
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.22 KB | None | 0 0
  // Build the FTP source URL. The credentials are encoded once and reused so that
  // reserved characters in them (such as '@', ':' or '/') cannot corrupt the URL.
  // NOTE(review): URLEncoder applies form encoding (a space becomes '+'); confirm
  // that this is acceptable for the user-info part of an ftp:// URL.
  String ftpUser = URLEncoder.encode(username, "UTF-8");
  String ftpPass = URLEncoder.encode(password, "UTF-8");
  URL url = new URL("ftp://" + ftpUser + ":" + ftpPass + "@" + host + "/" + remoteFile);

  // Print the target with the password masked so the secret does not end up on stdout.
  System.out.println("ftp://" + ftpUser + ":***@" + host + "/" + remoteFile);

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  TextInputFormat format = new TextInputFormat(
      new org.apache.flink.core.fs.Path(url.toString()));

  // Read the file line by line and keep re-scanning the path for changes.
  // NOTE(review): the last argument is the scan interval; per the Flink API it is
  // expressed in milliseconds, so 1 means a re-scan every millisecond - confirm
  // this is intended for a remote FTP source.
  DataStream<String> inputStream =
      env.readFile(format, url.toString(), FileProcessingMode.PROCESS_CONTINUOUSLY, 1);

  // Convert every pipe-delimited line into a Content record built from the
  // timestamp in field 23 and the amount in field 43 (zero-based indexes).
  DataStream<Content> parsedStream = inputStream
      .map(new MapFunction<String, Content>() {
        @Override
        public Content map(String s) throws Exception {
          // '|' is a regex metacharacter, so it must be escaped; inside a Java
          // string literal the backslash itself has to be doubled.
          String[] cells = s.split("\\|");

          // Fail with a descriptive message instead of a bare
          // ArrayIndexOutOfBoundsException when a line has too few fields.
          if (cells.length <= 43) {
            throw new IllegalArgumentException(
                "Expected at least 44 pipe-delimited fields but found " + cells.length);
          }

          // sdf is defined outside this snippet; presumably a per-thread
          // date-format holder, given the get() call - verify at its declaration.
          Date chargeTimeStamp = sdf.get().parse(cells[23]);
          Float chargeAmount1 = Float.valueOf(cells[43]);
          return new Content(chargeTimeStamp, chargeAmount1);
        }
      });
Add Comment
Please, Sign In to add comment