Advertisement
Guest User

Untitled

a guest
Jul 21st, 2017
175
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.56 KB | None | 0 0
  1. #!/usr/bin/env python
  2.  
  3.  
  4. """
  5. hadd output of crab jobs utilising the fact that the output files are here at DESY,
  6. so we can skip the painful crab getoutput
  7. """
  8.  
  9.  
  10. import os
  11. import subprocess
  12. from collections import namedtuple
  13. from itertools import chain
  14. import re
  15.  
  16. from retrieve_jobs import TaskDictionaryNameUnordered
  17. from submit_jobs import MCBackgroundsSampleDictionaryUnordered, SignalMCSampleDictionaryUnordered, DataDictionaryMuonChannelUnordered, DataDictionaryElectronChannelUnordered
  18.  
  19.  
  20. # Location of crab output files
  21. NAF_DIR = "/pnfs/desy.de/cms/tier2/store/user/raggleto/"
  22.  
  23. # Where you want the hadded files to go
  24. OUTPUT_DIR = "/nfs/dust/cms/user/aggleton/aTGCsAnalysis/Samples_80X_Working_21_7_17_NewBtagSFNewVtagSF"
  25.  
  26. FEATURE_NAME = "NewBtagSFNewVtagSF"
  27.  
  28. # whether to run hadd jobs on bird cluster instead
  29. RUN_ON_BIRD = True
  30.  
  31. # minimum % of jobs that must have completed successfully to generate a hadded file
  32. MIN_PERCENT_FINISHED = 100.
  33.  
  34. Sample = namedtuple("Sample", ["taskname", "dataset", "outname"])
  35.  
  36.  
  37. class bcolors:
  38. BLUE = '\033[94m'
  39. GREEN = '\033[92m'
  40. YELLOW = '\033[93m'
  41. RED = '\033[91m'
  42. ENDC = '\033[0m'
  43.  
  44.  
  45. Status = namedtuple("Status", ["colour", "name"])
  46. class JobStatus:
  47. FINISHED = Status(colour=bcolors.GREEN, name="Finished")
  48. UNFINISHED = Status(colour=bcolors.RED, name="Still running")
  49. JOBSUBMITTED = Status(colour=bcolors.BLUE, name="Hadd job submitted")
  50.  
  51.  
  52. def create_sample_listing():
  53. """Go through submission lists and retrival list, and match based on task name to form one larger listing"""
  54. entries = []
  55. for task_name, dataset in chain(MCBackgroundsSampleDictionaryUnordered, DataDictionaryMuonChannelUnordered):
  56. matching_output = [x for x in TaskDictionaryNameUnordered if x[0] == task_name]
  57. if len(matching_output) > 1:
  58. raise RuntimeError("More than 1 match for %s" % task_name)
  59. if len(matching_output) == 0:
  60. print "No match for task %s" % task_name
  61. continue
  62. output_name = matching_output[0][1]
  63. entries.append(Sample(task_name, dataset, output_name))
  64.  
  65. return entries
  66.  
  67.  
  68. def get_job_status(job_dir):
  69. print "Checking job status of", job_dir
  70. cmd = "crab status -d crab_projects/%s" % job_dir
  71. status_text = subprocess.check_output(cmd, shell=True)
  72. return {
  73. "finished": float(re.search(r"finished +([0-9.]+)", status_text).groups(1)[0]) if "finished" in status_text else 0.0,
  74. "task_name": re.search(r"Task name:[ \t]+([0-9a-zA-Z_:-]+)", status_text).group(1)
  75. }
  76.  
  77. if channel not in ["mu", "ele"]:
  78. raise RuntimeError("channel arg must be mu or ele")
  79.  
  80. status_dict = get_job_status(crab_dir)
  81. print status_dict
  82.  
  83.  
  84. def do_one_task(entry, channel, min_percent_finished=90.):
  85. """Do checking & hadding for one crab task.
  86.  
  87. If output file already exists, skip this task.
  88.  
  89. channel : {"mu", "ele"}
  90. min_percent_finished : float
  91. Minimum % of jobs that must be finished to perform hadd job
  92. """
  93. if channel not in ["mu", "ele"]:
  94. raise RuntimeError("channel arg must be mu or ele")
  95.  
  96. output_file = os.path.join(OUTPUT_DIR, entry.outname + "_%s.root" % channel)
  97.  
  98. if os.path.isfile(output_file):
  99. print bcolors.YELLOW + "! Output file already exists - skipping this task" + bcolors.ENDC
  100. return JobStatus.FINISHED
  101.  
  102. crab_dir = "crab_%s_%s_%s" % (entry.taskname, channel, FEATURE_NAME)
  103.  
  104. status_dict = get_job_status(crab_dir)
  105. print status_dict
  106. if status_dict['finished'] < min_percent_finished:
  107. print bcolors.RED + "crab jobs not finished - skipping" + bcolors.ENDC
  108. return JobStatus.UNFINISHED
  109.  
  110. sample_dir = entry.dataset.split("/")[1]
  111. date_str = status_dict['task_name'].split(":")[0]
  112. input_str = os.path.join(NAF_DIR, sample_dir, crab_dir, date_str, "0000", "tree_%s_*.root" % channel)
  113.  
  114. # actually do the hadding
  115. if RUN_ON_BIRD:
  116. qsub_command = """qsub -N %s -v OUTPUTF="%s",INPUTF="%s" qsub_hadd.sh""" % (entry.taskname, output_file, input_str)
  117. # print qsub_command # Uncomment this line when testing to view the qsub command
  118. subprocess.check_call(qsub_command, shell=True)
  119. print bcolors.GREEN + "hadd job submitted" + bcolors.ENDC
  120. return JobStatus.JOBSUBMITTED
  121. else:
  122. hadd_cmd = "hadd %s %s" % (output_file, input_str)
  123. print hadd_cmd
  124. subprocess.check_output(hadd_cmd, shell=True) # need shell=True for wildcard expansion?
  125. return JobStatus.FINISHED
  126.  
  127.  
  128.  
  129. if __name__ == "__main__":
  130. samples = create_sample_listing()
  131. for x in samples:
  132. print x
  133.  
  134. if not os.path.isdir(OUTPUT_DIR):
  135. os.makedirs(OUTPUT_DIR)
  136.  
  137. results = []
  138. for ind, entry in enumerate(samples[7:9], 1):
  139. print bcolors.BLUE + ">>> [%d/%d]" % (ind, len(samples)), entry.taskname + bcolors.ENDC
  140. result = do_one_task(entry, "mu", MIN_PERCENT_FINISHED)
  141. results.append((entry.taskname, result))
  142.  
  143. # Print nice summary
  144. print ""
  145. print "="*80
  146. print "SUMMARY:"
  147. print "-"*80
  148. for (name, res) in results:
  149. print res.colour + name + " (" + res.name + ")" + bcolors.ENDC
  150. print "-"*80
  151. print JobStatus.JOBSUBMITTED.colour + str(sum([res == JobStatus.JOBSUBMITTED for (name, res) in results])) + " jobs submitted" + bcolors.ENDC
  152. print JobStatus.FINISHED.colour + str(sum([res == JobStatus.FINISHED for (name, res) in results])) + " / " + str(len(results)) + " jobs finished" + bcolors.ENDC
  153. print JobStatus.UNFINISHED.colour + str(sum([res == JobStatus.UNFINISHED for (name, res) in results])) + " / " + str(len(results)) + " jobs still running" + bcolors.ENDC
  154. print "="*80
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement