Advertisement
Not a member of Pastebin yet?
Sign Up — it unlocks many cool features!
- java.lang.NullPointerException
- at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
- at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
- at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
- at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
- at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
- at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
- at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
- at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
- at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
- at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
- at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
- at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
- at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
- at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
- import org.apache.spark.SparkConf
- import org.apache.spark.rdd.RDD
- import org.apache.spark.streaming.Seconds
- import org.apache.spark.streaming.StreamingContext
- import org.apache.spark.streaming.flume.FlumeUtils
- import org.apache.spark.streaming.flume.SparkFlumeEvent
/** System under test: wires a Flume input stream into the given
  * StreamingContext and starts streaming.
  *
  * @param ssc  the streaming context to attach the Flume stream to
  * @param conf Spark configuration (currently unused by this class)
  */
class SystemUnderTest(ssc: StreamingContext, conf: SparkConf) {

  /** Registers the Flume stream, then starts the streaming context. */
  def start = {
    startFlumeStream
    startSparkStreaming
  }

  /** Creates a Flume receiver on localhost:2121, reduces it over a 30s
    * window sliding every 10s, and hooks the per-batch transform in as
    * the output operation.
    */
  private def startFlumeStream = {
    val flumeStream = FlumeUtils.createStream(ssc, "localhost", 2121)
    // Keep the first event of each window; the reduce function ignores
    // the second argument by construction.
    val reducedStream = flumeStream.reduceByWindow((first, _) => first, Seconds(30), Seconds(10))
    reducedStream.foreachRDD(rdd => simpleTransformFunction(rdd))
  }

  /** Prints the "type" and "value" headers of every event in the batch.
    *
    * BUG FIX: the original used `rdd.map(...)`, which is a lazy
    * transformation — its println side effects never executed because no
    * action was ever applied to the resulting RDD (hence "I can't seem to
    * pass any RDDs to it"). `foreach` is an action and runs the closure
    * on the executors immediately.
    */
  private def simpleTransformFunction(rdd: RDD[SparkFlumeEvent]) = {
    // Called once per slide interval (every 10 seconds) by foreachRDD.
    println("simpleTransformFunction called!")
    rdd.foreach(event => {
      // NOTE(review): executor-side printlns go to the executor logs, not
      // the driver console, when running on a real cluster.
      val headers = event.event.getHeaders()
      println(headers.get("type"))
      println(headers.get("value"))
    })
  }

  /** Starts the streaming context; blocks until all receivers are registered. */
  private def startSparkStreaming = {
    ssc.start
  }
}
- import scala.collection.mutable.Queue
- import org.apache.spark._
- import org.apache.spark.rdd.RDD
- import org.apache.spark.SparkConf
- import org.apache.spark.streaming._
- import org.apache.spark.streaming.flume.SparkFlumeEvent
- import org.apache.spark.streaming.StreamingContext._
- import java.util.Map
- import java.util.HashMap
/** Local test driver: feeds synthetic SparkFlumeEvents into a
  * StreamingContext via a queue stream, then starts the system under test.
  */
object Example {

  def main(args: Array[String]): Unit = {
    // Set the required values in the config
    val appName = "AppName"
    val master = "local[*]"
    val sparkConf = new SparkConf().setMaster(master).setAppName(appName)
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Create a queue that will be populated with test data.
    val rddQueue = new Queue[RDD[SparkFlumeEvent]]
    // BUG FIX: the original line read `rdd += scc.sparkContext...` —
    // both `rdd` and `scc` were undefined names (typos for `rddQueue`
    // and `ssc`), so the test data never reached the queue.
    rddQueue += ssc.sparkContext.makeRDD(createSparkFlumeEvents)

    // Put the queue into the context; events added to the queue surface
    // as batches of the returned DStream.
    // NOTE(review): the returned DStream is discarded here — the original
    // reported NPEs from this call path (see stack trace above); confirm
    // whether an output operation on this stream is required.
    ssc.queueStream(rddQueue)

    // Create and start the system under test
    val systemUnderTest = new SystemUnderTest(ssc, sparkConf)
    systemUnderTest.start
  }

  /** Builds ten valid SparkFlumeEvents with values 1..10. */
  private def createSparkFlumeEvents: Seq[SparkFlumeEvent] = {
    // Range + map instead of mutating a Queue accumulator.
    (1 to 10).map(createSparkFlumeEvent)
  }

  /** Builds one SparkFlumeEvent whose headers carry a fixed "type" and
    * the given value rendered as a string.
    *
    * @param value numeric payload stored in the "value" header
    */
  private def createSparkFlumeEvent(value: Int) = {
    val sparkFlumeEvent = new SparkFlumeEvent
    // Avro event headers are CharSequence-keyed.
    val map = new HashMap[CharSequence, CharSequence]
    map.put("type", "install")
    map.put("value", value.toString)
    sparkFlumeEvent.event.setHeaders(map)
    sparkFlumeEvent
  }
}
Advertisement
Add Comment
Please sign in to add a comment
Advertisement