Not a member of Pastebin yet?
Sign up —
it unlocks many useful features!
- package org.biplob.thesis.streamReader;
- import java.io.BufferedReader;
- import java.io.FileInputStream;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.util.Calendar;
- import java.util.Random;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.TimeUnit;
- import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
- import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
- import org.apache.flink.streaming.api.watermark.Watermark;
- import org.biplob.thesis.DataTypes.Point;
- public class DataStreamGenerator implements ParallelSourceFunction<Point> {
- private String filePath;
- private float streamSpeed;
- private final int watermarkDelayMSecs;
- private volatile boolean running = true;
- private static final int maxDelay = 6000;
- private static final int baseInterval = 5000;
- private BufferedReader br;
- private FileInputStream fis;
- public DataStreamGenerator(String filePath)
- {
- this(filePath, 1.0f);
- }
- public DataStreamGenerator(String filePath, float streamSpeed)
- {
- this.filePath = filePath;
- this.streamSpeed = streamSpeed;
- this.watermarkDelayMSecs = 10000;
- }
- public void run(SourceContext<Point> ctx) throws Exception
- {
- //read dataset
- String record;
- long timestamp;
- fis = new FileInputStream(filePath);
- br = new BufferedReader(new InputStreamReader(fis));
- generateOrderedStream(ctx);
- this.fis.close();
- this.fis = null;
- this.br.close();
- this.br = null;
- }
- private void generateOrderedStream(SourceContext<Point> sourceContext) throws Exception {
- long servingStartTime = Calendar.getInstance().getTimeInMillis();
- long dataStartTime;
- long nextWatermark;
- long nextWatermarkServingTime;
- String line;
- if (br.ready() && (line = br.readLine()) != null) {
- // read first ride
- RecordReader recordObject = RecordReader.parseRecord(line, Calendar.getInstance().getTimeInMillis());
- Point readPoint = new Point(recordObject.getItems(),recordObject.timestamp);
- // extract starting timestamp
- dataStartTime = readPoint.timestamp;
- // schedule next watermark
- nextWatermark = dataStartTime + (watermarkDelayMSecs);
- nextWatermarkServingTime = toServingTime(servingStartTime, dataStartTime, nextWatermark);
- // emit first ride
- sourceContext.collectWithTimestamp(readPoint, readPoint.timestamp);
- } else {
- return;
- }
- // emit all subsequent rides proportial to their timestamp and servingSpeed
- while (br.ready() && (line = br.readLine()) != null) {
- RecordReader recordObject = RecordReader.parseRecord(line, Calendar.getInstance().getTimeInMillis());
- Point readPoint = new Point(recordObject.getItems(),recordObject.timestamp);
- ;
- long eventTime = readPoint.timestamp;
- long now = Calendar.getInstance().getTimeInMillis();
- long eventServingTime = toServingTime(servingStartTime, dataStartTime, eventTime);
- long eventWait = eventServingTime - now;
- long watermarkWait = nextWatermarkServingTime - now;
- if(eventWait < watermarkWait) {
- Thread.sleep(eventWait > 0 ? eventWait : 0);
- }
- else if(eventWait > watermarkWait) {
- Thread.sleep(watermarkWait > 0 ? watermarkWait : 0);
- sourceContext.emitWatermark(new Watermark(nextWatermark));
- nextWatermark = nextWatermark + (watermarkDelayMSecs);
- nextWatermarkServingTime = toServingTime(servingStartTime, dataStartTime, nextWatermark);
- long remainWait = eventWait - watermarkWait;
- Thread.sleep(remainWait > 0 ? remainWait : 0);
- }
- else if(eventWait == watermarkWait) {
- Thread.sleep(watermarkWait > 0 ? watermarkWait : 0);
- // -1 to ensure that no following events have the same timestamp
- sourceContext.emitWatermark(new Watermark(nextWatermark - 1));
- nextWatermark = nextWatermark + (watermarkDelayMSecs);
- nextWatermarkServingTime = toServingTime(servingStartTime, dataStartTime, nextWatermark);
- }
- sourceContext.collectWithTimestamp(readPoint, readPoint.timestamp);
- }
- }
- public long toServingTime(long servingStartTime, long dataStartTime, long eventTime) {
- long dataDiff = eventTime - dataStartTime;
- return servingStartTime + (long)(dataDiff / this.streamSpeed);
- }
- @Override
- public void cancel() {
- running = false;
- }
- }
Add Comment
Please sign in to add a comment.