Advertisement
Guest User

Untitled

a guest
Dec 6th, 2016
80
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.98 KB | None | 0 0
  1. import scala.concurrent.ExecutionContext.Implicits.global
  2. import scala.concurrent.Future
  3.  
  4. trait Cache[K, V] {
  5. def put(key: K, value: V): Future[Unit]
  6. def get(key: K): Future[Option[V]]
  7. def remove(key: K): Future[Unit]
  8.  
  9. def getOrElse(key: K)(ifNotFound: => V): Future[V] = {
  10. apply(key) {
  11. Future.successful(ifNotFound)
  12. }
  13. }
  14.  
  15. def apply(key: K)(ifNotFoundFuture: => Future[V]): Future[V] = {
  16. get(key).flatMap { found =>
  17. found.map { x =>
  18. Future.successful(x)
  19. } getOrElse {
  20. ifNotFoundFuture.flatMap { value =>
  21. put(key, value).map { _ =>
  22. value
  23. }
  24. }
  25. }
  26. }
  27. }
  28. }
  29.  
  30. import akka.NotUsed
  31. import akka.stream.scaladsl.{Flow, Source}
  32.  
  33. object CacheFlow {
  34. def apply[A, B](
  35. flow: Flow[A, B, NotUsed],
  36. cache: Cache[A, B]
  37. ): Flow[A, B, NotUsed] = {
  38. Flow[A]
  39. .flatMapConcat { a =>
  40. Source.fromFuture(cache.get(a))
  41. .map { x =>
  42. (a, x)
  43. }
  44. }
  45. .flatMapConcat { case (a, found) =>
  46. found match {
  47. case Some(x) => Source.single(x)
  48. case None => Source.single(a).via(flow).flatMapConcat { x =>
  49. Source.fromFuture(cache.put(a, x)).map { _ => x }
  50. }
  51. }
  52. }
  53. }
  54. }
  55.  
  56. import akka.actor.ActorSystem
  57. import akka.stream.ActorMaterializer
  58. import akka.stream.scaladsl.{Flow, Sink, Source}
  59. import org.scalatest.FlatSpec
  60. import org.scalatest.Matchers._
  61. import org.scalatest.concurrent.ScalaFutures.whenReady
  62. import cats.syntax.option._
  63. import scala.concurrent.Future
  64.  
  65. class CacheFlowSpec extends FlatSpec {
  66. implicit val actorSystem = ActorSystem()
  67. implicit val actorMaterializer = ActorMaterializer()
  68.  
  69. val input = List("one", "two", "two", "three", "three", "three")
  70.  
  71. val alwaysSuccessfulCache = new Cache[Foo, Bar] {
  72. override def put(key: Foo, value: Bar): Future[Unit] = Future.successful(())
  73. override def get(key: Foo): Future[Option[Bar]] = Future.successful(Bar(key.value).some)
  74. override def remove(key: Foo): Future[Unit] = ???
  75. }
  76.  
  77. val alwaysMissingCache = new Cache[Foo, Bar] {
  78. override def put(key: Foo, value: Bar): Future[Unit] = Future.successful(())
  79. override def get(key: Foo): Future[Option[Bar]] = Future.successful(None)
  80. override def remove(key: Foo): Future[Unit] = ???
  81. }
  82.  
  83. private def runTest(cache: Cache[Foo, Bar], expectedFlowInvocations: Int) = {
  84. var flowInvocations: Int = 0
  85. val flow = Flow[Foo]
  86. .map { x =>
  87. flowInvocations = flowInvocations + 1
  88. Bar(x.value)
  89. }
  90.  
  91. val cacheFlow = CacheFlow(flow, cache)
  92. val output = Source(input)
  93. .map(Foo)
  94. .via(cacheFlow)
  95. .map(_.value)
  96. .runWith(Sink.seq)
  97.  
  98. whenReady(output) { x =>
  99. x.toList shouldEqual input
  100. flowInvocations shouldEqual expectedFlowInvocations
  101. }
  102. }
  103.  
  104. "CacheFlow" should "only use cache if it has data" in {
  105. runTest(alwaysSuccessfulCache, 0)
  106. }
  107.  
  108. it should "always use flow if cache is empty" in {
  109. runTest(alwaysMissingCache, input.size)
  110. }
  111. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement