Advertisement
Guest User

Untitled

a guest
Jun 24th, 2017
58
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.03 KB | None | 0 0
  1. package my.stream
  2.  
  3. import java.text.NumberFormat
  4. import java.util.Locale
  5.  
  6. import akka.actor.ActorSystem
  7. import akka.stream.scaladsl.{Keep, Sink}
  8. import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
  9. import akka.stream.testkit.TestSubscriber
  10. import akka.stream.testkit.scaladsl.TestSource
  11. import akka.stream.{Attributes, _}
  12.  
  13. import scala.concurrent.Await
  14. import scala.concurrent.duration._
  15.  
  16. class MyTimer[T] extends GraphStage[FlowShape[T, T]] {
  17. val in = Inlet[T]("MyTimer.in")
  18. val out = Outlet[T]("MyTimer.out")
  19. override val shape = FlowShape(in, out)
  20.  
  21. override def initialAttributes: Attributes = Attributes.name("MyTimer")
  22.  
  23. override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
  24. new GraphStageLogic(shape) with InHandler with OutHandler {
  25. var currentTimestamp: Long = 0
  26. var howManyElementsProcessed: Int = 0
  27.  
  28. override def preStart(): Unit = {
  29. currentTimestamp = System.nanoTime()
  30. println("initialized!")
  31. }
  32.  
  33. override def onPush(): Unit = {
  34. val previousTimestamp = currentTimestamp
  35. currentTimestamp = System.nanoTime()
  36. val passed = NumberFormat.getNumberInstance(Locale.US).format(currentTimestamp - previousTimestamp)
  37. val elem = grab(in)
  38. push(out, elem)
  39.  
  40. if(howManyElementsProcessed == 0)
  41. println(s"processed element = ${elem}")
  42. else
  43. println(s"processed element = ${elem}, ${passed} nanoseconds has passed from the previous element")
  44.  
  45. howManyElementsProcessed = howManyElementsProcessed + 1
  46. }
  47.  
  48. override def onPull(): Unit = pull(in)
  49.  
  50. setHandlers(in, out, this)
  51. }
  52. }
  53.  
  54. object MyThrottle {
  55. implicit val system = ActorSystem()
  56. implicit val materializer = ActorMaterializer()
  57.  
  58. /**
  59. * This constructs and runs a stream where the Sink requests 10 elements and then the Source sends 10 too.
  60. * (i.e.) NO backpressure from downstream, but there is backpressure from the throttle stage
  61. */
  62. def throttleTest(elements: Int, per: FiniteDuration, maxBurst: Int, costPerElement: Int)(): Unit ={
  63. println("--------------testing a throttle GraphStage in a stream with the following parameters: -------------")
  64.  
  65. val nanoBetweenTokens = NumberFormat.getNumberInstance(Locale.US).format(per.toNanos / elements)
  66. println(s"elements = ${elements}, per = ${per}, maxBurst = ${maxBurst}, costCalc = ${costPerElement}, nanosBetweenTokens = $nanoBetweenTokens")
  67.  
  68. val ((sourcePublisher, fut), sinkPublisher) = TestSource.probe[Int]
  69. .throttle(elements, per, maxBurst, (_: Int) => costPerElement, ThrottleMode.Shaping)
  70. .via(new MyTimer)
  71. .watchTermination()(Keep.both)
  72. .toMat(Sink.asPublisher(false))(Keep.both)
  73. .run()
  74.  
  75. val sinkSubscriber = TestSubscriber.manualProbe[Int]()
  76. sinkPublisher.subscribe(sinkSubscriber)
  77. val sinkSubscription = sinkSubscriber.expectSubscription()
  78.  
  79. /**
  80. * Request 10 elements from downstream, where the upstream sends 10 elements
  81. * (i.e.) NO backpressure from downstream
  82. */
  83. sinkSubscription.request(10)
  84. for(i <- 1 to 10)
  85. sourcePublisher.sendNext(i)
  86. sourcePublisher.sendComplete()
  87.  
  88. try{
  89. val result = Await.result(fut, 10 seconds)
  90. println(s"The stream finished with result = ${result}")
  91. }
  92. catch{
  93. case e: Exception => println(s"The stream failed with ${e}")
  94. }
  95. }
  96.  
  97. /**
  98. * This constructs and runs a stream where the Sink requests only 5 elements but the Source sends 10.
  99. * (i.e.) There is backpressure from downstream, as well as backpressure from the throttle stage
  100. */
  101. def throttleTestWithDownstreamBackPressure(elements: Int, per: FiniteDuration, maxBurst: Int, costPerElement: Int)(): Unit ={
  102. println("--------------testing a throttle GraphStage in a stream with the following parameters and backpressure from downstream : -------------")
  103.  
  104. val nanoBetweenTokens = NumberFormat.getNumberInstance(Locale.US).format(per.toNanos / elements)
  105. println(s"elements = ${elements}, per = ${per}, maxBurst = ${maxBurst}, costCalc = ${costPerElement}, nanosBetweenTokens = $nanoBetweenTokens")
  106.  
  107. val ((sourcePublisher, fut), sinkPublisher) = TestSource.probe[Int]
  108. .throttle(elements, per, maxBurst, (_: Int) => costPerElement, ThrottleMode.Shaping)
  109. .via(new MyTimer)
  110. .watchTermination()(Keep.both)
  111. .toMat(Sink.asPublisher(false))(Keep.both)
  112. .run()
  113.  
  114. val sinkSubscriber = TestSubscriber.manualProbe[Int]()
  115. sinkPublisher.subscribe(sinkSubscriber)
  116. val sinkSubscription = sinkSubscriber.expectSubscription()
  117.  
  118. /**
  119. * Request only 5 elements from downstream, where the upstream sends 10 elements
  120. * (i.e.) From the 6-th (next to 5) element, there is backpressure from downstream
  121. */
  122. sinkSubscription.request(5) //************ THIS IS THE DIFFERENCE FROM throttleTest *************
  123. for(i <- 1 to 10)
  124. sourcePublisher.sendNext(i)
  125. sourcePublisher.sendComplete()
  126.  
  127. /**
  128. * This Await will fail with timeout exception, since the Future (fut) is Future[Done]
  129. * which completes when the stream completes, but this stream does not complete as
  130. * the last 5 elements are still pending due to backpressure from downstream
  131. */
  132. try{
  133. val result = Await.result(fut, 5 seconds)
  134. println(s"The stream finished with result = ${result}")
  135. }
  136. catch{
  137. case e: Exception => println(s"The stream failed with ${e}")
  138. }
  139. }
  140.  
  141. def main(args: Array[String]): Unit = {
  142. try {
  143. throttleTest(elements = 8, per = 2 seconds, maxBurst = 1, costPerElement = 1)
  144. throttleTest(elements = 8, per = 2 seconds, maxBurst = 3, costPerElement = 1)
  145. throttleTest(elements = 8, per = 2 seconds, maxBurst = 5, costPerElement = 1)
  146.  
  147. throttleTestWithDownstreamBackPressure(elements = 8, per = 2 seconds, maxBurst = 1, costPerElement = 1)
  148. throttleTestWithDownstreamBackPressure(elements = 8, per = 2 seconds, maxBurst = 3, costPerElement = 1)
  149. throttleTestWithDownstreamBackPressure(elements = 8, per = 2 seconds, maxBurst = 5, costPerElement = 1)
  150. }
  151. finally {
  152. system.terminate()
  153. }
  154. }
  155. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement