Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from __future__ import absolute_import
- from __future__ import division
- from builtins import map
- from builtins import str
- from builtins import range
- from past.utils import old_div
- import logging
- import os
- from pipes import quote
- import subprocess
- import time
- import math
- import tempfile
- import json
- # Python 3 compatibility imports
- from six.moves.queue import Empty, Queue
- from six import iteritems
- from toil.batchSystems import MemoryString
- from toil.batchSystems.abstractGridEngineBatchSystem import AbstractGridEngineBatchSystem
- from toil.lib.bioio import getTempFile
# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)
class HydraGridEngineBatchSystem(AbstractGridEngineBatchSystem):
    """
    The interface for SGE aka Sun GridEngine, as exposed on Hydra.

    Jobs are submitted by feeding a generated job script (``#$`` directive
    lines followed by the command) to ``qsub`` on stdin. Completion is
    detected by looking for per-job entries under
    ``$HYDRA_ROOT/archive/succeeded`` and ``$HYDRA_ROOT/archive/failed``.
    """

    class Worker(AbstractGridEngineBatchSystem.Worker):
        """
        Grid Engine-specific AbstractGridEngineWorker methods
        """

        def getRunningJobIDs(self):
            """
            Return how long each of this worker's running jobs has been running.

            Runs ``qstat``. For every output line whose first column is the
            batch ID of one of ``self.runningJobs``, the start timestamp is
            parsed from the 6th and 7th whitespace-separated fields
            (``%Y-%m-%d %H:%M:%S``, sub-seconds stripped).

            :return: dict keyed by entries of ``self.runningJobs`` (presumably
                Toil job IDs) mapping to seconds elapsed since the job started,
                measured against the local clock.
            """
            # qstat prints batch IDs; map them back to our own job keys.
            currentjobs = {str(self.batchJobIDs[x][0]): x for x in self.runningJobs}
            # universal_newlines=True makes communicate() return text on
            # Python 3; splitting bytes on a str separator raises TypeError.
            process = subprocess.Popen(["qstat"], stdout=subprocess.PIPE,
                                       universal_newlines=True)
            stdout, _ = process.communicate()
            times = {}
            for currline in stdout.splitlines():
                items = currline.strip().split()
                if items and items[0] in currentjobs:
                    jobstart = " ".join(items[5:7]).split('.')[0]  # strip sub-seconds
                    jobstart = time.mktime(time.strptime(jobstart, "%Y-%m-%d %H:%M:%S"))
                    times[currentjobs[items[0]]] = time.time() - jobstart
            return times

        def killJob(self, jobID):
            """
            Kill a job. Not implemented for Hydra.

            NOTE(review): this does not stop the job. It only logs a warning,
            so the missing capability is visible instead of silently ignored.
            """
            logger.warning('killJob is not implemented for the Hydra batch system; '
                           'job %s will not be killed', jobID)

        def prepareSubmission(self, cpu, memory, jobID, command):
            """
            Return the full job script as a list of lines.

            The lines are the directive header from ``prepareQsub`` followed by
            ``command`` as the final line.
            """
            return self.prepareQsub(cpu, memory, jobID) + [command]

        def submitJob(self, subLine):
            """
            Submit a job script to ``qsub`` and return the batch job ID.

            :param subLine: list of script lines (see ``prepareSubmission``).
                They are written one per line to a temporary file, which is
                fed to ``qsub`` on stdin.
            :return: the third whitespace-separated token of qsub's stdout as
                text. Presumably this follows SGE's ``Your job <id> (...) has
                been submitted``; confirm against Hydra's qsub.
            :raises RuntimeError: if qsub's output has fewer than three tokens.
            """
            tmp_file = getTempFile()
            try:
                with open(tmp_file, 'w') as outf:
                    outf.writelines(line + '\n' for line in subLine)
                with open(tmp_file) as tmp_handle:
                    # Text mode, so the returned job ID is a str on Python 3
                    # (bytes would not match qstat output or build paths).
                    process = subprocess.Popen(['qsub'], stdout=subprocess.PIPE,
                                               stdin=tmp_handle,
                                               universal_newlines=True)
                    result, _ = process.communicate()
            finally:
                # Remove the script even when qsub could not be run.
                os.remove(tmp_file)
            tokens = result.split()
            if len(tokens) < 3:
                raise RuntimeError('Could not parse a job ID from qsub output '
                                   '(exit code {}): {!r}'.format(process.returncode, result))
            # Brief pause between submissions.
            time.sleep(0.1)
            return tokens[2]

        def getJobExitCode(self, sgeJobID):
            """
            Determine a job's exit status from Hydra's on-disk job archive.

            The method checks ``$HYDRA_ROOT/archive/succeeded/<id>`` and
            ``$HYDRA_ROOT/archive/failed/<id>``:

            * If neither exists, the job is still running and ``None`` is
              returned.
            * If ``succeeded`` exists, 0 is returned.
            * Otherwise, the result is read from the JSON file
              ``failed/<id>/complete``. Its ``exit_code`` value is returned, or
              1 if that key is absent. If the file itself is missing, 42 is
              returned (treated as a lost host).

            :raises RuntimeError: if ``HYDRA_ROOT`` is unset or empty.
            """
            hydra_root = os.getenv('HYDRA_ROOT')
            if not hydra_root:
                raise RuntimeError('HYDRA_ROOT must be set to locate Hydra job archives')
            archive = os.path.join(hydra_root, 'archive')
            succeeded_path = os.path.join(archive, 'succeeded', sgeJobID)
            failed_path = os.path.join(archive, 'failed', sgeJobID)
            logger.debug('Checking on job {}'.format(sgeJobID))
            # If neither directory exists yet, the job is still running.
            if not os.path.exists(succeeded_path) and not os.path.exists(failed_path):
                logger.debug('Job {} still running'.format(sgeJobID))
                return None
            if os.path.exists(succeeded_path):
                logger.debug('Job {} completed successfully'.format(sgeJobID))
                return 0
            complete_file = os.path.join(failed_path, 'complete')
            if not os.path.exists(complete_file):
                logger.debug('Job {} missing complete file'.format(sgeJobID))
                return 42  # lost host, exit value is 42
            with open(complete_file) as handle:
                results = json.load(handle)
            if 'exit_code' in results:
                exit_code = results['exit_code']
                logger.debug('Job {} has exit code {}'.format(sgeJobID, exit_code))
            else:
                exit_code = 1
                logger.debug('Job {} has no exit code'.format(sgeJobID))
            return exit_code

        # --- Implementation-specific helper methods ---

        def prepareQsub(self, cpu, mem, jobID):
            """
            Build the ``#$`` directive header of a Hydra job script.

            Hydra's qsub reads its options from directives in a script file
            rather than from the command line.

            :param cpu: requested cores, rounded up. Values of 1 or less, or
                ``None``, become 1.
            :param mem: requested memory in bytes, rounded up to whole GiB.
                ``None`` means ``2G``.
            :param jobID: job identifier embedded in the job name.
            :return: list of script lines.
            """
            if mem is not None:
                # int() so Python 2 (where math.ceil returns a float) emits
                # "3G" rather than "3.0G", consistent with cpuStr below.
                memStr = str(int(math.ceil(mem / 1024 ** 3))) + 'G'
            else:
                # default to 2gb
                memStr = '2G'
            if cpu is not None and math.ceil(cpu) > 1:
                cpuStr = str(int(math.ceil(cpu)))
            else:
                cpuStr = '1'
            qsubline = ['#!/usr/bin/env bash',
                        '#$ -N "toil_job_{}"'.format(jobID),
                        '#$ -V',
                        '#$ -pe threads {}'.format(cpuStr),
                        '#$ -l mem_free={}'.format(memStr),
                        '#$ -cwd',
                        '#$ -o /dev/null',
                        '#$ -e /dev/null',
                        '#$ -S "/usr/bin/env bash"']
            if self.boss.environment:
                # The option and its argument must be on ONE directive line.
                # Splitting them leaves "-v" without an argument and turns the
                # variable list into a stray shell statement in the script.
                qsubline.append('#$ -v ' + ','.join(
                    k + '=' + quote(os.environ[k] if v is None else v)
                    for k, v in self.boss.environment.items()))
            return qsubline

    @classmethod
    def getWaitDuration(cls):
        """
        Return the wait duration required by the base class.

        It is 0.0 for Hydra. Presumably this is the delay between scheduler
        polls; confirm in AbstractGridEngineBatchSystem.
        """
        return 0.0

    @classmethod
    def obtainSystemConstants(cls):
        """Return the hard-coded (cores, memory) limits for Hydra."""
        # for hydra, we hard code to just below a m4.16xlarge -- 32 cores, 230gb
        return 32, MemoryString('230G')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement