Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package org.biplob.streamReader;
- import java.io.BufferedReader;
- import java.io.FileInputStream;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.util.Calendar;
- import java.util.Random;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.TimeUnit;
- import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
- import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
- import org.apache.flink.streaming.api.watermark.Watermark;
- import org.biplob.thesis.DataTypes.Point;
- public class DataStreamGenerator implements ParallelSourceFunction<Point> {
- private String filePath;
- private float streamSpeed;
- private volatile boolean running = true;
- private BufferedReader br;
- private FileInputStream fis;
- public DataStreamGenerator(String filePath)
- {
- this(filePath, 1.0f);
- }
- public DataStreamGenerator(String filePath, float streamSpeed)
- {
- this.filePath = filePath;
- this.streamSpeed = streamSpeed;
- }
- public void run(SourceContext<Point> ctx) throws Exception
- {
- //read dataset
- String record;
- long timestamp;
- fis = new FileInputStream(filePath);
- br = new BufferedReader(new InputStreamReader(fis));
- try
- {
- while((record = br.readLine())!= null && running == true)
- {
- timestamp = System.nanoTime();
- RecordReader recordObject = RecordReader.parseRecord(record, timestamp);
- Point readPoint = new Point(recordObject.getItems());
- ctx.collect(readPoint);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- @Override
- public void cancel() {
- running = false;
- }
- }
Add Comment
Please, Sign In to add comment