Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import daos.exceptions.TabChordDBThrottleException
- import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, ThreadPoolExecutor}
- import scala.concurrent._
- import play.api.db.slick
- import play.core.NamedThreadFactory
- /**
- * Implement this trait to get a throttled database session Wrapper
- */
- trait ThrottledDBService {
- /** Override to use a different Application */
- protected def app = slick.Config.app
- /** Override to use different Databasename*/
- protected def dataBaseName = slick.Config.defaultName
- protected object DBConfiguration {
- private def buildThreadPoolExecutionContext(minConnections: Int, maxConnections: Int) = {
- val threadPoolExecutor = new ThreadPoolExecutor(minConnections, maxConnections,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue[Runnable](),
- new NamedThreadFactory("tabchord.db.execution.context"))
- ExecutionContext.fromExecutorService(threadPoolExecutor) -> threadPoolExecutor
- }
- val partitionCount = app.configuration.getInt(s"db.$dataBaseName.partitionCount").getOrElse(2)
- val maxConnections = app.configuration.getInt(s"db.$dataBaseName.maxConnectionsPerPartition").getOrElse(5)
- val minConnections = app.configuration.getInt(s"db.$dataBaseName.minConnectionsPerPartition").getOrElse(5)
- val maxQueriesPerRequest = app.configuration.getInt(s"db.$dataBaseName.maxQueriesPerRequest").getOrElse(20)
- val (executionContext, threadPool) = buildThreadPoolExecutionContext(minConnections, maxConnections)
- }
- /** A predicate for checking our ability to service database requests is determined by ensuring that the request
- queue doesn't fill up beyond a certain threshold. For convenience we use the max number of connections * the max
- # of db requests per web request to determine this threshold. It is a rough check as we don't know how many
- queries we're going to make or what other threads are running in parallel etc. Nevertheless, the check is
- adequate in order to throttle the acceptance of requests to the size of the pool.
- @see https://github.com/TechEmpower/FrameworkBenchmarks/pull/124
- */
- protected def isDBAvailable: Boolean = {
- val dbc = DBConfiguration
- dbc.threadPool.getQueue.size() < (dbc.maxConnections * dbc.maxQueriesPerRequest)
- }
- /**
- * Wraps the block with a Future in the appropriate database execution context and slick session,
- * throttling the # of request in accordance to the rules specified in the db config and.
- *
- * Terminates the future with a @TabChordDBThrottleException in case of queue overload
- * @param body user code
- * @return Future of the database computation
- */
- protected def throttled[A](body: slick.Session => A):Future[A] = {
- future{
- if(isDBAvailable){
- slick.DB(dataBaseName)(app).withSession {
- s =>
- body(s)
- }
- } else {
- throw new TabChordDBThrottleException("Too many Requests pending in Threadpool")
- }
- }(DBConfiguration.executionContext)
- }
- /**
- * Wraps the block with a Future in the appropriate database execution context and slick transaction,
- * throttling the # of request in accordance to the rules specified in the db config and.
- *
- * Terminates the future with a @TabChordDBThrottleException in case of queue overload
- * @param body user code
- * @return Future of the database computation
- */
- protected def throttledTransaction[A](body: slick.Session => A): Future[A] = {
- throttled {
- s => s.withTransaction {
- body(s)
- }
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement