Advertisement
Guest User

Untitled

a guest
Oct 11th, 2017
89
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 9.50 KB | None | 0 0
  1. #!/usr/bin/python
  2.  
  3. ## ********************************************************************************
  4. ## mr-usage-by-user.py
  5. ##
  6. ## Aggregates YARN MapReduce usage by day and user and writes the results to the console and to a file
  7. ##
  8. ## As the CM-API call "yarn.get_yarn_applications" can only return 1000 jobs max per call the script will make
  9. ## multiple calls to yarn.get_yarn_applications and aggregate all results between the script's global start and end times
  10. ##
  11. ## The time window batch size for the start and end times in the call to yarn.get_yarn_applications is
  12. ## set in the variable batch_time_interval and has a default value of 1 hour
  13. ## The value should be set to an interval within which fewer than 1000 apps are run
  14. ##
  15. ## Dependencies: Requires the modules: pytz and tzlocal
  16. ## Those modules need to be installed on the machine running the script using commands like:
  17. ## $ sudo pip install pytz
  18. ## $ sudo pip install tzlocal
  19. ##
  20. ## Usage: ./mr-usage-by-user.py [<END_DATE> [<NUM_DAYS>]]
  21. ##
  22. ## Args: END_DATE (optional) - Sets the end date for the YARN history to be reported on. Defaults to the current day
  23. ## Date should be formatted as YYYY-mm-dd
  24. ## NUM_DAYS (optional) - Set to the number of days of YARN usage to report on. Defaults to 7 days
  25. ##
  26. ## Examples:
  27. ## Report on 7 days activity ending today:
  28. ## ./mr-usage-by-user.py
  29. ##
  30. ## Report on 7 days activity ending 2016-04-01:
  31. ## ./mr-usage-by-user.py 2016-04-01
  32. ##
  33. ## Report on 3 days activity ending 2016-04-01:
  34. ## ./mr-usage-by-user.py 2016-04-01 3
  35. ##
  36. ## Edit the settings below to connect to your Cluster
  37. ##
  38. ## ********************************************************************************
  39.  
  40. import sys
  41. from datetime import time, datetime, timedelta
  42. from sets import Set
  43.  
  44. import pytz
  45. import tzlocal
  46.  
  47. from cm_api.api_client import ApiResource
  48.  
  49. ## Settings to connect to the cluster
  50. cm_host = "<YOUR CM HOST>"
  51. cm_port = "7180"
  52. cm_login = "admin"
  53. cm_password = "admin"
  54. cluster_name = "<YOUR CLUSTER NAME>"
  55.  
  56.  
  57. ## I'll hardcode a filename for the report to be written to
  58. filename = "mr-usage-by-user-" + str(datetime.today().date()) + ".csv"
  59.  
  60.  
  61. ## Needed for python < v2.7
  62. def total_seconds(td):
  63. return long((td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)) / 10**6
  64.  
  65.  
  66. ## Set the appropriate timezone. I will use the local timezone
  67. local_timezone = tzlocal.get_localzone()
  68.  
  69.  
  70. ## Set to True for verbose output
  71. DEBUG = False
  72.  
  73.  
  74. ## Check Command Line Args
  75. if len(sys.argv) > 3:
  76. print "Error: Wrong number of arguments"
  77. print "Usage: ./mr-usage-by-user.py [<END_DATE> [<NUM_DAYS>]]"
  78. print " Args: END_DATE (optional) - Sets the end date for the YARN history to be reported on. Defaults to the current day"
  79. print " Date should be formatted as YYYY-mm-dd"
  80. print " NUM_DAYS (optional) - Set to the number of days of YARN usage to report on. Defaults to 7 days"
  81. print ""
  82. print "Examples:"
  83. print " Report on 7 days activity ending today:"
  84. print " ./mr-usage-by-user.py"
  85. print ""
  86. print " Report on 7 days activity ending 2016-04-01:"
  87. print " ./mr-usage-by-user.py 2016-04-01"
  88. print ""
  89. print " Report on 3 days activity ending 2016-04-01:"
  90. print " ./mr-usage-by-user.py 2016-04-01 3"
  91. print "\n\n"
  92. quit(1)
  93.  
  94.  
  95. ## end_date
  96. end_date = None
  97. if len(sys.argv)> 1:
  98. end_date = datetime.strptime(sys.argv[1], '%Y-%m-%d')
  99. if end_date is None:
  100. end_date = datetime.today().date()
  101.  
  102.  
  103. ## num_days
  104. num_days = None
  105. if len(sys.argv)> 2:
  106. num_days = int(sys.argv[2])
  107. if num_days is None:
  108. num_days = 7
  109.  
  110.  
  111. ## Set the start and end times
  112. start_time = datetime.combine(end_date, datetime.min.time()) - timedelta(days = num_days)
  113. end_time = datetime.combine(end_date, datetime.max.time())
  114. if DEBUG:
  115. print "\n\nDEBUG start_time = " + str(start_time)
  116. print "DEBUG end_time = " + str(end_time) + "\n\n"
  117.  
  118.  
  119. ## Connect to CM
  120. api = ApiResource(server_host=cm_host, server_port=cm_port, username=cm_login, password=cm_password)
  121.  
  122.  
  123. ## Get the Cluster
  124. cluster = None
  125. clusters = api.get_all_clusters()
  126. for c in clusters:
  127. if c.displayName == cluster_name:
  128. cluster = c
  129. break
  130. if cluster is None:
  131. print "\nError: Cluster '" + cluster_name + "' not found"
  132. quit(1)
  133.  
  134. print "\n\nConnected to Cloudera Manager on " + cm_host + ":" + cm_port
  135.  
  136.  
  137. ## Get YARN Service
  138. yarn = None
  139. service_list = cluster.get_all_services()
  140. for service in service_list:
  141. if service.type == "YARN":
  142. yarn = service
  143. break
  144. if yarn is None:
  145. print "Error: Could not locate YARN Service"
  146. quit(1)
  147.  
  148.  
  149. print "\nGetting YARN History for Cluster \'" + cluster_name + "\' from " + str(start_time.date()) + " to " + str(end_time.date())
  150.  
  151.  
  152. ## Create a dictionary to hold all jobs
  153. jobs = {}
  154.  
  155. ## Define a time window for each call to yarn.get_yarn_applications
  156. ## The interval should be set so that fewer than 1000 jobs execute within the time window
  157. ## I'll hardcode it here for 1 hour
  158. batch_time_interval = timedelta(minutes = 60 * 1)
  159.  
  160. ## We'll keep track of each app we see to avoid dupes
  161. apps_processed = set()
  162.  
  163. batch_end_time = start_time
  164.  
  165. while batch_end_time < end_time:
  166.  
  167. ## set the start and end time for each batch
  168. start_time = batch_end_time
  169. batch_end_time = batch_end_time + batch_time_interval
  170. if batch_end_time > end_time:
  171. batch_end_time = end_time
  172.  
  173. ## We'll keep track of the number of successful apps we count per batch
  174. number_of_successful_apps_per_batch = 0
  175.  
  176. ## Get YARN Applications
  177. response = yarn.get_yarn_applications(start_time, batch_end_time, filter_str='', limit=1000, offset=0)
  178.  
  179. ## For each job that has a state of "SUCCEEDED", add the job to the dictionary of jobs per day per user
  180. for app in response.applications:
  181.  
  182. if app.state == "SUCCEEDED":
  183.  
  184. ## check to see if this app has already been processed
  185. if app.applicationId in apps_processed:
  186. break;
  187.  
  188. ## This is the first time we've seen this app; add it to the processed set
  189. apps_processed.add(app.applicationId)
  190. number_of_successful_apps_per_batch = number_of_successful_apps_per_batch + 1
  191.  
  192. user = app.user
  193. appId = app.applicationId
  194. appDate = app.startTime.replace(tzinfo=pytz.utc).astimezone(local_timezone).date()
  195.  
  196. ## create a new dictionary of jobs per day
  197. if not jobs.has_key(appDate):
  198. jobs[appDate] = {}
  199.  
  200. ## create a new dictionary of jobs per user
  201. if not jobs[appDate].has_key(user):
  202. jobs[appDate][user] = {}
  203.  
  204. ## create a new dictionary of job attributes for each job
  205. if not jobs[appDate][user].has_key(appId):
  206. jobs[appDate][user][appId] = {}
  207.  
  208. ## Add the job's attributes to each day's user's job's dictionary
  209. jobs[appDate][user][appId]["name"] = app.name
  210. jobs[appDate][user][appId]["pool"] = app.pool
  211. jobs[appDate][user][appId]["startTime"] = app.startTime
  212. jobs[appDate][user][appId]["endTime"] = app.endTime
  213. jobs[appDate][user][appId]["application_duration"] = total_seconds(app.endTime - app.startTime)
  214. jobs[appDate][user][appId]["cpu_milliseconds"] = long(app.attributes["cpu_milliseconds"])
  215. jobs[appDate][user][appId]["physical_memory_bytes"] = long(app.attributes["physical_memory_bytes"])
  216.  
  217. if DEBUG:
  218. print "\n\n-- DEBUG --------------"
  219. print "adding job to job list for " + str(appDate) + " " + user + " " + appId
  220. print "name: " + app.name
  221. print "pool: " + app.pool
  222. print "startTime: " + str(app.startTime)
  223. print "endTime: " + str(app.endTime)
  224. print "duration: " + str(total_seconds(app.endTime - app.startTime))
  225. print "cpu: " + str(app.attributes["cpu_milliseconds"])
  226. print "memory: " + str(app.attributes["physical_memory_bytes"])
  227.  
  228.  
  229. if number_of_successful_apps_per_batch > 0:
  230. print "Retrieved " + str(number_of_successful_apps_per_batch) + " successfully completed apps between " + str(start_time) + " and " + str(batch_end_time)
  231.  
  232. print "\n\n"
  233. print "Aggregated results by day and user"
  234.  
  235. print "\n\n"
  236.  
  237. report_file = open(filename, 'w')
  238. report_file.write("Date,User,#Jobs,Duration(secs),CPU(secs),Memory(MB)\n")
  239.  
  240. print "Date User #Jobs Duration(secs) CPU(secs) Memory(MB)"
  241. print "--------------------------------------------------------------------------------------"
  242.  
  243.  
  244. dates = sorted(jobs.keys())
  245. for the_date in dates:
  246. users = sorted(jobs[the_date].keys())
  247. for the_user in users:
  248. num_jobs = len(jobs[the_date][the_user])
  249.  
  250. duration = 0
  251. cpu = 0
  252. memory = 0
  253.  
  254. for the_job in jobs[the_date][the_user].keys():
  255.  
  256. ## aggregate the Duration
  257. duration = duration + jobs[the_date][the_user][the_job]["application_duration"]
  258.  
  259. ## aggregate the CPU
  260. cpu = cpu + jobs[the_date][the_user][the_job]["cpu_milliseconds"]
  261.  
  262. ## aggregate the Memory
  263. memory = memory + jobs[the_date][the_user][the_job]["physical_memory_bytes"]
  264.  
  265. dateStr = str(the_date)
  266. numJobsStr = ("%0.0f" % num_jobs)
  267. durationStr = ("%0.0f" % (duration)) # round to nearest second
  268. cpuStr = ("%0.0f" % (cpu / 1000)) # round to nearest second
  269. memoryStr = ("%0.0f" % (memory / (1024 * 1024))) # round to MB
  270.  
  271. report_file.write(dateStr + "," + the_user + "," + numJobsStr + "," + durationStr + "," + cpuStr + "," + memoryStr + "\n")
  272.  
  273. print dateStr + "\t" + the_user + "\t" + numJobsStr.rjust(10) + "\t" + durationStr.rjust(10) + "\t" + cpuStr.rjust(10) + "\t" + memoryStr.rjust(10)
  274.  
  275. print "\n\n"
  276.  
  277. report_file.close()
  278. print "Report output saved to file: " + filename
  279.  
  280. print "\n\n"
  281.  
  282. print "Done\n\n\n"
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement