Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package your.project;
- import org.apache.spark.streaming.scheduler.*;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public class SimpleListener implements StreamingListener {
- private static final Logger log = LoggerFactory.getLogger(SimpleListener.class);
- private long totalNumOfRecords = 0L;
- private long totalNumRecordsQueued = 0L;
- public SimpleListener() {}
- @Override
- public synchronized void onBatchSubmitted(StreamingListenerBatchSubmitted batchSubmitted) {
- long numNewRecords = batchSubmitted.batchInfo().numRecords();
- totalNumOfRecords += numNewRecords;
- totalNumRecordsQueued += numNewRecords;
- log.info(numNewRecords + " came with this batch, total number records is "
- + totalNumOfRecords + ", number of queued records is " + totalNumRecordsQueued);
- }
- @Override
- public synchronized void onBatchStarted(StreamingListenerBatchStarted batchStarted) {
- // Remove the records from queued batch as long as the batch is started.
- totalNumRecordsQueued -= batchStarted.batchInfo().numRecords();
- }
- @Override
- public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {}
- @Override
- public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted operationCompleted) {}
- @Override
- public void onOutputOperationStarted(StreamingListenerOutputOperationStarted OperationStarted) {}
- @Override
- public void onReceiverError(StreamingListenerReceiverError receiverError) {}
- @Override
- public void onReceiverStarted(StreamingListenerReceiverStarted receiverStarted) {}
- @Override
- public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {}
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement