Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import scala.concurrent.ExecutionContext.Implicits.global
- import scala.concurrent.Future
/**
 * Asynchronous key/value cache.
 *
 * Implementations supply the three primitive operations (`put`, `get`,
 * `remove`); the read-through helpers `getOrElse` and `apply` are built on
 * top of them.
 *
 * @tparam K key type
 * @tparam V cached value type
 */
trait Cache[K, V] {

  /** Stores `value` under `key`. */
  def put(key: K, value: V): Future[Unit]

  /** Looks up `key`; the future yields `None` when nothing is stored for it. */
  def get(key: K): Future[Option[V]]

  /** Removes whatever is stored under `key`. */
  def remove(key: K): Future[Unit]

  /**
   * Read-through lookup with a synchronous fallback.
   *
   * @param key        key to look up
   * @param ifNotFound by-name fallback; only evaluated when `get` yields `None`,
   *                   and the result is stored via `put` before being returned
   * @return the cached value, or the freshly computed (and stored) fallback
   */
  def getOrElse(key: K)(ifNotFound: => V): Future[V] =
    apply(key)(Future.successful(ifNotFound))

  /**
   * Read-through lookup with an asynchronous fallback.
   *
   * The lookup, the fallback and the subsequent `put` are chained one after
   * another; nothing here coordinates concurrent callers asking for the same
   * missing key. If `put` fails, the returned future fails as well.
   *
   * @param key              key to look up
   * @param ifNotFoundFuture by-name fallback future; only evaluated on a miss
   * @return the cached value, or the fallback's value once it has been stored
   */
  def apply(key: K)(ifNotFoundFuture: => Future[V]): Future[V] =
    get(key).flatMap {
      case Some(cached) =>
        Future.successful(cached)
      case None =>
        for {
          computed <- ifNotFoundFuture
          _        <- put(key, computed)
        } yield computed
    }
}
- import akka.NotUsed
- import akka.stream.scaladsl.{Flow, Source}
/**
 * Wraps a `Flow` with a read-through [[Cache]] keyed by the flow's input.
 */
object CacheFlow {

  /**
   * Builds a flow that consults `cache` before running `flow`.
   *
   * For each incoming element the cache is queried first. On a hit the cached
   * value is emitted and `flow` is not run for that element. On a miss the
   * element is sent through `flow` as a one-element source, and every element
   * that comes out is written to the cache under the input key before being
   * emitted downstream.
   *
   * @param flow  the computation to run on a cache miss
   * @param cache the cache consulted and populated for each element
   * @return a flow with the same input/output types as `flow`
   */
  def apply[A, B](
    flow: Flow[A, B, NotUsed],
    cache: Cache[A, B]
  ): Flow[A, B, NotUsed] = {

    // Pairs the key with the outcome of the cache lookup.
    def lookup(key: A): Source[(A, Option[B]), NotUsed] =
      Source.fromFuture(cache.get(key)).map(hit => (key, hit))

    // Runs the wrapped flow for a single key and stores each result before emitting it.
    def computeAndStore(key: A): Source[B, NotUsed] =
      Source.single(key).via(flow).flatMapConcat { result =>
        Source.fromFuture(cache.put(key, result)).map(_ => result)
      }

    Flow[A]
      .flatMapConcat(key => lookup(key))
      .flatMapConcat {
        case (_, Some(hit)) => Source.single(hit)
        case (key, None)    => computeAndStore(key)
      }
  }
}
- import akka.actor.ActorSystem
- import akka.stream.ActorMaterializer
- import akka.stream.scaladsl.{Flow, Sink, Source}
- import org.scalatest.FlatSpec
- import org.scalatest.Matchers._
- import org.scalatest.concurrent.ScalaFutures.whenReady
- import cats.syntax.option._
- import scala.concurrent.Future
/**
 * Tests that `CacheFlow` serves elements from the cache when it has them and
 * falls back to the wrapped flow when it does not.
 *
 * `Foo` and `Bar` are declared outside this section; from their use here they
 * are presumably single-field wrappers exposing `value` — confirm against
 * their definitions.
 */
class CacheFlowSpec extends FlatSpec with org.scalatest.BeforeAndAfterAll {

  // Explicit types on implicits keep implicit resolution predictable.
  implicit val actorSystem: ActorSystem = ActorSystem()
  implicit val actorMaterializer: ActorMaterializer = ActorMaterializer()

  val input: List[String] = List("one", "two", "two", "three", "three", "three")

  /** Cache stub whose `get` always reports a hit derived from the key. */
  val alwaysSuccessfulCache: Cache[Foo, Bar] = new Cache[Foo, Bar] {
    override def put(key: Foo, value: Bar): Future[Unit] = Future.successful(())
    override def get(key: Foo): Future[Option[Bar]] = Future.successful(Bar(key.value).some)
    override def remove(key: Foo): Future[Unit] = ???
  }

  /** Cache stub whose `get` always reports a miss; `put` is accepted and ignored. */
  val alwaysMissingCache: Cache[Foo, Bar] = new Cache[Foo, Bar] {
    override def put(key: Foo, value: Bar): Future[Unit] = Future.successful(())
    override def get(key: Foo): Future[Option[Bar]] = Future.successful(None)
    override def remove(key: Foo): Future[Unit] = ???
  }

  /**
   * Shuts the actor system down once every test has run, so the suite does not
   * leave its threads behind.
   */
  override protected def afterAll(): Unit =
    try super.afterAll()
    finally {
      // Fire-and-forget: the termination future is intentionally not awaited.
      actorSystem.terminate()
      ()
    }

  /**
   * Runs `input` through a `CacheFlow` backed by `cache` and checks both the
   * emitted values and how many elements the wrapped flow actually processed.
   *
   * @param cache                   cache stub to place in front of the flow
   * @param expectedFlowInvocations number of elements the wrapped flow should see
   */
  private def runTest(cache: Cache[Foo, Bar], expectedFlowInvocations: Int) = {
    // Counts the elements that reached the wrapped flow (i.e. cache misses).
    var flowInvocations: Int = 0
    val flow = Flow[Foo]
      .map { x =>
        flowInvocations = flowInvocations + 1
        Bar(x.value)
      }
    val cacheFlow = CacheFlow(flow, cache)
    val output = Source(input)
      .map(Foo)
      .via(cacheFlow)
      .map(_.value)
      .runWith(Sink.seq)
    whenReady(output) { x =>
      x.toList shouldEqual input
      flowInvocations shouldEqual expectedFlowInvocations
    }
  }

  "CacheFlow" should "only use cache if it has data" in {
    runTest(alwaysSuccessfulCache, 0)
  }

  it should "always use flow if cache is empty" in {
    runTest(alwaysMissingCache, input.size)
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement