Advertisement
Guest User

Untitled

a guest
Jul 22nd, 2019
106
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.90 KB | None | 0 0
  1. import akka.actor._
  2. import akka.stream._
  3. import akka.stream.scaladsl.Source
  4.  
  5. import scala.concurrent.duration._
  6. import scala.concurrent.{ExecutionContext, Future}
  7. import scala.util.{Failure, Success}
  8.  
  /**
   * Demo application that counts the elements of a throttled Akka stream,
   * accumulating the count with `runFoldAsync`.
   *
   * The integers 1 to 5 are emitted at a rate of one element per 3 seconds,
   * each is passed through [[process]], and the number of processed elements
   * is printed once the stream finishes.
   */
  object TestStreamApp extends App {
    private implicit val executionContext: ExecutionContext = ExecutionContext.Implicits.global
    implicit val system: ActorSystem = ActorSystem("TestSystem")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    /**
     * Placeholder for per-page work.
     *
     * @param page the page number to process
     * @return an already-completed future holding `page` unchanged
     */
    def process(page: Int): Future[Int] =
      Future.successful(page)

    /** Infinite lazy sequence 1, 2, 3, ...; the stream below bounds it with `takeWhile`. */
    val l: Stream[Int] = Stream.from(1)

    /** Completes with the number of elements that went through [[process]]. */
    val streamedTasks: Future[Int] =
      Source(l)
        .takeWhile(_ <= 5)
        .throttle(1, 3.seconds)
        .runFoldAsync(0) { (acc, v) =>
          // Only the count matters here; the value returned by process is discarded.
          process(v).map(_ => acc + 1)
        }

    streamedTasks.onComplete { result =>
      result match {
        case Success(amount) => println(s"Processed $amount values")
        // Interpolate the exception: passing it as a second println argument
        // would be auto-tupled and printed as "(message,exception)".
        case Failure(ex)     => println(s"Failed to complete tasks: $ex")
      }
      // Shut the actor system down once the stream is done; otherwise its
      // threads keep the application running after the result is printed.
      system.terminate()
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement