#!/usr/bin/python
## ********************************************************************************
## mr-usage-by-user.py
##
## Aggregates YARN MapReduce usage by day and user and writes the results to the console and to a file
##
## Because the CM API call "yarn.get_yarn_applications" returns at most 1000 jobs per call, the script
## makes multiple calls to yarn.get_yarn_applications and aggregates all results between the script's
## global start and end times
##
## The time window used for the start and end times in each call to yarn.get_yarn_applications is
## set in the variable batch_time_interval and defaults to 1 hour
## The value should be set to an interval within which fewer than 1000 apps are run
##
## Dependencies: requires the pytz and tzlocal modules
##               These need to be installed on the machine running the script, for example:
##                   $ sudo pip install pytz
##                   $ sudo pip install tzlocal
##
## Usage: ./mr-usage-by-user.py [<END_DATE> [<NUM_DAYS>]]
##
## Args: END_DATE (optional) - Sets the end date for the YARN history to be reported on. Defaults to the current day
##                             The date should be formatted as YYYY-mm-dd
##       NUM_DAYS (optional) - The number of days of YARN usage to report on. Defaults to 7 days
##
## Examples:
##     Report on 7 days of activity ending today:
##         ./mr-usage-by-user.py
##
##     Report on 7 days of activity ending 2016-04-01:
##         ./mr-usage-by-user.py 2016-04-01
##
##     Report on 3 days of activity ending 2016-04-01:
##         ./mr-usage-by-user.py 2016-04-01 3
##
## Edit the settings below to connect to your cluster
##
## ********************************************************************************
import sys
from datetime import datetime, timedelta
import pytz
import tzlocal
from cm_api.api_client import ApiResource

## Settings to connect to the cluster
cm_host = "<YOUR CM HOST>"
cm_port = "7180"
cm_login = "admin"
cm_password = "admin"
cluster_name = "<YOUR CLUSTER NAME>"

## I'll hardcode a filename for the report to be written to
filename = "mr-usage-by-user-" + str(datetime.today().date()) + ".csv"

## Needed for Python < 2.7, where timedelta.total_seconds() is not available
def total_seconds(td):
    return long((td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)) / 10**6
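## For example, total_seconds(timedelta(minutes=2, seconds=30)) returns 150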
## Set the appropriate timezone. I will use the local timezone
local_timezone = tzlocal.get_localzone()

## Set to True for verbose output
DEBUG = False

## Check command line args
if len(sys.argv) > 3:
    print "Error: Wrong number of arguments"
    print "Usage: ./mr-usage-by-user.py [<END_DATE> [<NUM_DAYS>]]"
    print "  Args: END_DATE (optional) - Sets the end date for the YARN history to be reported on. Defaults to the current day"
    print "                              The date should be formatted as YYYY-mm-dd"
    print "        NUM_DAYS (optional) - The number of days of YARN usage to report on. Defaults to 7 days"
    print ""
    print "Examples:"
    print "  Report on 7 days of activity ending today:"
    print "    ./mr-usage-by-user.py"
    print ""
    print "  Report on 7 days of activity ending 2016-04-01:"
    print "    ./mr-usage-by-user.py 2016-04-01"
    print ""
    print "  Report on 3 days of activity ending 2016-04-01:"
    print "    ./mr-usage-by-user.py 2016-04-01 3"
    print "\n\n"
    quit(1)
## end_date
end_date = None
if len(sys.argv) > 1:
    end_date = datetime.strptime(sys.argv[1], '%Y-%m-%d')
if end_date is None:
    end_date = datetime.today().date()

## num_days
num_days = None
if len(sys.argv) > 2:
    num_days = int(sys.argv[2])
if num_days is None:
    num_days = 7

## Set the start and end times
start_time = datetime.combine(end_date, datetime.min.time()) - timedelta(days = num_days)
end_time = datetime.combine(end_date, datetime.max.time())
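## For example, with END_DATE 2016-04-01 and NUM_DAYS 3, start_time is 2016-03-29 00:00:00
## and end_time is 2016-04-01 23:59:59.999999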
if DEBUG:
    print "\n\nDEBUG start_time = " + str(start_time)
    print "DEBUG end_time = " + str(end_time) + "\n\n"

## Connect to CM
api = ApiResource(server_host=cm_host, server_port=cm_port, username=cm_login, password=cm_password)

## Get the Cluster
cluster = None
clusters = api.get_all_clusters()
for c in clusters:
    if c.displayName == cluster_name:
        cluster = c
        break
if cluster is None:
    print "\nError: Cluster '" + cluster_name + "' not found"
    quit(1)
print "\n\nConnected to Cloudera Manager on " + cm_host + ":" + cm_port

## Get the YARN Service
yarn = None
service_list = cluster.get_all_services()
for service in service_list:
    if service.type == "YARN":
        yarn = service
        break
if yarn is None:
    print "Error: Could not locate YARN Service"
    quit(1)

print "\nGetting YARN History for Cluster '" + cluster_name + "' from " + str(start_time.date()) + " to " + str(end_time.date())

## Create a dictionary to hold all jobs
jobs = {}

## Define a time window for each call to yarn.get_yarn_applications
## The interval should be set so that fewer than 1000 jobs execute within the time window
## I'll hardcode it here for 1 hour
batch_time_interval = timedelta(minutes = 60 * 1)
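## On a busier cluster, a smaller window (for example, timedelta(minutes = 15)) may be needed
## to keep each batch under the 1000-application limit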
## We'll keep track of each app we see to avoid dupes
apps_processed = set()
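## Consecutive batch windows share a boundary timestamp, so an application could be returned
## by more than one call; tracking application IDs prevents double counting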
batch_end_time = start_time
while batch_end_time < end_time:

    ## Set the start and end time for each batch
    start_time = batch_end_time
    batch_end_time = batch_end_time + batch_time_interval
    if batch_end_time > end_time:
        batch_end_time = end_time

    ## We'll keep track of the number of successful apps we count per batch
    number_of_successful_apps_per_batch = 0

    ## Get YARN applications for this batch's time window
    response = yarn.get_yarn_applications(start_time, batch_end_time, filter_str='', limit=1000, offset=0)

    ## For each job that has a state of "SUCCEEDED", add the job to the dictionary of jobs per day per user
    for app in response.applications:
        if app.state == "SUCCEEDED":

            ## Skip this app if it has already been processed
            if app.applicationId in apps_processed:
                continue

            ## This is the first time we've seen this app; add it to the processed set
            apps_processed.add(app.applicationId)
            number_of_successful_apps_per_batch = number_of_successful_apps_per_batch + 1

            user = app.user
            appId = app.applicationId
            appDate = app.startTime.replace(tzinfo=pytz.utc).astimezone(local_timezone).date()
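            ## startTime is treated here as UTC and converted to the local timezone so that
            ## each job is bucketed into the local calendar day on which it started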
            ## create a new dictionary of jobs per day
            if appDate not in jobs:
                jobs[appDate] = {}

            ## create a new dictionary of jobs per user
            if user not in jobs[appDate]:
                jobs[appDate][user] = {}

            ## create a new dictionary of job attributes for each job
            if appId not in jobs[appDate][user]:
                jobs[appDate][user][appId] = {}

            ## Add the job's attributes to each day's user's job's dictionary
            jobs[appDate][user][appId]["name"] = app.name
            jobs[appDate][user][appId]["pool"] = app.pool
            jobs[appDate][user][appId]["startTime"] = app.startTime
            jobs[appDate][user][appId]["endTime"] = app.endTime
            jobs[appDate][user][appId]["application_duration"] = total_seconds(app.endTime - app.startTime)
            jobs[appDate][user][appId]["cpu_milliseconds"] = long(app.attributes["cpu_milliseconds"])
            jobs[appDate][user][appId]["physical_memory_bytes"] = long(app.attributes["physical_memory_bytes"])
            if DEBUG:
                print "\n\n-- DEBUG --------------"
                print "adding job to job list for " + str(appDate) + " " + user + " " + appId
                print "name: " + app.name
                print "pool: " + app.pool
                print "startTime: " + str(app.startTime)
                print "endTime: " + str(app.endTime)
                print "duration: " + str(total_seconds(app.endTime - app.startTime))
                print "cpu: " + str(app.attributes["cpu_milliseconds"])
                print "memory: " + str(app.attributes["physical_memory_bytes"])

    if number_of_successful_apps_per_batch > 0:
        print "Retrieved " + str(number_of_successful_apps_per_batch) + " successfully completed apps between " + str(start_time) + " and " + str(batch_end_time)

print "\n\n"
print "Aggregated results by day and user"
print "\n\n"

report_file = open(filename, 'w')
report_file.write("Date,User,#Jobs,Duration(secs),CPU(secs),Memory(MB)\n")

print "Date User #Jobs Duration(secs) CPU(secs) Memory(MB)"
print "--------------------------------------------------------------------------------------"
dates = sorted(jobs.keys())
for the_date in dates:
    users = sorted(jobs[the_date].keys())
    for the_user in users:
        num_jobs = len(jobs[the_date][the_user])
        duration = 0
        cpu = 0
        memory = 0
        for the_job in jobs[the_date][the_user].keys():
            ## aggregate the Duration
            duration = duration + jobs[the_date][the_user][the_job]["application_duration"]
            ## aggregate the CPU
            cpu = cpu + jobs[the_date][the_user][the_job]["cpu_milliseconds"]
            ## aggregate the Memory
            memory = memory + jobs[the_date][the_user][the_job]["physical_memory_bytes"]
        dateStr = str(the_date)
        numJobsStr = ("%0.0f" % num_jobs)
        durationStr = ("%0.0f" % (duration))              # already in whole seconds
        cpuStr = ("%0.0f" % (cpu / 1000))                 # convert milliseconds to seconds
        memoryStr = ("%0.0f" % (memory / (1024 * 1024)))  # convert bytes to MB
        report_file.write(dateStr + "," + the_user + "," + numJobsStr + "," + durationStr + "," + cpuStr + "," + memoryStr + "\n")
        print dateStr + "\t" + the_user + "\t" + numJobsStr.rjust(10) + "\t" + durationStr.rjust(10) + "\t" + cpuStr.rjust(10) + "\t" + memoryStr.rjust(10)

print "\n\n"
report_file.close()
print "Report output saved to file: " + filename
print "\n\n"
print "Done\n\n\n"