Guest User

Untitled

a guest
Dec 16th, 2017
89
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.46 KB | None | 0 0
  1. package com.despegar.p13n.prometeo.eventbus
  2.  
  3. import akka.actor.ActorSystem
  4. import akka.stream.ActorMaterializer
  5. import akka.stream.OverflowStrategy
  6. import akka.stream.scaladsl.Keep
  7. import akka.stream.scaladsl.Sink
  8. import akka.stream.scaladsl.Source
  9.  
  10. object BufferExample extends App {
  11.  
  12. implicit val system = ActorSystem("buffer-example")
  13. implicit val materializer: ActorMaterializer = ActorMaterializer()(system)
  14.  
  15. import scala.concurrent.duration._
  16.  
  17.  
  18. /* Bad: same number of emitted and consumed events, i.e. DOES NOT DROP
  19. Emitted: 1
  20. Emitted: 1
  21. Emitted: 1
  22. Consumed: 1
  23. Emitted: 1
  24. Emitted: 1
  25. Consumed: 1
  26. Consumed: 1
  27. Consumed: 1
  28. Consumed: 1
  29. */
  30. def example1() {
  31. val c = Source.tick(500 millis, 500 millis, 1)
  32. .map(x => {
  33. println("Emitted: " + x)
  34. x
  35. })
  36. .buffer(1, OverflowStrategy.dropBuffer).async
  37. .toMat(Sink.foreach[Int](x => {
  38. Thread.sleep(5000)
  39. println("Consumed: " + x)
  40. }))(Keep.left)
  41. .run
  42. Thread.sleep(3000)
  43. c.cancel()
  44.  
  45. }
  46.  
  47. /* Good: the buffer has dropped the first 4 elements, i.e. DROPS!
  48. Emitted: 1
  49. Emitted: 2
  50. Emitted: 3
  51. Emitted: 4
  52. Emitted: 5
  53. Consumed: 5
  54. */
  55. def example2() = {
  56. Source(1 to 5)
  57. .map(x => {
  58. println("Emitted: " + x)
  59. x
  60. })
  61. .buffer(1, OverflowStrategy.dropBuffer).async
  62. .to(Sink.foreach[Int](x => {
  63. Thread.sleep(1000)
  64. println("Consumed: " + x)
  65. })).run
  66. }
  67.  
  68.  
  69. example1()
  70. //example2()
  71. }
Add Comment
Please, Sign In to add comment