Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package hadoopsters.spark.scala.monitoring.listeners
- import org.apache.spark.streaming.kafka010.OffsetRange
- import org.apache.spark.streaming.scheduler._
- import org.joda.time.DateTime
/**
 * :: ExampleStreamingListener ::
 * A simple StreamingListener that reads summary statistics from each completed Spark Streaming
 * batch. StreamingListener is a Spark DeveloperApi, so the event classes used here may change
 * between Spark releases.
 *
 * @param exampleArg You can pass whatever you want to a listener!
 */
class ExampleStreamingListener(exampleArg: String) extends StreamingListener {

  // ====================
  // onBatch_ Methods
  // ====================

  /**
   * This method executes when a Spark Streaming batch completes.
   *
   * @param batchCompleted Event carrying the BatchInfo of the completed batch
   */
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    println(s"I was passed to the listener: $exampleArg")
    // write performance metrics somewhere
    writeStatsSomewhere(batchCompleted)
    // write offsets (state) somewhere, and numRecords per topic
    processTopicInfo(batchCompleted)
  }

  /**
   * This method executes when a Spark Streaming batch is submitted to the scheduler for execution.
   * Intentionally a no-op in this example.
   *
   * @param batchSubmitted Event carrying information on the submitted batch
   */
  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = ()

  /**
   * This method executes when a Spark Streaming batch starts.
   * Intentionally a no-op in this example.
   *
   * @param batchStarted Event carrying information on the started batch
   */
  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = ()

  // ====================
  // onReceiver_ Methods
  // ====================

  /**
   * This method executes when a Spark Streaming receiver has started.
   * Intentionally a no-op in this example.
   *
   * @param receiverStarted Event carrying information on the receiver (e.g. errors, executor ids, etc)
   */
  override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = ()

  /**
   * This method executes when a Spark Streaming receiver encounters an error.
   * Intentionally a no-op in this example.
   *
   * @param receiverError Event carrying information on the receiver (e.g. errors, executor ids, etc)
   */
  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = ()

  /**
   * This method executes when a Spark Streaming receiver stops working.
   * Intentionally a no-op in this example.
   *
   * @param receiverStopped Event carrying information on the receiver (e.g. errors, executor ids, etc)
   */
  override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = ()

  // =======================================================================
  // Convenience Methods (for use in onBatch_ methods)
  // =======================================================================

  /**
   * Pulls the key performance metrics of the Streaming app from a completed batch and writes
   * them somewhere.
   *  - Processing Time: how many seconds were needed to complete this batch (i.e. duration).
   *  - Scheduling Delay: how many seconds the start of this batch was delayed.
   *  - Num Records: the total number of input records consumed this batch.
   *
   * Delays are converted from milliseconds with integer division, so they are truncated to whole
   * seconds (e.g. 999 ms becomes 0).
   *
   * @param batch Event carrying the BatchInfo of the completed batch
   */
  def writeStatsSomewhere(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    // Processing time in seconds; 0 when the delay is not available
    val processingTime = info.processingDelay.map(_ / 1000).getOrElse(0L)
    // Scheduling delay in seconds; missing or non-positive delays are reported as 0
    val schedulingDelay = info.schedulingDelay.filter(_ > 0).map(_ / 1000).getOrElse(0L)
    // Total record count for this batch
    val numRecords = info.numRecords
    // do something with `processingTime`
    // do something with `schedulingDelay`
    // do something with `numRecords`
  }

  /**
   * Processes every input stream consumed in a batch: writes its offsets and its per-topic
   * record counts.
   *
   * @param batch Event carrying the BatchInfo of the completed batch
   */
  def processTopicInfo(batch: StreamingListenerBatchCompleted): Unit =
    batch.batchInfo.streamIdToInputInfo.foreach { stream =>
      writeTopicOffsetsSomewhere(stream)
      writeTopicCountSomewhere(stream)
    }

  // =======================================================================
  // Topic Methods (designed for use inside of convenience methods)
  // =======================================================================

  /**
   * Takes one input-stream entry of a batch and writes the max consumed offset of every
   * partition it read this batch somewhere. Streams without Kafka offset metadata are skipped.
   *
   * @param topic A (streamId, StreamInputInfo) entry from a batch's streamIdToInputInfo
   */
  def writeTopicOffsetsSomewhere(topic: Tuple2[Int, StreamInputInfo]): Unit = {
    val (_, inputInfo) = topic
    // for every partition's range of offsets consumed this batch
    offsetRangesOf(inputInfo).foreach { offsetRange =>
      // untilOffset is exclusive: untilOffset - 1 is the last offset consumed, and untilOffset is
      // where the next batch resumes. For an empty range this is one below fromOffset.
      val maxOffset = offsetRange.untilOffset - 1
      // do something with `offsetRange.topic`
      // do something with `offsetRange.partition`
      // do something with `offsetRange.count`
      // do something with `maxOffset`
    }
  }

  /**
   * Takes one input-stream entry of a batch and writes the number of records read from each of
   * its topics this batch somewhere. A single stream may subscribe to several topics, so counts
   * are summed per topic from the offset ranges rather than crediting the whole stream's
   * numRecords to its first topic. Streams without Kafka offset metadata are skipped.
   *
   * NOTE(review): per-topic counts are offset-range sizes (untilOffset - fromOffset); on
   * compacted or transactional topics this may exceed the records actually delivered.
   *
   * @param topic A (streamId, StreamInputInfo) entry from a batch's streamIdToInputInfo
   */
  def writeTopicCountSomewhere(topic: Tuple2[Int, StreamInputInfo]): Unit = {
    val (_, inputInfo) = topic
    offsetRangesOf(inputInfo)
      .groupBy(_.topic)
      .foreach { case (topicName, ranges) =>
        // record count for this topic this batch
        val numRecords = ranges.map(_.count).sum
        // do something with `topicName` and `numRecords`
      }
  }

  /**
   * Extracts the OffsetRanges stored under the "offsets" key of a stream's input metadata.
   * Returns Nil, instead of throwing, when the key is absent or holds something other than a
   * sequence; non-OffsetRange elements are dropped.
   *
   * @param inputInfo The input info of one stream in a batch
   * @return The offset ranges of that stream, possibly empty
   */
  private def offsetRangesOf(inputInfo: StreamInputInfo): List[OffsetRange] =
    inputInfo.metadata.get("offsets") match {
      case Some(ranges: Seq[_]) => ranges.iterator.collect { case range: OffsetRange => range }.toList
      case _                    => Nil
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement