Advertisement
Guest User

ThrottledDBService

a guest
Apr 19th, 2014
160
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 3.57 KB | None | 0 0
  1. import daos.exceptions.TabChordDBThrottleException
  2. import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, ThreadPoolExecutor}
  3. import scala.concurrent._
  4. import play.api.db.slick
  5. import play.core.NamedThreadFactory
  6.  
  /**
   * Mix-in that runs blocking Slick database work on a dedicated thread pool and rejects
   * new work once too many database tasks are already waiting in that pool's queue.
   *
   * NOTE(review): `DBConfiguration` is an object nested inside this trait, so each instance
   * that mixes the trait in gets its own configuration and its own thread pool. Confirm that
   * implementors are long-lived (e.g. singletons) so pools are not created per request.
   */
  trait ThrottledDBService {
    /** Override to use a different Application. */
    protected def app = slick.Config.app

    /** Override to use a different database name. */
    protected def dataBaseName = slick.Config.defaultName

    /**
     * Pool sizing and throttling settings, read from the `db.<dataBaseName>.*` configuration
     * keys of [[app]], together with the thread pool / execution context built from them.
     */
    protected object DBConfiguration {
      /**
       * Builds the dedicated database thread pool and an execution context backed by it.
       *
       * @param minConnections core size of the thread pool
       * @param maxConnections requested maximum size of the thread pool
       * @return the execution context and the underlying executor (the latter is kept so
       *         that its queue length can be inspected for throttling)
       */
      private def buildThreadPoolExecutionContext(
          minConnections: Int,
          maxConnections: Int): (ExecutionContextExecutorService, ThreadPoolExecutor) = {
        // ThreadPoolExecutor rejects maximumPoolSize < corePoolSize with an
        // IllegalArgumentException; clamp so a min > max configuration does not break start-up.
        // NOTE: with an unbounded LinkedBlockingQueue the executor never grows beyond its core
        // size, so the maximum has no effect on scheduling (see the ThreadPoolExecutor Javadoc).
        val maximumPoolSize = math.max(minConnections, maxConnections)
        val threadPoolExecutor = new ThreadPoolExecutor(
          minConnections,
          maximumPoolSize,
          0L, TimeUnit.MILLISECONDS,
          new LinkedBlockingQueue[Runnable](),
          new NamedThreadFactory("tabchord.db.execution.context"))
        (ExecutionContext.fromExecutorService(threadPoolExecutor), threadPoolExecutor)
      }

      /** `db.<name>.partitionCount`, default 2. Read here but not used by this trait itself. */
      val partitionCount: Int =
        app.configuration.getInt(s"db.$dataBaseName.partitionCount").getOrElse(2)

      /** `db.<name>.maxConnectionsPerPartition`, default 5. */
      val maxConnections: Int =
        app.configuration.getInt(s"db.$dataBaseName.maxConnectionsPerPartition").getOrElse(5)

      /** `db.<name>.minConnectionsPerPartition`, default 5. */
      val minConnections: Int =
        app.configuration.getInt(s"db.$dataBaseName.minConnectionsPerPartition").getOrElse(5)

      /** `db.<name>.maxQueriesPerRequest`, default 20. */
      val maxQueriesPerRequest: Int =
        app.configuration.getInt(s"db.$dataBaseName.maxQueriesPerRequest").getOrElse(20)

      /** Execution context for database work and the executor that backs it. */
      val (executionContext, threadPool) =
        buildThreadPoolExecutionContext(minConnections, maxConnections)
    }

    /**
     * A predicate for checking our ability to service database requests. It is determined by
     * ensuring that the request queue doesn't fill up beyond a certain threshold. For
     * convenience we use the max number of connections * the max # of db requests per web
     * request to determine this threshold. It is a rough check as we don't know how many
     * queries we're going to make or what other threads are running in parallel etc.
     * Nevertheless, the check is adequate in order to throttle the acceptance of requests to
     * the size of the pool.
     *
     * @see https://github.com/TechEmpower/FrameworkBenchmarks/pull/124
     * @return `true` while the number of queued database tasks is below the threshold
     */
    protected def isDBAvailable: Boolean = {
      val dbc = DBConfiguration
      val queueThreshold = dbc.maxConnections * dbc.maxQueriesPerRequest
      dbc.threadPool.getQueue.size() < queueThreshold
    }

    /**
     * Wraps the block with a Future in the database execution context and a Slick session,
     * throttling the number of requests according to the limits in the db configuration.
     *
     * The availability check is made before the work is submitted, so rejected requests
     * never occupy a slot in the pool's queue.
     *
     * Terminates the future with a [[TabChordDBThrottleException]] in case of queue overload.
     *
     * @param body user code that receives the open session
     * @return Future of the database computation
     */
    protected def throttled[A](body: slick.Session => A): Future[A] = {
      if (isDBAvailable) {
        Future {
          slick.DB(dataBaseName)(app).withSession { session =>
            body(session)
          }
        }(DBConfiguration.executionContext)
      } else {
        Future.failed(new TabChordDBThrottleException("Too many Requests pending in Threadpool"))
      }
    }

    /**
     * Wraps the block with a Future in the database execution context and a Slick transaction,
     * throttling the number of requests according to the limits in the db configuration.
     *
     * Terminates the future with a [[TabChordDBThrottleException]] in case of queue overload.
     *
     * @param body user code that receives the session the transaction runs on
     * @return Future of the database computation
     */
    protected def throttledTransaction[A](body: slick.Session => A): Future[A] = {
      throttled { session =>
        session.withTransaction {
          body(session)
        }
      }
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement