Guest User

Untitled

a guest
Jul 18th, 2018
101
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.13 KB | None | 0 0
  /**
   * Block manager ids taken from the SparkContext's executor storage status,
   * with the driver's own entry left out.
   */
  def getExecutors = {
    val blockManagerIds =
      spark.sparkContext.getExecutorStorageStatus.toSeq.map(_.blockManagerId)
    blockManagerIds.filterNot(_.isDriver)
  }
  4.  
  /**
   * Scales the application down to `totalNumber` executors and blocks until
   * `getExecutors` reports no more than that many.
   *
   * The new total is first requested from the SparkContext. Then, once per
   * second, a random selection of the surplus executors is asked to be killed.
   * Executors whose kill request was acknowledged are remembered so they are
   * not picked again while they are still listed; a request that was NOT
   * acknowledged is retried on the next pass instead of being silently
   * treated as done.
   *
   * There is no timeout: if the surplus never goes away this method does not
   * return.
   *
   * @param totalNumber desired number of executors after the reduction
   */
  def reduceExecutors(totalNumber: Int): Unit = {
    //TODO throw error if totalNumber is more than current
    logger.info(s"""Attempting to reduce number of executors to $totalNumber""")
    spark.sparkContext.requestTotalExecutors(totalNumber, 0, Map.empty)
    // Ids with an acknowledged kill request; they can still show up in
    // getExecutors for a while, so they must be excluded from later picks.
    val killedExecutors = scala.collection.mutable.Set[String]()
    while (getExecutors.size > totalNumber) {
      val executorIds = getExecutors.map(_.executorId).filterNot(killedExecutors.contains(_))
      // take() with a non-positive count yields an empty selection, which is
      // the "everything needed is already being killed, just wait" case.
      val executorsToKill = Random.shuffle(executorIds).take(executorIds.size - totalNumber)
      if (executorsToKill.nonEmpty) {
        // killExecutors returns a Boolean; only a positive answer means these
        // ids can be considered on their way out.
        if (spark.sparkContext.killExecutors(executorsToKill)) {
          killedExecutors ++= executorsToKill
        } else {
          val rejectedIds = executorsToKill.mkString(", ")
          logger.info(s"""Kill request for executors $rejectedIds was not acknowledged, will retry""")
        }
      }
      Thread.sleep(1000)
    }
  }
  18.  
  /**
   * Scales the application up to `totalNumber` executors.
   *
   * Logs the attempt, requests the new total from the SparkContext, and then
   * blocks, checking once per second, until `getExecutors` reports at least
   * `totalNumber` entries. There is no timeout.
   *
   * @param totalNumber desired number of executors after the increase
   */
  def increaseExecutors(totalNumber: Int): Unit = {
    //TODO throw error if totalNumber is less than current
    logger.info(s"""Attempting to increase number of executors to $totalNumber""")
    spark.sparkContext.requestTotalExecutors(totalNumber, 0, Map.empty)

    // Poll until enough executors are listed; sleeps only while still short.
    @scala.annotation.tailrec
    def awaitScaleUp(): Unit =
      if (getExecutors.size < totalNumber) {
        Thread.sleep(1000)
        awaitScaleUp()
      }

    awaitScaleUp()
  }
Add Comment
Please, Sign In to add comment