Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.despegar.p13n.prometeo.eventbus
- import akka.actor.ActorSystem
- import akka.stream.ActorMaterializer
- import akka.stream.OverflowStrategy
- import akka.stream.scaladsl.Keep
- import akka.stream.scaladsl.Sink
- import akka.stream.scaladsl.Source
/**
 * Runnable demo of how `buffer(1, OverflowStrategy.dropBuffer)` followed by `.async`
 * behaves in front of a slow sink, once with a timed source and once with a strict one.
 *
 * Only one example is enabled at the bottom of the object; swap the commented call to
 * try the other.
 *
 * NOTE(review): nothing in this object terminates the actor system, so the process is
 * expected to stay alive after the selected example finishes and must be stopped by hand.
 */
object BufferExample extends App {

  // Implicits carry explicit types so that implicit resolution does not depend on inference.
  implicit val system: ActorSystem = ActorSystem("buffer-example")
  implicit val materializer: ActorMaterializer = ActorMaterializer()(system)

  import scala.concurrent.duration._

  /**
   * "Bad" case: a tick source (one element every 500 ms) feeds a sink that sleeps five
   * seconds per element. The calling thread sleeps three seconds and then cancels the
   * tick source.
   *
   * Output recorded by the original author shows as many consumed as emitted elements,
   * i.e. the buffer DOES NOT DROP:
   * {{{
   * Emitted: 1
   * Emitted: 1
   * Emitted: 1
   * Consumed: 1
   * Emitted: 1
   * Emitted: 1
   * Consumed: 1
   * Consumed: 1
   * Consumed: 1
   * Consumed: 1
   * }}}
   *
   * NOTE(review): presumably this happens because `.async` sits after the buffer and the
   * downstream side of an asynchronous boundary has its own input buffer, which requests
   * elements ahead of the slow sink so the explicit one-element buffer never overflows.
   * Confirm against the Akka Streams documentation on buffers for asynchronous operators.
   */
  def example1(): Unit = {
    val c = Source
      .tick(500.millis, 500.millis, 1)
      .map { x =>
        println("Emitted: " + x)
        x
      }
      .buffer(1, OverflowStrategy.dropBuffer)
      .async
      .toMat(Sink.foreach[Int] { x =>
        // Simulates a consumer that is much slower than the producer.
        Thread.sleep(5000)
        println("Consumed: " + x)
      })(Keep.left)
      .run()

    // Let the source tick for a while, then stop it through the materialized Cancellable.
    Thread.sleep(3000)
    c.cancel()
  }

  /**
   * "Good" case: a strict source of 1 to 5 feeds a sink that sleeps one second per
   * element.
   *
   * Output recorded by the original author shows that the buffer dropped the first four
   * elements, i.e. it DROPS:
   * {{{
   * Emitted: 1
   * Emitted: 2
   * Emitted: 3
   * Emitted: 4
   * Emitted: 5
   * Consumed: 5
   * }}}
   *
   * @return the materialized value of the graph, which carries no information here
   */
  def example2(): akka.NotUsed = {
    Source(1 to 5)
      .map { x =>
        println("Emitted: " + x)
        x
      }
      .buffer(1, OverflowStrategy.dropBuffer)
      .async
      .to(Sink.foreach[Int] { x =>
        // Simulates a slow consumer.
        Thread.sleep(1000)
        println("Consumed: " + x)
      })
      .run()
  }

  example1()
  //example2()
}
Add Comment
Please, Sign In to add comment