Advertisement
Guest User

Untitled

a guest
Jan 18th, 2018
80
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 5.23 KB | None | 0 0
  1. from __future__ import absolute_import
  2. from __future__ import division
  3. from builtins import map
  4. from builtins import str
  5. from builtins import range
  6. from past.utils import old_div
  7. import logging
  8. import os
  9. from pipes import quote
  10. import subprocess
  11. import time
  12. import math
  13. import tempfile
  14. import json
  15.  
  16. # Python 3 compatibility imports
  17. from six.moves.queue import Empty, Queue
  18. from six import iteritems
  19.  
  20. from toil.batchSystems import MemoryString
  21. from toil.batchSystems.abstractGridEngineBatchSystem import AbstractGridEngineBatchSystem
  22.  
  23. from toil.lib.bioio import getTempFile
  24.  
  25. logger = logging.getLogger(__name__)
  26.  
  27. class HydraGridEngineBatchSystem(AbstractGridEngineBatchSystem):
  28.  
  29.     class Worker(AbstractGridEngineBatchSystem.Worker):
  30.  
  31.         """
  32.        Grid Engine-specific AbstractGridEngineWorker methods
  33.        """
  34.         def getRunningJobIDs(self):
  35.             times = {}
  36.             currentjobs = {}
  37.             currentjobs = dict((str(self.batchJobIDs[x][0]), x) for x in self.runningJobs)
  38.             process = subprocess.Popen(["qstat"], stdout=subprocess.PIPE)
  39.             stdout, stderr = process.communicate()
  40.  
  41.             for currline in stdout.split('\n'):
  42.                 items = currline.strip().split()
  43.                 if items:
  44.                     if items[0] in currentjobs:
  45.                         jobstart = " ".join(items[5:7]).split('.')[0]  # strip sub-seconds
  46.                         jobstart = time.mktime(time.strptime(jobstart, "%Y-%m-%d %H:%M:%S"))
  47.                         times[currentjobs[items[0]]] = time.time() - jobstart
  48.  
  49.             return times
  50.  
  51.         def killJob(self, jobID):
  52.             pass  # need to implement
  53.  
  54.         def prepareSubmission(self, cpu, memory, jobID, command):
  55.             return self.prepareQsub(cpu, memory, jobID) + [command]
  56.  
  57.         def submitJob(self, subLine):
  58.             tmp_file = getTempFile()
  59.             with open(tmp_file, 'w') as outf:
  60.                 for l in subLine:
  61.                     outf.write(l + '\n')
  62.             with open(tmp_file) as tmp_handle:
  63.                 process = subprocess.Popen(['qsub'], stdout=subprocess.PIPE, stdin=tmp_handle)
  64.                 result, err = process.communicate()
  65.             result = result.split()[2]
  66.             os.remove(tmp_file)
  67.             time.sleep(0.1)
  68.             return result
  69.  
  70.         def getJobExitCode(self, sgeJobID):
  71.             # check if hydra succeeded and failed paths exist
  72.             succeeded_path = os.path.join(os.getenv('HYDRA_ROOT'), 'archive', 'succeeded', sgeJobID)
  73.             failed_path = os.path.join(os.getenv('HYDRA_ROOT'), 'archive', 'failed', sgeJobID)
  74.  
  75.             logger.debug('Checking on job {}'.format(sgeJobID))
  76.  
  77.             # if neither exist, job is still running
  78.             if not os.path.exists(succeeded_path) and not os.path.exists(failed_path):
  79.                 logger.debug('Job {} still running'.format(sgeJobID))
  80.                 return None
  81.  
  82.             if os.path.exists(succeeded_path):
  83.                 logger.debug('Job {} completed successfully'.format(sgeJobID))
  84.                 return 0
  85.  
  86.             complete_file = os.path.join(failed_path, 'complete')
  87.             if not os.path.exists(complete_file):
  88.                 logger.debug('Job {} missing complete file'.format(sgeJobID))
  89.                 return 42  # lost host, exit value is 42
  90.  
  91.             results = json.load(open(complete_file))
  92.             if 'exit_code' in results:
  93.                 exit_code = results['exit_code']
  94.                 logger.debug('Job {} has exit code {}'.format(sgeJobID, exit_code))
  95.             else:
  96.                 exit_code = 1
  97.                 logger.debug('Job {} has no exit code'.format(sgeJobID))
  98.             return exit_code
  99.  
  100.         """
  101.        Implementation-specific helper methods
  102.        """
  103.         def prepareQsub(self, cpu, mem, jobID):
  104.             # hydra qsub uses a text file
  105.  
  106.             if mem is not None:
  107.                 memStr = str(math.ceil(mem / 1024 ** 3)) + 'G'
  108.             else:
  109.                 # default to 2gb
  110.                 memStr = '2G'
  111.  
  112.             if cpu is not None and math.ceil(cpu) > 1:
  113.                 cpuStr = str(int(math.ceil(cpu)))
  114.             else:
  115.                 cpuStr = '1'
  116.  
  117.             qsubline = ['#!/usr/bin/env bash',
  118.                         '#$ -N "toil_job_{}"'.format(jobID),
  119.                         '#$ -V',
  120.                         '#$ -pe threads {}'.format(cpuStr),
  121.                         '#$ -l mem_free={}'.format(memStr),
  122.                         '#$ -cwd',
  123.                         '#$ -o /dev/null',
  124.                         '#$ -e /dev/null',
  125.                         '#$ -S "/usr/bin/env bash"']
  126.  
  127.             if self.boss.environment:
  128.                 qsubline.append('#$ -v')
  129.                 qsubline.append(','.join(k + '=' + quote(os.environ[k] if v is None else v)
  130.                                          for k, v in self.boss.environment.items()))
  131.             return qsubline
  132.  
  133.     """
  134.    The interface for SGE aka Sun GridEngine.
  135.    """
  136.  
  137.     @classmethod
  138.     def getWaitDuration(cls):
  139.         return 0.0
  140.  
  141.     @classmethod
  142.     def obtainSystemConstants(cls):
  143.         # for hydra, we hard code to just below a m4.16xlarge -- 32 cores, 230gb
  144.         return 32, MemoryString('230G')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement