java.lang.NullPointerException
    at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
    at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
    at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
    at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
    at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
    at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
    at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.flume.SparkFlumeEvent

class SystemUnderTest(ssc: StreamingContext, conf: SparkConf) {

  def start = {
    startFlumeStream
    startSparkStreaming
  }

  private def startFlumeStream = {
    // How do I test this function? (one possible approach is sketched after this class)
    val flumeStream = FlumeUtils.createStream(ssc, "localhost", 2121)
    val reducedStream = flumeStream.reduceByWindow((key, value) => key, Seconds(30), Seconds(10))
    reducedStream.foreachRDD(rdd => simpleTransformFunction(rdd))
  }

  private def simpleTransformFunction(rdd: RDD[SparkFlumeEvent]) = {
    // This function is called every 10 seconds - but I can't seem to pass any
    // RDDs to it
    println("simpleTransformFunction called!")

    rdd.map(event => {
      val headers = event.event.getHeaders()
      println(headers.get("type"))
      println(headers.get("value"))
    })
  }

  private def startSparkStreaming = {
    ssc.start
  }

}

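One way to approach the "how do I test this?" question without a live Flume agent is to pull the per-event logic out into something a plain test can call with an RDD it builds itself. The sketch below is only an illustration of that idea, not the original author's code: the names EventTransforms and printHeaders are made up, and it assumes the spark-streaming-flume dependency is on the classpath.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.flume.SparkFlumeEvent

// Hypothetical refactoring: the per-event logic lives in an object so a plain
// test can call it directly with an RDD it builds itself - no Flume receiver
// and no StreamingContext needed.
object EventTransforms {
  def printHeaders(rdd: RDD[SparkFlumeEvent]): Unit = {
    // foreach is an action, so the work actually runs; a bare map is lazy and
    // is dropped if nothing downstream forces it.
    rdd.foreach { event =>
      val headers = event.event.getHeaders()
      println(headers.get("type"))
      println(headers.get("value"))
    }
  }
}

object EventTransformsTest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("EventTransformsTest"))
    // Build events the same way createSparkFlumeEvent does further down.
    val events = (1 to 3).map { i =>
      val e = new SparkFlumeEvent
      val headers = new java.util.HashMap[CharSequence, CharSequence]
      headers.put("type", "install")
      headers.put("value", i.toString)
      e.event.setHeaders(headers)
      e
    }
    EventTransforms.printHeaders(sc.makeRDD(events))
    sc.stop()
  }
}

Keeping the transformation free of DStream plumbing means the streaming wiring (window, foreachRDD) only needs a smoke test, which the queue-based example below attempts.
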
import scala.collection.mutable.Queue

import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume.SparkFlumeEvent
import org.apache.spark.streaming.StreamingContext._

import java.util.HashMap

object Example {

  def main(args: Array[String]): Unit = {
    // Set the required values in the config
    val appName = "AppName"
    val master = "local[*]"
    val sparkConf = new SparkConf().setMaster(master).setAppName(appName)
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Create a queue that will be populated with test data
    val rddQueue = new Queue[RDD[SparkFlumeEvent]]

    rddQueue += ssc.sparkContext.makeRDD(createSparkFlumeEvents)
    // put the queue into the context (events added to the queue should show up in the context?)
    // this seems to cause NPEs
    ssc.queueStream(rddQueue)

    // Create and start the system under test
    val systemUnderTest = new SystemUnderTest(ssc, sparkConf)
    systemUnderTest.start
  }

  // Create a Sequence of valid SparkFlumeEvents
  private def createSparkFlumeEvents: Seq[SparkFlumeEvent] = {
    val events = new Queue[SparkFlumeEvent]
    for (a <- 1 to 10) {
      events += createSparkFlumeEvent(a)
    }
    events.toSeq
  }

  private def createSparkFlumeEvent(value: Int) = {
    val sparkFlumeEvent = new SparkFlumeEvent
    val map = new HashMap[CharSequence, CharSequence]
    map.put("type", "install")
    map.put("value", value.toString)
    sparkFlumeEvent.event.setHeaders(map)

    sparkFlumeEvent
  }
}
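
As for the NullPointerException in the trace at the top, one plausible reading (an assumption drawn from the stack trace, not something stated in the paste) is that ssc.queueStream(rddQueue) creates an input DStream that never gets an output operation attached: it is never initialised, its remember duration stays null, and getMaxInputStreamRememberDuration then fails when the JobGenerator clears metadata. Note also that a queue stream is a separate input; it does not feed events into the FlumeUtils stream. A minimal sketch of a queue-backed run that registers an output on the stream before starting the context might look like the following; the object name QueueStreamExample and the 30-second timeout are made up for illustration.

import scala.collection.mutable.Queue

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.SparkFlumeEvent

object QueueStreamExample {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("QueueStreamExample")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val rddQueue = new Queue[RDD[SparkFlumeEvent]]

    // Wire an output operation onto the queue stream *before* starting the
    // context, so the input stream is part of the graph and gets initialised.
    val testStream = ssc.queueStream(rddQueue)
    testStream.foreachRDD { rdd =>
      rdd.foreach { event =>
        val headers = event.event.getHeaders()
        println(headers.get("type") + " -> " + headers.get("value"))
      }
    }

    ssc.start()

    // Push test data once the context is running; each queued RDD is consumed
    // as one batch.
    rddQueue += ssc.sparkContext.makeRDD(createSparkFlumeEvents)

    ssc.awaitTerminationOrTimeout(30000)
    ssc.stop()
  }

  // Same helpers as in the Example object above.
  private def createSparkFlumeEvents: Seq[SparkFlumeEvent] =
    (1 to 10).map(createSparkFlumeEvent)

  private def createSparkFlumeEvent(value: Int): SparkFlumeEvent = {
    val event = new SparkFlumeEvent
    val headers = new java.util.HashMap[CharSequence, CharSequence]
    headers.put("type", "install")
    headers.put("value", value.toString)
    event.event.setHeaders(headers)
    event
  }
}

This exercises only the queue-backed path; the FlumeUtils.createStream receiver still needs either a real Flume agent or a separate seam to be tested.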