Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- String ftpUser = URLEncoder.encode(username, "UTF-8");
- String ftpPass = URLEncoder.encode(password, "UTF-8");
- URL url = new URL("ftp://" +
- URLEncoder.encode(username, "UTF-8") + ":" +
- URLEncoder.encode(password, "UTF-8") + "@" +
- host + "/" + remoteFile );
- System.out.println(url.toString());
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- TextInputFormat format = new TextInputFormat(
- new org.apache.flink.core.fs.Path(url.toString()));
- DataStream<String> inputStream = env.readFile(format, url.toString(), FileProcessingMode.PROCESS_CONTINUOUSLY, 1);
- DataStream parsedStream = inputStream
- .map(new MapFunction<String, Content>() {
- @Override
- public Content map(String s) throws Exception {
- String[] cells = s.split("\|");
- Date chargeTimeStamp = sdf.get().parse(cells[23]);
- Float chargeAmount1 = Float.valueOf(cells[43])
- return new Content(chargeTimeStamp,chargeAmount1);
- }
- })
- ;
Add Comment
Please, Sign In to add comment