Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import akka.actor._
- import akka.stream._
- import akka.stream.scaladsl.Source
- import scala.concurrent.duration._
- import scala.concurrent.{ExecutionContext, Future}
- import scala.util.{Failure, Success}
- /*
- * Run using foldAsync to accumulate results
- */
/**
 * Demo application: streams the integers 1 to 5 through a throttled Akka
 * Streams pipeline, "processes" each one asynchronously, and counts the
 * processed elements with `runFoldAsync`.
 *
 * When the stream completes (successfully or not) the outcome is printed and
 * the actor system is terminated so the JVM can exit.
 */
object TestStreamApp extends App {
  // Explicit types on implicits keep implicit resolution predictable.
  private implicit val executionContext: ExecutionContext = ExecutionContext.Implicits.global
  implicit val system: ActorSystem = ActorSystem("TestSystem")
  implicit val materializer: Materializer = ActorMaterializer()

  /**
   * Stand-in for real asynchronous work on one page.
   *
   * @param page the page number to process
   * @return an already-completed future holding `page`
   */
  def process(page: Int): Future[Int] =
    Future.successful(page)

  // Lazy, unbounded sequence 1, 2, 3, ...; the stream below bounds it with takeWhile.
  val l: Stream[Int] = Stream.from(1)

  // Completes with the number of elements that passed through the stream.
  val streamedTasks: Future[Int] =
    Source(l)
      .takeWhile(_ <= 5)
      .throttle(1, 3.seconds) // at most one element every three seconds
      .runFoldAsync(0) { (acc, v) =>
        // The processed value itself is not needed; only the count is accumulated.
        process(v).map(_ => acc + 1)
      }

  streamedTasks.onComplete { result =>
    result match {
      case Success(amount) => println(s"Processed $amount values")
      case Failure(ex)     => println(s"Failed to complete tasks: $ex")
    }
    // Without this the actor system's threads keep the JVM alive indefinitely.
    system.terminate()
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement