Guest User

Stream Source similar to TaxiRide

a guest
Jun 6th, 2016
64
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 4.37 KB | None | 0 0
  1. package org.biplob.thesis.streamReader;
  2.  
  3. import java.io.BufferedReader;
  4. import java.io.FileInputStream;
  5. import java.io.IOException;
  6. import java.io.InputStreamReader;
  7. import java.util.Calendar;
  8. import java.util.Random;
  9. import java.util.concurrent.Executors;
  10. import java.util.concurrent.ScheduledExecutorService;
  11. import java.util.concurrent.TimeUnit;
  12.  
  13. import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
  14. import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
  15. import org.apache.flink.streaming.api.watermark.Watermark;
  16. import org.biplob.thesis.DataTypes.Point;
  17.  
  18. public class DataStreamGenerator implements ParallelSourceFunction<Point> {
  19.    
  20.     private String filePath;
  21.     private float streamSpeed;
  22.    
  23.     private final int watermarkDelayMSecs;
  24.    
  25.     private volatile boolean running = true;
  26.     private static final int maxDelay = 6000;
  27.     private static final int baseInterval = 5000;
  28.    
  29.     private BufferedReader br;
  30.     private FileInputStream fis;
  31.    
  32.     public DataStreamGenerator(String filePath)
  33.     {
  34.         this(filePath, 1.0f);
  35.     }
  36.    
  37.     public DataStreamGenerator(String filePath, float streamSpeed)
  38.     {
  39.         this.filePath = filePath;
  40.         this.streamSpeed = streamSpeed;
  41.         this.watermarkDelayMSecs = 10000;
  42.     }
  43.    
  44.     public void run(SourceContext<Point> ctx) throws Exception
  45.     {
  46.         //read dataset
  47.         String record;
  48.         long timestamp;
  49.         fis = new FileInputStream(filePath);
  50.         br = new BufferedReader(new InputStreamReader(fis));
  51.  
  52.        
  53.         generateOrderedStream(ctx);
  54.        
  55.         this.fis.close();
  56.         this.fis = null;
  57.         this.br.close();
  58.         this.br = null;
  59.  
  60.     }
  61.    
  62.     private void generateOrderedStream(SourceContext<Point> sourceContext) throws Exception {
  63.  
  64.         long servingStartTime = Calendar.getInstance().getTimeInMillis();
  65.         long dataStartTime;
  66.  
  67.         long nextWatermark;
  68.         long nextWatermarkServingTime;
  69.  
  70.         String line;
  71.         if (br.ready() && (line = br.readLine()) != null) {
  72.             // read first ride
  73.             RecordReader recordObject = RecordReader.parseRecord(line, Calendar.getInstance().getTimeInMillis());
  74.             Point readPoint = new Point(recordObject.getItems(),recordObject.timestamp);
  75.             // extract starting timestamp
  76.             dataStartTime = readPoint.timestamp;
  77.             // schedule next watermark
  78.             nextWatermark = dataStartTime + (watermarkDelayMSecs);
  79.             nextWatermarkServingTime = toServingTime(servingStartTime, dataStartTime, nextWatermark);
  80.             // emit first ride
  81.             sourceContext.collectWithTimestamp(readPoint, readPoint.timestamp);
  82.         } else {
  83.             return;
  84.         }
  85.  
  86.         // emit all subsequent rides proportial to their timestamp and servingSpeed
  87.         while (br.ready() && (line = br.readLine()) != null) {
  88.  
  89.             RecordReader recordObject = RecordReader.parseRecord(line, Calendar.getInstance().getTimeInMillis());
  90.             Point readPoint = new Point(recordObject.getItems(),recordObject.timestamp);
  91. ;
  92.             long eventTime = readPoint.timestamp;
  93.  
  94.             long now = Calendar.getInstance().getTimeInMillis();
  95.             long eventServingTime = toServingTime(servingStartTime, dataStartTime, eventTime);
  96.             long eventWait = eventServingTime - now;
  97.             long watermarkWait = nextWatermarkServingTime - now;
  98.  
  99.             if(eventWait < watermarkWait) {
  100.                 Thread.sleep(eventWait > 0 ? eventWait : 0);
  101.             }
  102.             else if(eventWait > watermarkWait) {
  103.                 Thread.sleep(watermarkWait > 0 ? watermarkWait : 0);
  104.                 sourceContext.emitWatermark(new Watermark(nextWatermark));
  105.                 nextWatermark = nextWatermark + (watermarkDelayMSecs);
  106.                 nextWatermarkServingTime = toServingTime(servingStartTime, dataStartTime, nextWatermark);
  107.                 long remainWait = eventWait - watermarkWait;
  108.                 Thread.sleep(remainWait > 0 ? remainWait : 0);
  109.             }
  110.             else if(eventWait == watermarkWait) {
  111.                 Thread.sleep(watermarkWait > 0 ? watermarkWait : 0);
  112.                 // -1 to ensure that no following events have the same timestamp
  113.                 sourceContext.emitWatermark(new Watermark(nextWatermark - 1));
  114.                 nextWatermark = nextWatermark + (watermarkDelayMSecs);
  115.                 nextWatermarkServingTime = toServingTime(servingStartTime, dataStartTime, nextWatermark);
  116.             }
  117.  
  118.             sourceContext.collectWithTimestamp(readPoint, readPoint.timestamp);
  119.         }
  120.  
  121.     }
  122.    
  123.     public long toServingTime(long servingStartTime, long dataStartTime, long eventTime) {
  124.         long dataDiff = eventTime - dataStartTime;
  125.         return servingStartTime + (long)(dataDiff / this.streamSpeed);
  126.     }
  127.  
  128.  
  129.     @Override
  130.     public void cancel() {
  131.         running = false;
  132.        
  133.     }
  134.  
  135. }
Add Comment
Please, Sign In to add comment