Guest User

Simple Stream Reader

a guest
Jun 6th, 2016
82
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 1.71 KB | None | 0 0
  1. package org.biplob.streamReader;
  2.  
  3. import java.io.BufferedReader;
  4. import java.io.FileInputStream;
  5. import java.io.IOException;
  6. import java.io.InputStreamReader;
  7. import java.util.Calendar;
  8. import java.util.Random;
  9. import java.util.concurrent.Executors;
  10. import java.util.concurrent.ScheduledExecutorService;
  11. import java.util.concurrent.TimeUnit;
  12.  
  13. import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
  14. import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
  15. import org.apache.flink.streaming.api.watermark.Watermark;
  16. import org.biplob.thesis.DataTypes.Point;
  17.  
  18. public class DataStreamGenerator implements ParallelSourceFunction<Point> {
  19.    
  20.     private String filePath;
  21.     private float streamSpeed; 
  22.     private volatile boolean running = true;
  23.  
  24.     private BufferedReader br;
  25.     private FileInputStream fis;
  26.    
  27.     public DataStreamGenerator(String filePath)
  28.     {
  29.         this(filePath, 1.0f);
  30.     }
  31.    
  32.     public DataStreamGenerator(String filePath, float streamSpeed)
  33.     {
  34.         this.filePath = filePath;
  35.         this.streamSpeed = streamSpeed;
  36.     }
  37.    
  38.     public void run(SourceContext<Point> ctx) throws Exception
  39.     {
  40.         //read dataset
  41.         String record;
  42.         long timestamp;
  43.         fis = new FileInputStream(filePath);
  44.         br = new BufferedReader(new InputStreamReader(fis));
  45.        
  46.         try
  47.         {
  48.             while((record = br.readLine())!= null && running == true)
  49.             {
  50.                 timestamp = System.nanoTime();
  51.                 RecordReader recordObject = RecordReader.parseRecord(record, timestamp);
  52.                 Point readPoint = new Point(recordObject.getItems());
  53.                 ctx.collect(readPoint);
  54.  
  55.             }
  56.         } catch (Exception e) {
  57.             e.printStackTrace();
  58.         }
  59.  
  60.     }
  61.  
  62.     @Override
  63.     public void cancel() {
  64.         running = false;
  65.        
  66.     }
  67.  
  68. }
Add Comment
Please, Sign In to add comment