SHARE
TWEET

Untitled

a guest Jul 21st, 2019 43 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. package your.project;
  2.  
  3. import org.apache.spark.streaming.scheduler.*;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6.  
  7. public class SimpleListener implements StreamingListener {
  8.     private static final Logger log = LoggerFactory.getLogger(SimpleListener.class);
  9.     private long totalNumOfRecords = 0L;
  10.     private long totalNumRecordsQueued = 0L;
  11.  
  12.     public SimpleListener() {}
  13.  
  14.     @Override
  15.     public synchronized void onBatchSubmitted(StreamingListenerBatchSubmitted batchSubmitted) {
  16.         long numNewRecords = batchSubmitted.batchInfo().numRecords();
  17.         totalNumOfRecords += numNewRecords;
  18.         totalNumRecordsQueued += numNewRecords;
  19.         log.info(numNewRecords + " came with this batch, total number records is "
  20.                 + totalNumOfRecords + ", number of queued records is " + totalNumRecordsQueued);
  21.     }
  22.  
  23.     @Override
  24.     public synchronized void onBatchStarted(StreamingListenerBatchStarted batchStarted) {
  25.         // Remove the records from queued batch as long as the batch is started.
  26.         totalNumRecordsQueued -= batchStarted.batchInfo().numRecords();
  27.     }
  28.  
  29.     @Override
  30.     public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {}
  31.  
  32.     @Override
  33.     public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted operationCompleted) {}
  34.  
  35.     @Override
  36.     public void onOutputOperationStarted(StreamingListenerOutputOperationStarted OperationStarted) {}
  37.  
  38.     @Override
  39.     public void onReceiverError(StreamingListenerReceiverError receiverError) {}
  40.  
  41.     @Override
  42.     public void onReceiverStarted(StreamingListenerReceiverStarted receiverStarted) {}
  43.  
  44.     @Override
  45.     public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {}
  46. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top