- // This is a very long investigation of Scala Futures and Long Execution problem.
- // But I Promise[T], it will be interesting.
- // Ready?
- // Prerequisites: PlayFramework 2.6, Scala 2.12, Java 8
- // Problem: suppose we have an async DB query that fetches some data from the DB.
- // Suppose it is not optimized and takes 50 hours to complete (just a design flaw, shit happens).
- // What issues does it cause?
- // 1. the calling service gets no timeout and waits indefinitely
- // 2. the Future[T] holds a thread from the ExecutionContext (EC) until it completes, so we can simply run out of threads
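- // A minimal sketch of problem 2, assuming a deliberately tiny pool of 2 threads (StarvationDemo and the
- // 500 ms sleeps are made up for illustration): two blocking Futures occupy both threads, so a trivial
- // third Future cannot even start until one of them finishes.
```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object StarvationDemo {
  /** Returns how many milliseconds passed before the quick task got a thread. */
  def run(): Long = {
    val pool = Executors.newFixedThreadPool(2)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    val start = System.nanoTime()
    // two slow "queries" take both threads of the pool
    (1 to 2).foreach(_ => Future { Thread.sleep(500) })
    // this one is queued behind them although it does almost nothing
    val quick = Future { (System.nanoTime() - start) / 1000000L }
    try Await.result(quick, 5.seconds)
    finally pool.shutdown()
  }
}
```
- // StarvationDemo.run() reports roughly the length of the slow tasks, not a few milliseconds.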
- // OK, let's start
- // 1. Using Akka timeout from PlayFramework itself:
- // MyController.scala:
- import javax.inject.Inject
- import scala.concurrent.{ExecutionContext, Future}
- import play.api.mvc._
- import play.api.Logger
- class MyController @Inject()(cc: ControllerComponents)(implicit val ec: ExecutionContext) extends AbstractController(cc) {
- def getInt: Future[Int] = Future {
- Logger.info("Future started")
- Thread.sleep(60000)
- Logger.info("Future finished")
- 5
- }
- // add to routes: POST /test/long/action controllers.MyController.longAsyncAction
- def longAsyncAction: Action[AnyContent] = Action.async {
- getInt map { result =>
- Ok(s"Result is $result")
- }
- }
- }
- // now add these lines to application.conf:
- play.server.http.idleTimeout = 60s // optional
- play.server.akka.requestTimeout = 30s
- // run "sbt run"
- POST http://localhost:9000/test/long/action
- // output:
- // 200 (OK): Result is 5
- // stdout:
- // 2018-09-19 10:31:24.271 INFO play.api.Play Application started (Dev)
- // 2018-09-19 10:31:31.899 INFO application Future started
- // 2018-09-19 10:32:31.900 INFO application Future finished
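- // Why did the timeout not fire? With "sbt run" (Dev mode) the server starts before application.conf is loaded,
- // so server settings from that file are ignored. They do work in Prod mode.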
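- // If you would rather stay in Dev mode, Play's sbt plugin has PlayKeys.devSettings for passing settings to the
- // dev server. A sketch only: that the request timeout is honoured this way is an assumption, not verified here.
```scala
// build.sbt
PlayKeys.devSettings += "play.server.akka.requestTimeout" -> "30s"
```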
- // OK, run "sbt dist", unpack the zip archive from target/universal and launch the App in Prod mode.
- // let's try once again!
- POST http://localhost:9000/test/long/action
- // output:
- // Service unavailable (503): The server was not able to produce a timely response to your request.
- // Please try again in a short while!
- // stdout:
- // 2018-09-19 10:36:44.491 INFO play.core.server.AkkaHttpServer Listening for HTTP on /0:0:0:0:0:0:0:0:9000
- // 2018-09-19 10:37:59.167 INFO application Future started
- // 2018-09-19 10:38:59.168 INFO application Future finished
- // Hey! Timeout works (problem 1)! But as we can see the Future is NOT terminated (problem 2)
- // Summary: Akka Timeout does NOT terminate a Future after timeout.
- // 2. Let's check 'withTimeout' function (https://pastebin.com/eu3JK6Fp).
- // Now we can turn off Prod server and go back to Dev mode.
- import akka.actor.ActorSystem
- import scala.concurrent.duration._
- import scala.language.postfixOps
- ...
- class MyController @Inject()(
- cc: ControllerComponents
- )(implicit val ec: ExecutionContext, as: ActorSystem) extends AbstractController(cc) {
- private implicit val scheduler = as.scheduler
- ...
- def longAsyncAction: Action[AnyContent] = Action.async {
- withTimeout(30 seconds) {
- getInt map { result =>
- Ok(s"Result is $result")
- }
- }
- }
- }
- // run:
- POST http://localhost:9000/test/long/action
- // output:
- // 500 (InternalServerError) [TimeoutException: Future timed out after 30 seconds]
- // stdout:
- // 2018-09-19 10:49:59.823 INFO play.api.Play Application started (Dev)
- // 2018-09-19 10:50:04.558 INFO application Future started
- // 2018-09-19 10:50:34.572 ERROR application
- // ! @79afo86ho - Internal server error, for (POST) [/test/long/action] ->
- // Caused by: java.util.concurrent.TimeoutException: Future timed out after 30 seconds
- // 2018-09-19 10:51:04.558 INFO application Future finished
- // Great result!
- // Now we have timeout (problem 1) and extra logger information, but the Future is still NOT terminated (problem 2)!
- // Summary: "withTimeout" (built on "Future.firstCompletedOf", see Example #4) does NOT terminate a Future after timeout.
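- // For readers who don't follow the pastebin link: a minimal sketch of how such a helper can be built on
- // Future.firstCompletedOf. This is NOT the linked implementation; TimeoutSketch, withTimeoutSketch and the
- // ScheduledExecutorService timer are assumptions made for illustration (the original takes a scheduler from Akka).
```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit, TimeoutException}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object TimeoutSketch {
  // single daemon thread that only fires timeouts, so it never keeps the JVM alive
  private val timer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "timeout-timer")
      t.setDaemon(true)
      t
    }
  })

  def withTimeoutSketch[T](limit: FiniteDuration)(body: => Future[T])
                          (implicit ec: ExecutionContext): Future[T] = {
    val expired = Promise[T]()
    val fire = new Runnable {
      def run(): Unit = {
        expired.tryFailure(new TimeoutException(s"Future timed out after $limit"))
        ()
      }
    }
    timer.schedule(fire, limit.toMillis, TimeUnit.MILLISECONDS)
    // whichever completes first wins; the loser is merely ignored, nobody stops it
    Future.firstCompletedOf(Seq(body, expired.future))
  }
}
```
- // Note the last line: the slow Future only loses the race, and its thread keeps working until the body returns.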
- // 3. OK, we know that Await is not recommended, but we have no choice... let's try Await:
- import java.time.LocalDateTime
- import scala.concurrent.ExecutionContext.Implicits.global
- import scala.concurrent.{Await, Future}
- import scala.concurrent.duration._
- import scala.io.StdIn
- import scala.language.postfixOps
- object Main extends App {
- def print(x: Any): Unit = {Thread.sleep(50); println(s"${LocalDateTime.now()}: $x")}
- def f = Future {
- print("Future started")
- Thread.sleep(60000)
- print("Future finished")
- 5
- }
- try {
- val result = Await.result(f, 30 seconds)
- print(s"Result is $result")
- } catch {case e: java.util.concurrent.TimeoutException => print(s"Error: $e")} // only the timeout is expected here
- print("Press ENTER to finish")
- StdIn.readLine()
- }
- // output:
- // 2018-09-19T14:08:48.051: Future started
- // 2018-09-19T14:09:18.131: Error: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
- // 2018-09-19T14:09:18.181: Press ENTER to finish
- // 2018-09-19T14:09:48.115: Future finished
- // As we can see, we detect timeout (problem 1), but Future is still not terminated (problem 2).
- // Summary: "Await" does NOT terminate a Future after timeout.
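- // The same point in a few lines, as a sketch with shortened delays (AwaitDemo is a made-up name): Await only
- // bounds how long the caller waits, and the very same Future can still be awaited successfully later.
```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object AwaitDemo {
  /** Returns (did the first Await time out?, value the Future produced anyway). */
  def run(): (Boolean, Int) = {
    val f = Future { Thread.sleep(300); 5 }
    val timedOut = Try(Await.result(f, 100.millis)).isFailure // caller gives up after 100 ms
    val value = Await.result(f, 2.seconds)                    // the Future itself kept running
    (timedOut, value)
  }
}
```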
- // 4. Let's try "cancellable()" method. It returns Future + special function to cancel execution.
- // see https://stackoverflow.com/a/16010193/2212849 for more details
- import java.time.LocalDateTime
- import scala.concurrent.ExecutionContext.Implicits.global
- import scala.concurrent.{Future, Promise}
- import scala.io.StdIn
- import scala.language.postfixOps
- import scala.util.{Failure, Success}
- object Main extends App {
- def print(x: Any): Unit = {Thread.sleep(50); println(s"${LocalDateTime.now()}: $x")}
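- // Races f against a promise that only cancel() completes; returns (cancel function, raced future).
- def cancellable[T](f: Future[T])(customCode: => Unit): (() => Unit, Future[T]) = {
- val p = Promise[T]()
- val first = Future.firstCompletedOf(Seq(p.future, f))
- val cancellation: () => Unit = { () =>
- // onFailure is deprecated since Scala 2.12; failed.foreach runs customCode only if `first` fails
- first.failed.foreach(_ => customCode)
- p.failure(new Exception)
- }
- (cancellation, first)
- }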
- def f = Future {
- print("Future started")
- Thread.sleep(60000)
- print("Future finished")
- 5
- }
- // create Cancellable Future from ordinary Future!
- val (cancel, f1) = cancellable(f) {
- print("Cancelled")
- }
- f1.onComplete {
- case Success(result) => print(s"Result is $result")
- case Failure(error) => print(s"Sorry: $error")
- }
- print("Press ENTER to cancel Future")
- StdIn.readLine()
- cancel()
- print("Press ENTER to finish")
- StdIn.readLine()
- }
- // output:
- // 2018-09-19T14:18:48.300: Press ENTER to cancel Future
- // 2018-09-19T14:18:48.300: Future started
- <ENTER>
- // 2018-09-19T14:19:32.996: Press ENTER to finish
- // 2018-09-19T14:19:32.998: Sorry: java.lang.Exception
- // 2018-09-19T14:19:33.001: Cancelled
- // 2018-09-19T14:19:48.353: Future finished
- // So we've got the same result as in Example #2 (in fact they are the same, because both use Future.firstCompletedOf).
- // Summary: "Future.firstCompletedOf" does NOT terminate a Future after cancellation.
- // 5. One may say:
- // - Java rocks, Scala sucks
- // - Why?
- // - Because since Java 5, we have FutureTask<T> that has a "cancel()" method!
- // OK, let's try it! We wrap Java FutureTask in Scala Promise container.
- // Taken from https://stackoverflow.com/a/39986418/2212849
- // Cancellable.scala:
- import java.util.concurrent.FutureTask
- import scala.concurrent._
- import scala.util.Try
- class Cancellable[T](executionContext: ExecutionContext, todo: => T) {
- private val promise = Promise[T]()
- def future: Future[T] = promise.future
- private val jf: FutureTask[T] = new FutureTask[T](() => todo) {
- override def done(): Unit = promise.complete(Try(get()))
- }
- def cancel(): Unit = jf.cancel(true)
- executionContext.execute(jf)
- }
- object Cancellable {
- def apply[T](todo: => T)(implicit ec: ExecutionContext): Cancellable[T] = new Cancellable[T](ec, todo)
- }
- // Main.scala:
- import java.time.LocalDateTime
- import scala.concurrent.ExecutionContext.Implicits.global
- import scala.io.StdIn
- import scala.language.postfixOps
- import scala.util.{Failure, Success}
- object Main extends App {
- def print(x: Any): Unit = {Thread.sleep(50); println(s"${LocalDateTime.now()}: $x")}
- def f = Cancellable {
- print("Future started")
- Thread.sleep(60000)
- print("Future finished")
- 5
- }
- f.future.onComplete {
- case Success(result) => print(s"Result is $result")
- case Failure(error) => print(s"Sorry: $error")
- }
- print("Press ENTER to cancel Future")
- StdIn.readLine()
- f.cancel()
- print("Press ENTER to finish")
- StdIn.readLine()
- }
- // output:
- // 2018-09-19T14:29:37.799: Future started
- // 2018-09-19T14:29:37.799: Press ENTER to cancel Future
- <ENTER>
- // 2018-09-19T14:30:09.067: Press ENTER to finish
- // 2018-09-19T14:30:37.852: Future finished
- // 2018-09-19T14:30:37.903: Result is 5
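- // So... the task is still alive after cancellation, and it even returned Success[T]! But look closely at the code:
- // f is declared with "def", so every reference to f builds a brand new Cancellable. "f.future" observes the first task,
- // while "f.cancel()" creates and immediately cancels a second one; the observed task was never cancelled.
- // Summary: this run does NOT show whether "Java FutureTask" terminates after cancel(); f would have to be a "val" for that.
- // (FutureTask.cancel(true) is documented to interrupt the thread running the task, the same mechanism as in Example #6.)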
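- // A hedged sketch of the same experiment with the task held in a "val", so that cancel() hits the task whose
- // future is observed. ValCancellable, demo() and the shortened 500 ms sleep are made up for this sketch; the
- // wrapper mirrors the Cancellable class above.
```scala
import java.util.concurrent.{Callable, CountDownLatch, FutureTask}
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

// A FutureTask whose outcome (value, exception or cancellation) is copied into a Promise.
final class ValCancellable[T](ec: ExecutionContext, todo: => T) {
  private val promise = Promise[T]()
  private val jf = new FutureTask[T](new Callable[T] { def call(): T = todo }) {
    override def done(): Unit = { promise.tryComplete(Try(get())); () }
  }
  def future: Future[T] = promise.future
  def cancel(): Boolean = jf.cancel(true) // true = interrupt the thread running the task
  ec.execute(jf)
}

object ValCancellable {
  /** Returns (did the observed future fail?, did the body reach its last line?). */
  def demo(): (Boolean, Boolean) = {
    val started = new CountDownLatch(1)
    val finished = new AtomicBoolean(false)
    // a val: cancel() below acts on the very task whose future we observe
    val task = new ValCancellable[Int](ExecutionContext.global, {
      started.countDown()
      Thread.sleep(500)
      finished.set(true)
      5
    })
    started.await()                 // make sure the body is running before cancelling
    task.cancel()
    val failed = Try(Await.result(task.future, 1.second)).isFailure
    Thread.sleep(700)               // longer than the body's sleep: a survivor would have finished
    (failed, finished.get)
  }
}
```
- // Here the observed future fails with a CancellationException and the body never reaches its last line,
- // because the interrupt lands inside Thread.sleep.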
- // 6. What's the root problem? Maybe it's just because underlying thread is still alive?
- // Maybe we should terminate it in order to stop Future for good?
- // Let's try this solution: https://pastebin.com/jrEuP5Am
- // Taken from: https://gist.github.com/viktorklang/5409467
- import java.time.LocalDateTime
- import scala.concurrent.ExecutionContext.Implicits.global
- import scala.io.StdIn
- import scala.language.postfixOps
- import scala.util.{Failure, Success}
- import Interrupt._
- object Main extends App {
- def print(x: Any): Unit = {Thread.sleep(50); println(s"${LocalDateTime.now()}: $x")}
- val (f, cancel) = interruptibly {
- print("Future started")
- Thread.sleep(60000)
- print("Future finished")
- 5
- }
- f.onComplete {
- case Success(result) => print(s"Result is $result")
- case Failure(error) => print(s"Sorry: $error")
- }
- print("Press ENTER to cancel Future")
- StdIn.readLine()
- cancel()
- print("Press ENTER to finish")
- StdIn.readLine()
- print("Bye")
- }
- // output:
- // 2018-09-19T14:42:47.521: Future started
- // 2018-09-19T14:42:47.523: Press ENTER to cancel Future
- <ENTER>
- // 2018-09-19T14:43:21.173: Press ENTER to finish
- // 2018-09-19T14:43:21.174: Sorry: java.util.concurrent.CancellationException
- // 2018-09-19T14:43:57.758: Bye
- // Wow! Look at the last timestamp! We have no "Future finished" messages! Now Future is finally terminated!
- // Summary: only interrupting underlying thread can terminate a Future.
- // CAVEAT! Please note that forcibly stopping threads is highly discouraged in both Java and Scala!
- // A thread may hold important resources that it must release properly after use.
- // Interrupting threads may cause bugs, resource leaks and other elusive issues.
- // It worked here only because Thread.sleep reacts to interruption; not every long-running call does.
- // Also, IMHO, interrupting threads that belong to a live thread pool is a bad idea as well.
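- // A minimal sketch of why interrupt() is not a general kill switch (CooperativeInterrupt is a made-up name):
- // interruption is cooperative, so a busy loop that never checks the flag runs to its end, while the same loop
- // that polls Thread.currentThread().isInterrupted stops early.
```scala
import java.util.concurrent.atomic.AtomicBoolean

object CooperativeInterrupt {
  /** Runs `body` on a new thread, interrupts it after 100 ms and waits for it to end. */
  private def interruptAfter100ms(body: => Unit): Unit = {
    val t = new Thread(new Runnable { def run(): Unit = body })
    t.start()
    Thread.sleep(100)
    t.interrupt()
    t.join()
  }

  /** Returns (ignoring loop ran to its end?, polling loop ran to its end?). */
  def run(): (Boolean, Boolean) = {
    val ignoringFinished = new AtomicBoolean(false)
    val pollingFinished = new AtomicBoolean(false)

    // Spins for 500 ms and never looks at the interrupt flag.
    interruptAfter100ms {
      val deadline = System.nanoTime() + 500000000L
      while (System.nanoTime() < deadline) {}
      ignoringFinished.set(true)
    }

    // Same loop, but it polls the flag and bails out as soon as it is set.
    interruptAfter100ms {
      val deadline = System.nanoTime() + 500000000L
      var interrupted = false
      while (!interrupted && System.nanoTime() < deadline) {
        interrupted = Thread.currentThread().isInterrupted
      }
      if (!interrupted) pollingFinished.set(true)
    }

    (ignoringFinished.get, pollingFinished.get)
  }
}
```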
- // FINAL SUMMARY:
- // There are a lot of ways to solve problem 1: just use the Akka timeout in application.conf (Prod mode), and you'll be good!
- // For problem 2 there is no "idiomatic" solution. We should understand that interrupting threads is a bad idea.
- // That's why the best solution is a better design that avoids potentially long-running operations.