Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import org.apache.spark.streaming.dstream._
- import org.specs2.mutable._
- import org.apache.spark.streaming._
- import scala.collection.mutable
- class spark[T](val seq: Seq[String])(implicit val fun: DStream[String] => DStream[T]) extends After {
- lazy val ssc = new StreamingContext("local", "test", Seconds(1))
- val rdd = ssc.sparkContext.makeRDD(seq)
- val stream = new ConstantInputDStream(ssc, rdd)
- val collector = mutable.MutableList[T]()
- fun(stream).foreachRDD(rdd => collector ++= rdd.collect())
- ssc.start()
- ssc.awaitTerminationOrTimeout(1000)
- def after = ssc.stop()
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement