Advertisement
Guest User

Untitled

a guest
Jul 3rd, 2015
166
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.58 KB | None | 0 0
  1. import org.apache.spark.streaming.dstream._
  2. import org.specs2.mutable._
  3. import org.apache.spark.streaming._
  4. import scala.collection.mutable
  5.  
  /** specs2 `After` fixture that pushes a fixed sequence of strings through a
    * DStream transformation and gathers the transformation's output.
    *
    * All of the work happens while the instance is being constructed: a
    * `StreamingContext` (master `"local"`, app name `"test"`) is created, `seq`
    * is turned into an RDD and wrapped in a `ConstantInputDStream`, `fun` is
    * applied to that stream, every RDD the result produces is collected into
    * [[collector]], and the context is started and awaited for at most
    * `timeoutMillis`.
    *
    * NOTE(review): `ConstantInputDStream` is documented to hand out the same RDD
    * on every batch, so [[collector]] presumably receives the transformed `seq`
    * once per completed batch rather than exactly once -- confirm before
    * asserting on exact sizes.
    *
    * NOTE(review): with the default values the wait (1000 ms) is no longer than
    * one batch interval (1 s), so the first batch may not have been collected
    * when the wait returns. Pass a shorter `batchInterval` or a longer
    * `timeoutMillis` if the results turn out to be flaky.
    *
    * @tparam T             element type produced by the transformation under test
    * @param seq            input elements fed to the stream
    * @param batchInterval  batch duration of the streaming context; defaults to
    *                       the previously hard-coded one second
    * @param timeoutMillis  upper bound, in milliseconds, for the wait after the
    *                       context is started; defaults to the previously
    *                       hard-coded 1000
    * @param fun            transformation under test, resolved implicitly
    */
  class spark[T](val seq: Seq[String],
                 batchInterval: Duration = Seconds(1),
                 timeoutMillis: Long = 1000L)
                (implicit val fun: DStream[String] => DStream[T]) extends After {

    // Kept lazy on purpose: `after` refers to `ssc`, and it may be invoked on a
    // path where the statements below have not all run.
    lazy val ssc: StreamingContext = new StreamingContext("local", "test", batchInterval)

    val rdd: org.apache.spark.rdd.RDD[String] = ssc.sparkContext.makeRDD(seq)

    val stream: ConstantInputDStream[String] = new ConstantInputDStream(ssc, rdd)

    // Filled from the foreachRDD callback registered below.
    // NOTE(review): the callback is presumably invoked from a Spark scheduler
    // thread while tests read this list from their own thread -- confirm that
    // reads only happen once the context has been stopped.
    val collector: mutable.MutableList[T] = mutable.MutableList[T]()

    // The output operation has to be registered before the context is started.
    fun(stream).foreachRDD(batch => collector ++= batch.collect())

    ssc.start()
    ssc.awaitTerminationOrTimeout(timeoutMillis)

    /** Stops the streaming context; this is the specs2 `After` hook. */
    def after: Unit = ssc.stop()
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement