#!/usr/bin/python
## ********************************************************************************
## mr-usage-by-user.py
##
## Aggregates YARN MapReduce usage by day and user and writes the results to the console and to a file
##
## Because the CM API call "yarn.get_yarn_applications" returns at most 1000 jobs per call, the script makes
## multiple calls to yarn.get_yarn_applications and aggregates all results between the script's global start and end times
##
## The time window batch size for the start and end times in each call to yarn.get_yarn_applications is
## set in the variable batch_time_interval and defaults to 1 hour
## The value should be set to an interval within which fewer than 1000 apps are run
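## For example (illustrative only): with the default 1 hour batch_time_interval the script issues one
## yarn.get_yarn_applications call per hour of the reporting window, i.e. 24 calls for each day reported on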
##
## Dependencies: Requires the modules: pytz and tzlocal
## Those modules need to be installed on the machine running the script using commands like:
## $ sudo pip install pytz
## $ sudo pip install tzlocal
##
## Usage: ./mr-usage-by-user.py [<END_DATE> [<NUM_DAYS>]]
##
## Args: END_DATE (optional) - Sets the end date for the YARN history to be reported on. Defaults to the current day
##       Date should be formatted as YYYY-mm-dd
##       NUM_DAYS (optional) - Set to the number of days of YARN usage to report on. Defaults to 7 days
##
## Examples:
##   Report on 7 days activity ending today:
##   ./mr-usage-by-user.py
##
##   Report on 7 days activity ending 2016-04-01:
##   ./mr-usage-by-user.py 2016-04-01
##
##   Report on 3 days activity ending 2016-04-01:
##   ./mr-usage-by-user.py 2016-04-01 3
##
## Edit the settings below to connect to your Cluster
##
## ********************************************************************************
import sys
from datetime import time, datetime, timedelta
import pytz
import tzlocal
from cm_api.api_client import ApiResource
## Settings to connect to the cluster
cm_host = "<YOUR CM HOST>"
cm_port = "7180"
cm_login = "admin"
cm_password = "admin"
cluster_name = "<YOUR CLUSTER NAME>"
## I'll hardcode a filename for the report to be written to
filename = "mr-usage-by-user-" + str(datetime.today().date()) + ".csv"
## Needed for python < v2.7
def total_seconds(td):
    return long((td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)) / 10**6
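## Illustrative examples of the helper above:
##   total_seconds(timedelta(minutes=2, seconds=5))  -> 125
##   total_seconds(timedelta(seconds=2.5))           -> 2   (the integer division truncates to whole seconds)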
## Set the appropriate timezone. I will use the local timezone
local_timezone = tzlocal.get_localzone()
## Set to True for verbose output
DEBUG = False
## Check Command Line Args
if len(sys.argv) > 3:
    print "Error: Wrong number of arguments"
    print "Usage: ./mr-usage-by-user.py [<END_DATE> [<NUM_DAYS>]]"
    print " Args: END_DATE (optional) - Sets the end date for the YARN history to be reported on. Defaults to the current day"
    print "       Date should be formatted as YYYY-mm-dd"
    print "       NUM_DAYS (optional) - Set to the number of days of YARN usage to report on. Defaults to 7 days"
    print ""
    print "Examples:"
    print "  Report on 7 days activity ending today:"
    print "  ./mr-usage-by-user.py"
    print ""
    print "  Report on 7 days activity ending 2016-04-01:"
    print "  ./mr-usage-by-user.py 2016-04-01"
    print ""
    print "  Report on 3 days activity ending 2016-04-01:"
    print "  ./mr-usage-by-user.py 2016-04-01 3"
    print "\n\n"
    quit(1)
## end_date
end_date = None
if len(sys.argv) > 1:
    end_date = datetime.strptime(sys.argv[1], '%Y-%m-%d').date()
if end_date is None:
    end_date = datetime.today().date()
## num_days
num_days = None
if len(sys.argv) > 2:
    num_days = int(sys.argv[2])
if num_days is None:
    num_days = 7
## Set the start and end times
start_time = datetime.combine(end_date, datetime.min.time()) - timedelta(days = num_days)
end_time = datetime.combine(end_date, datetime.max.time())
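## For example, "./mr-usage-by-user.py 2016-04-01 3" gives
##   start_time = 2016-03-29 00:00:00 and end_time = 2016-04-01 23:59:59.999999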
if DEBUG:
    print "\n\nDEBUG start_time = " + str(start_time)
    print "DEBUG end_time = " + str(end_time) + "\n\n"
## Connect to CM
api = ApiResource(server_host=cm_host, server_port=cm_port, username=cm_login, password=cm_password)
## Get the Cluster
cluster = None
clusters = api.get_all_clusters()
for c in clusters:
    if c.displayName == cluster_name:
        cluster = c
        break
if cluster is None:
    print "\nError: Cluster '" + cluster_name + "' not found"
    quit(1)
print "\n\nConnected to Cloudera Manager on " + cm_host + ":" + cm_port
## Get YARN Service
yarn = None
service_list = cluster.get_all_services()
for service in service_list:
    if service.type == "YARN":
        yarn = service
        break
if yarn is None:
    print "Error: Could not locate YARN Service"
    quit(1)
print "\nGetting YARN History for Cluster '" + cluster_name + "' from " + str(start_time.date()) + " to " + str(end_time.date())
## Create a dictionary to hold all jobs
jobs = {}
## Define a time window for each call to yarn.get_yarn_applications
## The interval should be set so that fewer than 1000 jobs execute within the time window
## I'll hardcode it here for 1 hour
batch_time_interval = timedelta(minutes = 60 * 1)
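## If your cluster runs more than 1000 applications per hour, shrink this window,
## e.g. timedelta(minutes = 30) or timedelta(minutes = 15) (illustrative values only)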
## We'll keep track of each app we see to avoid dupes
apps_processed = set()
batch_end_time = start_time
while batch_end_time < end_time:
    ## set the start and end time for each batch
    start_time = batch_end_time
    batch_end_time = batch_end_time + batch_time_interval
    if batch_end_time > end_time:
        batch_end_time = end_time
    ## We'll keep track of the number of successful apps we count per batch
    number_of_successful_apps_per_batch = 0
    ## Get YARN Applications
    response = yarn.get_yarn_applications(start_time, batch_end_time, filter_str='', limit=1000, offset=0)
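    ## Optional sanity check (illustrative, not required for the report): if a batch comes back with the
    ## full 1000 applications the window may have been truncated, so warn that batch_time_interval
    ## should probably be reduced
    if len(response.applications) >= 1000:
        print "Warning: batch " + str(start_time) + " to " + str(batch_end_time) + " returned 1000 apps; consider reducing batch_time_interval"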
    ## For each job that has a state of "SUCCEEDED", add the job to the dictionary of jobs per day per user
    for app in response.applications:
        if app.state == "SUCCEEDED":
            ## skip this app if it has already been processed in an earlier batch
            if app.applicationId in apps_processed:
                continue
            ## This is the first time we've seen this app; add it to the processed set
            apps_processed.add(app.applicationId)
            number_of_successful_apps_per_batch = number_of_successful_apps_per_batch + 1
            user = app.user
            appId = app.applicationId
            appDate = app.startTime.replace(tzinfo=pytz.utc).astimezone(local_timezone).date()
            ## create a new dictionary of jobs per day
            if appDate not in jobs:
                jobs[appDate] = {}
            ## create a new dictionary of jobs per user
            if user not in jobs[appDate]:
                jobs[appDate][user] = {}
            ## create a new dictionary of job attributes for each job
            if appId not in jobs[appDate][user]:
                jobs[appDate][user][appId] = {}
            ## Add the job's attributes to each day's user's job's dictionary
            jobs[appDate][user][appId]["name"] = app.name
            jobs[appDate][user][appId]["pool"] = app.pool
            jobs[appDate][user][appId]["startTime"] = app.startTime
            jobs[appDate][user][appId]["endTime"] = app.endTime
            jobs[appDate][user][appId]["application_duration"] = total_seconds(app.endTime - app.startTime)
            jobs[appDate][user][appId]["cpu_milliseconds"] = long(app.attributes["cpu_milliseconds"])
            jobs[appDate][user][appId]["physical_memory_bytes"] = long(app.attributes["physical_memory_bytes"])
            if DEBUG:
                print "\n\n-- DEBUG --------------"
                print "adding job to job list for " + str(appDate) + " " + user + " " + appId
                print "name: " + app.name
                print "pool: " + app.pool
                print "startTime: " + str(app.startTime)
                print "endTime: " + str(app.endTime)
                print "duration: " + str(total_seconds(app.endTime - app.startTime))
                print "cpu: " + str(app.attributes["cpu_milliseconds"])
                print "memory: " + str(app.attributes["physical_memory_bytes"])
    if number_of_successful_apps_per_batch > 0:
        print "Retrieved " + str(number_of_successful_apps_per_batch) + " successfully completed apps between " + str(start_time) + " and " + str(batch_end_time)
- print "\n\n"
- print "Aggregated results by day and user"
- print "\n\n"
- report_file = open(filename, 'w')
- report_file.write("Date,User,#Jobs,Duration(secs),CPU(secs),Memory(MB)\n")
- print "Date User #Jobs Duration(secs) CPU(secs) Memory(MB)"
- print "--------------------------------------------------------------------------------------"
dates = sorted(jobs.keys())
for the_date in dates:
    users = sorted(jobs[the_date].keys())
    for the_user in users:
        num_jobs = len(jobs[the_date][the_user])
        duration = 0
        cpu = 0
        memory = 0
        for the_job in jobs[the_date][the_user].keys():
            ## aggregate the Duration
            duration = duration + jobs[the_date][the_user][the_job]["application_duration"]
            ## aggregate the CPU
            cpu = cpu + jobs[the_date][the_user][the_job]["cpu_milliseconds"]
            ## aggregate the Memory
            memory = memory + jobs[the_date][the_user][the_job]["physical_memory_bytes"]
        dateStr = str(the_date)
        numJobsStr = ("%0.0f" % num_jobs)
        durationStr = ("%0.0f" % (duration)) # whole seconds
        cpuStr = ("%0.0f" % (cpu / 1000)) # milliseconds to whole seconds
        memoryStr = ("%0.0f" % (memory / (1024 * 1024))) # bytes to MB
        report_file.write(dateStr + "," + the_user + "," + numJobsStr + "," + durationStr + "," + cpuStr + "," + memoryStr + "\n")
        print dateStr + "\t" + the_user + "\t" + numJobsStr.rjust(10) + "\t" + durationStr.rjust(10) + "\t" + cpuStr.rjust(10) + "\t" + memoryStr.rjust(10)
- print "\n\n"
- report_file.close()
- print "Report output saved to file: " + filename
- print "\n\n"
- print "Done\n\n\n"