Advertisement
Guest User

Untitled

a guest
May 21st, 2019
65
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.99 KB | None | 0 0
  1. package hadoopsters.spark.scala.monitoring.listeners
  2.  
  3. import org.apache.spark.streaming.kafka010.OffsetRange
  4. import org.apache.spark.streaming.scheduler._
  5. import org.joda.time.DateTime
  6.  
  /**
   * A simple Spark Streaming listener that reads summary statistics from completed batches.
   * `StreamingListener` itself is a Spark DeveloperApi, so its callbacks may change between
   * Spark versions.
   *
   * Only [[onBatchCompleted]] does any work. The other hooks are overridden with empty bodies
   * purely to show which callbacks are available.
   *
   * @param exampleArg You can pass whatever you want to a listener!
   */
  class ExampleStreamingListener(exampleArg: String) extends StreamingListener {

    // ====================
    // onBatch_ Methods
    // ====================

    /**
     * Called when a Spark Streaming batch completes.
     *
     * Prints `exampleArg`, then passes the batch to [[writeStatsSomewhere]] (timing metrics)
     * and [[processTopicInfo]] (per-input-stream offsets and record counts).
     *
     * @param batchCompleted information on the completed batch
     */
    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
      println(s"I was passed to the listener: $exampleArg")

      // write performance metrics somewhere
      writeStatsSomewhere(batchCompleted)

      // write offsets (state) somewhere, and numRecords per input stream
      processTopicInfo(batchCompleted)
    }

    /**
     * Called when a Spark Streaming batch is submitted to the scheduler for execution. No-op here.
     *
     * @param batchSubmitted information on the submitted batch
     */
    override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = ()

    /**
     * Called when a Spark Streaming batch starts processing. No-op here.
     *
     * @param batchStarted information on the started batch
     */
    override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = ()

    // ====================
    // onReceiver_ Methods
    // ====================

    /**
     * Called when a Spark Streaming receiver has started. No-op here.
     *
     * @param receiverStarted information on the receiver (e.g. errors, executor ids, etc.)
     */
    override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = ()

    /**
     * Called when a Spark Streaming receiver reports an error. No-op here.
     *
     * @param receiverError information on the receiver (e.g. errors, executor ids, etc.)
     */
    override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = ()

    /**
     * Called when a Spark Streaming receiver stops. No-op here.
     *
     * @param receiverStopped information on the receiver (e.g. errors, executor ids, etc.)
     */
    override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = ()

    // =======================================================================
    // Convenience Methods (for use in onBatch_ methods)
    // =======================================================================

    /**
     * Extracts the key performance metrics of a completed batch so they can be logged somewhere.
     *
     *  - Processing Time: whole seconds needed to complete this batch (i.e. duration).
     *  - Scheduling Delay: whole seconds the start of this batch was delayed.
     *  - Num Records: total number of input records consumed this batch.
     *
     * The delays are reported in milliseconds. Integer division by 1000 truncates them to
     * whole seconds.
     *
     * @param batch information on the completed batch
     */
    def writeStatsSomewhere(batch: StreamingListenerBatchCompleted): Unit = {
      val info = batch.batchInfo

      // Processing time in whole seconds; 0 when the delay is not reported.
      val processingTime: Long = info.processingDelay.getOrElse(0L) / 1000

      // Scheduling delay in whole seconds; missing or non-positive delays count as 0.
      val schedulingDelay: Long = info.schedulingDelay.filter(_ > 0).getOrElse(0L) / 1000

      // Total record count for this batch, as reported by BatchInfo.
      val numRecords: Long = info.numRecords

      // do something with `processingTime`
      // do something with `schedulingDelay`
      // do something with `numRecords`
    }

    /**
     * Processes every input stream that contributed to a batch.
     *
     * `streamIdToInputInfo` is keyed by input stream ID, not by topic name. A single stream can
     * cover one or more topics.
     *
     * @param batch information on the completed batch
     */
    def processTopicInfo(batch: StreamingListenerBatchCompleted): Unit =
      batch.batchInfo.streamIdToInputInfo.foreach { streamInfo =>
        writeTopicOffsetsSomewhere(streamInfo)
        writeTopicCountSomewhere(streamInfo)
      }

    // =======================================================================
    // Topic Methods (designed for use inside of convenience methods)
    // =======================================================================

    /**
     * Writes the last consumed offset of every partition in the given input stream somewhere.
     *
     * Streams that attached no `"offsets"` metadata are skipped instead of causing an exception.
     *
     * @param topic an (input stream ID, input info) entry of a batch's `streamIdToInputInfo`
     */
    def writeTopicOffsetsSomewhere(topic: (Int, StreamInputInfo)): Unit =
      offsetRangesOf(topic._2).foreach { offsetRange =>
        // `untilOffset` is the exclusive end of the range: the last offset consumed from this
        // partition is untilOffset - 1, and the next batch resumes at untilOffset.
        // NOTE(review): for an empty range (fromOffset == untilOffset) this is fromOffset - 1.
        val maxOffset = offsetRange.untilOffset - 1

        // do something with `offsetRange.topic`
        // do something with `offsetRange.partition`
        // do something with `offsetRange.count`
        // do something with `maxOffset`
      }

    /**
     * Writes the number of records the given input stream consumed this batch somewhere.
     *
     * Topic names are recovered from the stream's offset ranges. An empty list means no
     * `"offsets"` metadata was attached. When the stream covers several topics, every name is
     * kept, and `numRecords` is the stream's total across all of them.
     *
     * @param topic an (input stream ID, input info) entry of a batch's `streamIdToInputInfo`
     */
    def writeTopicCountSomewhere(topic: (Int, StreamInputInfo)): Unit = {
      val (_, inputInfo) = topic

      // Total record count for this input stream.
      val numRecords = inputInfo.numRecords

      // Distinct topic names covered by this stream; no `.head`, so an empty range list is safe.
      val topicNames = offsetRangesOf(inputInfo).map(_.topic).distinct

      // write record count for this stream this batch
      // do something with `topicNames` and `numRecords`
    }

    /**
     * Returns the offset ranges stored under the `"offsets"` metadata key of an input stream.
     *
     * Returns `Nil` if the key is absent or does not hold a list of `OffsetRange`s. The original
     * code did a bare `metadata("offsets")` lookup plus cast, which threw in that case.
     *
     * @param inputInfo input information of one stream in a batch
     * @return the stream's offset ranges, or `Nil` if none are available
     */
    private def offsetRangesOf(inputInfo: StreamInputInfo): List[OffsetRange] =
      inputInfo.metadata.get("offsets") match {
        // The element type is erased at runtime, so re-check each element instead of casting.
        case Some(ranges: List[_]) => ranges.collect { case range: OffsetRange => range }
        case _                     => Nil
      }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement