Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package cvi6.demonstrations;
- import io.reactivex.Observable;
- import io.reactivex.ObservableEmitter;
- import io.reactivex.ObservableOnSubscribe;
- import java.io.BufferedReader;
- import java.io.IOException;
- import java.nio.file.Files;
- import java.nio.file.Path;
- import java.nio.file.Paths;
- public class RxFileReader implements ObservableOnSubscribe<String> {
- protected Path filePath;
- public RxFileReader(Path filePath) {
- this.filePath = filePath;
- }
- public void subscribe(ObservableEmitter<String> emitter) {
- try {
- // prepare buffered reader for file
- BufferedReader br;
- br = Files.newBufferedReader(filePath);
- // read line by line
- String line = "";
- while((line = br.readLine()) != null) {
- // emit each line
- emitter.onNext(line);
- }
- } catch (IOException e) {
- // emit error
- emitter.onError(e);
- }
- // if all successful, emit on complete
- emitter.onComplete();
- }
- public static void main(String[] args) {
- RxFileReader reader = new RxFileReader(Paths.get("data.txt"));
- Observable.create(reader)
- .takeWhile(line -> !line.contains("=="))
- .forEach(System.out::println);
- System.out.println("==========");
- // this will open the file again and won't continue in reading
- Observable.create(reader)
- .takeWhile(line -> !line.contains("=="))
- .forEach(System.out::println);
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement