mitrakov

Terminating Futures

Sep 19th, 2018
// This is a very long investigation of Scala Futures and the Long Execution problem.
// But I Promise[T], it will be interesting.
// Ready?

// Prerequisites: PlayFramework 2.6, Scala 2.12, Java 8

// Problem: suppose we have an async DB query that fetches some data from the DB.
// Suppose it is not optimized, and it takes 50 hours to complete (just a design flaw, shit happens).
// What issues does it cause?
// 1. no timeout for the calling service
// 2. Future[T] seizes a thread from the EC, and we can just run out of threads
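
// A minimal sketch of issue 2 (not part of the original experiments): on a single-threaded
// pool, a Future that the caller has already given up on still holds the only thread, so the
// next Future has to queue behind it. StarvationSketch and queuedDelayMillis are made-up names.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Try

object StarvationSketch {
  // Returns how long (in ms) a trivial Future waited behind a timed-out one.
  def queuedDelayMillis(blockMillis: Long): Long = {
    val pool = Executors.newFixedThreadPool(1) // one thread only, to make the effect visible
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val slow = Future { Thread.sleep(blockMillis); 1 }
      Try(Await.result(slow, 10.millis)) // the caller gives up, the thread stays busy
      val start = System.nanoTime()
      Await.result(Future(2), (blockMillis + 5000).millis) // queued behind `slow`
      (System.nanoTime() - start) / 1000000
    } finally pool.shutdown()
  }
}
```

// With blockMillis = 500 the reported delay should be several hundred milliseconds,
// even though the second Future does no work at all.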

// OK, let's start



// 1. Using Akka timeout from PlayFramework itself:
// MyController.scala:
import javax.inject.Inject
import scala.concurrent.{ExecutionContext, Future}
import play.api.mvc._
import play.api.Logger

class MyController @Inject()(cc: ControllerComponents)(implicit val ec: ExecutionContext) extends AbstractController(cc) {

  def getInt: Future[Int] = Future {
    Logger.info("Future started")
    Thread.sleep(60000)
    Logger.info("Future finished")
    5
  }

  // add to routes: POST    /test/long/action    controllers.MyController.longAsyncAction
  def longAsyncAction: Action[AnyContent] = Action.async {
    getInt map { result =>
      Ok(s"Result is $result")
    }
  }
}

// now add these lines to application.conf:
play.server.http.idleTimeout = 60s      // optional
play.server.akka.requestTimeout = 30s

// run "sbt run"
POST http://localhost:9000/test/long/action

        // output:
        // 200 (OK): Result is 5

        // stdout:
        // 2018-09-19 10:31:24.271 INFO  play.api.Play    Application started (Dev)
        // 2018-09-19 10:31:31.899 INFO  application      Future started
        // 2018-09-19 10:32:31.900 INFO  application      Future finished

// Why has it happened? Because those settings work only in Prod mode.
// OK, run "sbt dist", unpack the zip archive from target/universal and launch the App in Prod mode.
// Let's try once again!
POST http://localhost:9000/test/long/action

        // output:
        // Service unavailable (503): The server was not able to produce a timely response to your request.
        // Please try again in a short while!

        // stdout:
        // 2018-09-19 10:36:44.491 INFO  play.core.server.AkkaHttpServer Listening for HTTP on /0:0:0:0:0:0:0:0:9000
        // 2018-09-19 10:37:59.167 INFO  application      Future started
        // 2018-09-19 10:38:59.168 INFO  application      Future finished

// Hey! Timeout works (problem 1)! But as we can see, the Future is NOT terminated (problem 2).
// Summary: Akka Timeout does NOT terminate a Future after timeout.



// 2. Let's check the 'withTimeout' function (https://pastebin.com/eu3JK6Fp).
// Now we can turn off the Prod server and go back to Dev mode.
import akka.actor.ActorSystem
import scala.concurrent.duration._
import scala.language.postfixOps
...

class MyController @Inject()(
    cc: ControllerComponents
  )(implicit val ec: ExecutionContext, as: ActorSystem) extends AbstractController(cc) {

  private implicit val scheduler = as.scheduler
  ...

  def longAsyncAction: Action[AnyContent] = Action.async {
    withTimeout(30 seconds) {
      getInt map { result =>
        Ok(s"Result is $result")
      }
    }
  }
}

// run:
POST http://localhost:9000/test/long/action

        // output:
        // 500 (InternalServerError) [TimeoutException: Future timed out after 30 seconds]

        // stdout:
        // 2018-09-19 10:49:59.823 INFO  play.api.Play    Application started (Dev)
        // 2018-09-19 10:50:04.558 INFO  application      Future started
        // 2018-09-19 10:50:34.572 ERROR application      
        // ! @79afo86ho - Internal server error, for (POST) [/test/long/action] ->
        // Caused by: java.util.concurrent.TimeoutException: Future timed out after 30 seconds
        // 2018-09-19 10:51:04.558 INFO  application      Future finished

// Great result!
// Now we have a timeout (problem 1) and extra logger information, but the Future is still NOT terminated (problem 2)!
// Summary: "Future.firstCompletedOf" does NOT terminate a Future after timeout.
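
// The linked 'withTimeout' paste is not reproduced here. As a rough sketch of how such a helper
// might be built on Future.firstCompletedOf, here is a version that ASSUMES a plain JDK
// ScheduledExecutorService instead of the Akka scheduler used above, so it is not the author's code.
// It shows why the body keeps running: the timer only fails the returned Future.

```scala
import java.util.concurrent.{ScheduledExecutorService, TimeUnit, TimeoutException}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object TimeoutSketch {
  // Hypothetical helper: the result fails after `timeout`, the work itself is left untouched.
  def withTimeout[T](timeout: FiniteDuration)(f: => Future[T])
                    (implicit ec: ExecutionContext, scheduler: ScheduledExecutorService): Future[T] = {
    val timer = Promise[T]()
    // Fail the timer promise once the timeout elapses.
    val task = scheduler.schedule(new Runnable {
      def run(): Unit = timer.tryFailure(new TimeoutException(s"Future timed out after $timeout"))
    }, timeout.toMillis, TimeUnit.MILLISECONDS)
    val work = f
    // If the work finishes first, the timer is no longer needed.
    work.onComplete(_ => task.cancel(false))
    // Whichever completes first decides the result; the loser is NOT stopped.
    Future.firstCompletedOf(Seq(work, timer.future))
  }
}
```

// Nothing in this helper holds a reference to the thread running `f`, so it has no way to stop it.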



// 3. OK, we know that Await is not recommended, but we have no choice... let's try Await:
import java.time.LocalDateTime
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.io.StdIn
import scala.language.postfixOps

object Main extends App {
  def print(x: Any): Unit = {Thread.sleep(50); println(s"${LocalDateTime.now()}: $x")}

  def f = Future {
    print("Future started")
    Thread.sleep(60000)
    print("Future finished")
    5
  }

  try {
    val result = Await.result(f, 30 seconds)
    print(s"Result is $result")
  } catch {case e: Throwable => print(s"Error: $e")}

  print("Press ENTER to finish")
  StdIn.readLine()
}

        // output:
        // 2018-09-19T14:08:48.051: Future started
        // 2018-09-19T14:09:18.131: Error: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        // 2018-09-19T14:09:18.181: Press ENTER to finish
        // 2018-09-19T14:09:48.115: Future finished

// As we can see, we detect the timeout (problem 1), but the Future is still not terminated (problem 2).
// Summary: "Await" does NOT terminate a Future after timeout.



// 4. Let's try the "cancellable()" method. It returns a Future plus a special function to cancel execution.
// see https://stackoverflow.com/a/16010193/2212849 for more details

import java.time.LocalDateTime
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, Promise}
import scala.io.StdIn
import scala.language.postfixOps
import scala.util.{Failure, Success}

object Main extends App {
  def print(x: Any): Unit = {Thread.sleep(50); println(s"${LocalDateTime.now()}: $x")}

  def cancellable[T](f: Future[T])(customCode: => Unit): (() => Unit, Future[T]) = {
    val p = Promise[T]
    val first = Future firstCompletedOf Seq(p.future, f)
    val cancellation: () => Unit = { () =>
      first onFailure { case _ => customCode}
      p failure new Exception
    }
    (cancellation, first)
  }

  def f = Future {
    print("Future started")
    Thread.sleep(60000)
    print("Future finished")
    5
  }

  // create Cancellable Future from ordinary Future!
  val (cancel, f1) = cancellable(f) {
    print("Cancelled")
  }

  f1.onComplete {
    case Success(result) => print(s"Result is $result")
    case Failure(error) => print(s"Sorry: $error")
  }

  print("Press ENTER to cancel Future")
  StdIn.readLine()
  cancel()

  print("Press ENTER to finish")
  StdIn.readLine()
}

        // output:
        // 2018-09-19T14:18:48.300: Press ENTER to cancel Future
        // 2018-09-19T14:18:48.300: Future started
                                    <ENTER>
        // 2018-09-19T14:19:32.996: Press ENTER to finish
        // 2018-09-19T14:19:32.998: Sorry: java.lang.Exception
        // 2018-09-19T14:19:33.001: Cancelled
        // 2018-09-19T14:19:48.353: Future finished

// So we've got the same result as in Example #2 (in fact they are the same, because both use Future.firstCompletedOf).
// Summary: "Future.firstCompletedOf" does NOT terminate a Future after cancellation.



// 5. One may say:
// - Java rocks, Scala sucks
// - Why?
// - Because since Java 5, we have FutureTask<T> that has a "cancel()" method!

// OK, let's try it! We wrap a Java FutureTask in a Scala Promise container.
// Taken from https://stackoverflow.com/a/39986418/2212849
// Cancellable.scala:
import java.util.concurrent.FutureTask
import scala.concurrent._
import scala.util.Try

class Cancellable[T](executionContext: ExecutionContext, todo: => T) {
  private val promise = Promise[T]()

  def future: Future[T] = promise.future

  private val jf: FutureTask[T] = new FutureTask[T](() => todo) {
    override def done(): Unit = promise.complete(Try(get()))
  }

  def cancel(): Unit = jf.cancel(true)

  executionContext.execute(jf)
}

object Cancellable {
  def apply[T](todo: => T)(implicit ec: ExecutionContext): Cancellable[T] = new Cancellable[T](ec, todo)
}

// Main.scala:
import java.time.LocalDateTime
import scala.concurrent.ExecutionContext.Implicits.global
import scala.io.StdIn
import scala.language.postfixOps
import scala.util.{Failure, Success}

object Main extends App {
  def print(x: Any): Unit = {Thread.sleep(50); println(s"${LocalDateTime.now()}: $x")}

  def f = Cancellable {
    print("Future started")
    Thread.sleep(60000)
    print("Future finished")
    5
  }

  f.future.onComplete {
    case Success(result) => print(s"Result is $result")
    case Failure(error) => print(s"Sorry: $error")
  }

  print("Press ENTER to cancel Future")
  StdIn.readLine()
  f.cancel()

  print("Press ENTER to finish")
  StdIn.readLine()
}

        // output:
        // 2018-09-19T14:29:37.799: Future started
        // 2018-09-19T14:29:37.799: Press ENTER to cancel Future
                                    <ENTER>
        // 2018-09-19T14:30:09.067: Press ENTER to finish
        // 2018-09-19T14:30:37.852: Future finished
        // 2018-09-19T14:30:37.903: Result is 5

// So... the task is still alive after cancel()! Moreover, it eventually returned Success[T]!
// Note, however, that `f` is declared with `def`, so `f.future` and `f.cancel()` each build a new Cancellable:
// the cancel() above hit a fresh instance, not the one being observed.
// Summary: in this run the observed task did NOT terminate, but the run does not prove that FutureTask.cancel(true) is ineffective.
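
// A sketch of the same experiment with the Cancellable bound to a `val`, so that the instance
// being observed is also the one being cancelled (with `def f`, every reference to `f` creates
// a new Cancellable). The wrapper is repeated to keep the sketch self-contained; the object name,
// the latch and the 2-second sleep are illustrative choices, not part of the original test.

```scala
import java.util.concurrent.{Callable, CancellationException, CountDownLatch, FutureTask}
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object ValCancellableSketch {
  // Same wrapper idea as Cancellable.scala, with an explicit Callable.
  class Cancellable[T](ec: ExecutionContext, todo: => T) {
    private val promise = Promise[T]()
    def future: Future[T] = promise.future
    private val jf: FutureTask[T] = new FutureTask[T](new Callable[T] { def call(): T = todo }) {
      override def done(): Unit = promise.complete(Try(get()))
    }
    def cancel(): Unit = jf.cancel(true) // true = interrupt the running thread
    ec.execute(jf)
  }

  // Returns (outcome of the observed future, whether the body ran to its end).
  def demo()(implicit ec: ExecutionContext): (Try[Int], Boolean) = {
    val started = new CountDownLatch(1)
    val finished = new AtomicBoolean(false)
    val c = new Cancellable[Int](ec, { // one instance: observed AND cancelled
      started.countDown()
      Thread.sleep(2000)
      finished.set(true)
      5
    })
    started.await() // make sure the body is running before cancelling
    c.cancel()
    (Try(Await.result(c.future, 5.seconds)), finished.get)
  }
}
```

// Here cancel(true) interrupts the sleeping thread, so this is thread interruption under the hood,
// with the same caveats as any other interruption-based approach.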



// 6. What's the root problem? Maybe it's just because the underlying thread is still alive?
// Maybe we should terminate it in order to stop the Future for good?
// Let's try this solution: https://pastebin.com/jrEuP5Am
// Taken from: https://gist.github.com/viktorklang/5409467

import java.time.LocalDateTime
import scala.concurrent.ExecutionContext.Implicits.global
import scala.io.StdIn
import scala.language.postfixOps
import scala.util.{Failure, Success}
import Interrupt._

object Main extends App {
  def print(x: Any): Unit = {Thread.sleep(50); println(s"${LocalDateTime.now()}: $x")}

  val (f, cancel) = interruptibly {
    print("Future started")
    Thread.sleep(60000)
    print("Future finished")
    5
  }

  f.onComplete {
    case Success(result) => print(s"Result is $result")
    case Failure(error) => print(s"Sorry: $error")
  }

  print("Press ENTER to cancel Future")
  StdIn.readLine()
  cancel()

  print("Press ENTER to finish")
  StdIn.readLine()
  print("Bye")
}

        // output:
        // 2018-09-19T14:42:47.521: Future started
        // 2018-09-19T14:42:47.523: Press ENTER to cancel Future
                                    <ENTER>
        // 2018-09-19T14:43:21.173: Press ENTER to finish
        // 2018-09-19T14:43:21.174: Sorry: java.util.concurrent.CancellationException
        // 2018-09-19T14:43:57.758: Bye

// Wow! Look at the last timestamp! There is no "Future finished" message! Now the Future is finally terminated!
// Summary: only interrupting the underlying thread can terminate a Future.
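
// The 'interruptibly' helper lives in the linked paste and is not reproduced here. The following
// is only a guess at a minimal shape for such a helper: it remembers which thread runs the body
// and interrupts it on cancel. The lock, the `worker` variable and the object name are assumptions
// of this sketch, not the code from the paste or the gist.

```scala
import java.util.concurrent.CancellationException
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success, Try}

object InterruptSketch {
  def interruptibly[T](body: => T)(implicit ec: ExecutionContext): (Future[T], () => Boolean) = {
    val p = Promise[T]()
    val lock = new Object
    var worker: Thread = null // guarded by lock; non-null only while body runs

    ec.execute(new Runnable {
      def run(): Unit = {
        // Skip the body entirely if cancel() already won.
        val mayRun = lock.synchronized {
          if (p.isCompleted) false else { worker = Thread.currentThread(); true }
        }
        if (mayRun) {
          // Catch Throwable on purpose: scala.util.Try would rethrow InterruptedException.
          val result: Try[T] = try Success(body) catch { case t: Throwable => Failure(t) }
          // Forget the thread and clear a late interrupt flag before it returns to the pool.
          lock.synchronized { worker = null; Thread.interrupted() }
          p.tryComplete(result)
        }
      }
    })

    // Fails the Future and interrupts the worker if the body is still running.
    val cancel = () => lock.synchronized {
      val won = p.tryFailure(new CancellationException)
      if (won && worker != null) worker.interrupt()
      won
    }
    (p.future, cancel)
  }
}
```

// The interrupt only helps when the body is blocked in an interruptible call such as Thread.sleep;
// a busy loop that never checks the interrupt flag would keep running.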

// CAVEAT! Please pay attention: terminating threads is highly discouraged in both Java and Scala!
// A thread may hold some important resources, and it must close them properly after use.
// Interrupting threads may cause bugs, resource leaks and other elusive issues.
// Also, IMHO, terminating threads that belong to a live Thread Pool is a bad idea as well.
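
// To illustrate the resource concern: if a task may be interrupted, its cleanup has to sit in a
// `finally` block, otherwise the interrupt skips it. A small sketch with a made-up Resource class
// standing in for a connection or file handle:

```scala
import java.util.concurrent.atomic.AtomicBoolean

object CleanupSketch {
  // Hypothetical resource that only records whether it was closed.
  final class Resource extends AutoCloseable {
    val closed = new AtomicBoolean(false)
    def close(): Unit = closed.set(true)
  }

  // Starts a long "query" on its own thread, interrupts it after `afterMillis`,
  // waits for the thread to end and returns the resource for inspection.
  def interruptWithCleanup(afterMillis: Long): Resource = {
    val res = new Resource
    val t = new Thread(new Runnable {
      def run(): Unit = {
        try {
          Thread.sleep(60000) // stands in for the long operation
        } catch {
          case _: InterruptedException => () // interrupted: fall through to cleanup
        } finally {
          res.close() // runs on normal completion and on interruption
        }
      }
    })
    t.start()
    Thread.sleep(afterMillis)
    t.interrupt()
    t.join()
    res
  }
}
```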

// FINAL SUMMARY:
// There are a lot of ways to solve problem 1: just use the Akka timeout in application.conf, and you'll be good!
// For problem 2 there is no "idiomatic" solution. We should understand that interrupting threads is a bad idea.
// That's why the best solution is to follow a better design and avoid potentially long operations.