Not a member of Pastebin yet?
Sign up — it unlocks many cool features!
/** All block manager IDs currently registered with the cluster, excluding the driver. */
def getExecutors = {
  val blockManagerIds = spark.sparkContext.getExecutorStorageStatus.toSeq.map(_.blockManagerId)
  blockManagerIds.filterNot(_.isDriver)
}
/** Shrinks the executor pool down to exactly `totalNumber` non-driver executors.
  *
  * Requests the new total from the cluster manager, then repeatedly picks random
  * not-yet-killed executors and asks Spark to kill them, polling once a second
  * until the live executor count reaches the target. Blocks until that happens.
  *
  * @param totalNumber desired number of non-driver executors; must not exceed
  *                    the current count
  * @throws IllegalArgumentException if `totalNumber` is greater than the number
  *                                  of currently registered executors
  */
def reduceExecutors(totalNumber: Int): Unit = {
  val current = getExecutors.size
  require(totalNumber <= current,
    s"Cannot reduce executors to $totalNumber: only $current are currently registered")
  logger.info(s"""Attempting to reduce number of executors to $totalNumber""")
  spark.sparkContext.requestTotalExecutors(totalNumber, 0, Map.empty)
  // Track ids we have already asked Spark to kill, so we never re-request the
  // same executor while waiting for the cluster manager to act on the kill.
  var killedExecutors = Set.empty[String]
  while (getExecutors.size > totalNumber) {
    val executorIds = getExecutors.map(_.executorId).filterNot(killedExecutors.contains)
    // Kill a random sample rather than always the first ids reported.
    val executorsToKill = Random.shuffle(executorIds).take(executorIds.size - totalNumber)
    spark.sparkContext.killExecutors(executorsToKill)
    killedExecutors ++= executorsToKill
    Thread.sleep(1000) // poll until the cluster manager reflects the kills
  }
}
/** Grows the executor pool up to exactly `totalNumber` non-driver executors.
  *
  * Requests the new total from the cluster manager, then polls once a second
  * until the live executor count reaches the target. Blocks until that happens.
  *
  * @param totalNumber desired number of non-driver executors; must not be less
  *                    than the current count
  * @throws IllegalArgumentException if `totalNumber` is smaller than the number
  *                                  of currently registered executors
  */
def increaseExecutors(totalNumber: Int): Unit = {
  val current = getExecutors.size
  require(totalNumber >= current,
    s"Cannot increase executors to $totalNumber: $current are already registered")
  logger.info(s"""Attempting to increase number of executors to $totalNumber""")
  spark.sparkContext.requestTotalExecutors(totalNumber, 0, Map.empty)
  while (getExecutors.size < totalNumber) {
    Thread.sleep(1000) // poll until the cluster manager provisions the executors
  }
}
Add Comment
Please sign in to add a comment.