Last active
November 6, 2018 16:59
-
-
Save mrchristine/11fbaf92069aae3e416ceced357a1dc6 to your computer and use it in GitHub Desktop.
Job to update legacy instance types on Databricks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json, pprint, requests, datetime | |
################################################################ | |
## Replace the token variable and environment url below | |
################################################################ | |
# Helper to pretty print json
def pprint_j(i):
    """Pretty-print a JSON-serializable object to stdout.

    :param i: any object json.dumps accepts (dict/list/str/num/...).
    """
    # Parenthesized single-argument print works on both Python 2 and 3.
    print(json.dumps(i, indent=4, sort_keys=True))
class DatabricksRestClient:
    """A class to define wrappers for the REST API.

    Holds the bearer token and workspace URL, and wraps the handful of
    /api/2.0 endpoints this script needs (clusters and jobs).
    """

    def __init__(self, token="ABCDEFG1234", url="https://myenv.cloud.databricks.com"):
        # Build the Authorization header once; every request reuses it.
        self.__token = {'Authorization': 'Bearer {0}'.format(token)}
        self.__url = url

    def get(self, endpoint, json_params=None, printJson=False):
        """GET /api/2.0<endpoint> and return the parsed JSON response.

        :param endpoint: API path starting with '/', may embed a query string.
        :param json_params: optional dict of query parameters.
        :param printJson: when True, pretty-print the response to stdout.
        """
        # NOTE: default changed from a mutable {} to None (shared-default
        # pitfall); the truthiness check below treats both identically.
        if json_params:
            results = requests.get(self.__url + '/api/2.0' + endpoint,
                                   headers=self.__token, params=json_params).json()
        else:
            results = requests.get(self.__url + '/api/2.0' + endpoint,
                                   headers=self.__token).json()
        if printJson:
            print(json.dumps(results, indent=4, sort_keys=True))
        return results

    def post(self, endpoint, json_params=None, printJson=True):
        """POST a JSON payload to /api/2.0<endpoint>.

        :param endpoint: API path starting with '/'.
        :param json_params: required request body; {} is returned without
            calling the API when it is missing/empty.
        :param printJson: when True, pretty-print the response to stdout.
        :return: parsed response dict augmented with 'http_status_code'.
        """
        if json_params:
            raw_results = requests.post(self.__url + '/api/2.0' + endpoint,
                                        headers=self.__token, json=json_params)
            results = raw_results.json()
        else:
            print("Must have a payload in json_args param.")
            return {}
        if printJson:
            print(json.dumps(results, indent=4, sort_keys=True))
        # If the response body is empty, still surface the HTTP status code.
        if results:
            results['http_status_code'] = raw_results.status_code
            return results
        else:
            return {'http_status_code': raw_results.status_code}

    def list_node_types(self):
        """Return the node types available in this workspace."""
        return self.get("/clusters/list-node-types")

    def get_jobs_list(self, printJson=False):
        """Returns an array of json objects for jobs."""
        jobs = self.get("/jobs/list", printJson=printJson)
        # BUG FIX: the API omits the 'jobs' key when no jobs exist; return
        # an empty list instead of raising KeyError.
        return jobs.get('jobs', [])

    def get_job_id(self, name):
        """Return the job_id of the first job named `name`, or None."""
        for job in self.get_jobs_list():
            if job['settings']['name'] == name:
                return job['job_id']
        return None

    def create_or_update_job_config(self, job_name, new_config):
        """Create the job when absent; otherwise reset it to new_config.

        The existing job keeps its current name on update.
        """
        job_id = self.get_job_id(job_name)
        if job_id is None:
            print("Job does not exist. Create new job: ")
            resp = self.post('/jobs/create', json_params=new_config)
        else:
            resp = self.get('/jobs/get?job_id={0}'.format(job_id))
            print("Print the current configs: ")
            cur_config = resp.pop('settings')
            cur_name = cur_config['name']
            pprint_j(cur_config)
            # Preserve the existing job name and wrap the new config in the
            # 'new_settings' envelope that /jobs/reset expects.
            new_config['name'] = cur_name
            update_config = resp
            update_config['new_settings'] = new_config
            print("Applying new configs: ")
            pprint_j(update_config)
            resp = self.post('/jobs/reset', json_params=update_config)
        print(resp)

    def print_job_details(self, job_id):
        """Fetch and pretty-print one job's full configuration."""
        resp = self.get('/jobs/get?job_id={0}'.format(job_id))
        print("Job details: ")
        pprint_j(resp)

    def get_spark_versions(self):
        """Return the Spark versions supported by this workspace."""
        return self.get("/clusters/spark-versions")
###################### | |
# Global functions | |
####################### | |
def update_old_node_types(job_conf, node_type=1):
    """Rewrite a /jobs/get response into a /jobs/reset payload on new nodes.

    Replaces the legacy node type on the job's new_cluster with a current
    instance type and attaches at least one EBS volume.

    :param job_conf: full job config dict as returned by /jobs/get
        (mutated in place and also returned).
    :param node_type: 1 for memory-optimized (r3.xlarge); any other value
        for compute-optimized (c3.2xlarge).
    :return: payload for /jobs/reset ('job_id' plus 'new_settings').
    """
    # pop old settings
    job_settings = job_conf.pop('settings')
    # grab cluster settings
    cluster_conf = job_settings['new_cluster']
    # set the new instance type for both workers and driver
    new_type = "r3.xlarge" if node_type == 1 else "c3.2xlarge"
    cluster_conf['node_type_id'] = new_type
    cluster_conf['driver_node_type_id'] = new_type
    # BUG FIX: the old default of None made the assignments below raise
    # TypeError for any job without an aws_attributes block; start from an
    # empty dict instead.
    aws_attr = cluster_conf.pop('aws_attributes', None) or {}
    # add at least 1 EBS volume
    aws_attr['ebs_volume_count'] = 1
    aws_attr['ebs_volume_size'] = 100
    aws_attr['ebs_volume_type'] = 'GENERAL_PURPOSE_SSD'
    cluster_conf['aws_attributes'] = aws_attr
    # reset the cluster conf
    job_settings['new_cluster'] = cluster_conf
    # set the new configs
    job_conf['new_settings'] = job_settings
    # remove the read-only created_time field before posting to /jobs/reset
    job_conf.pop("created_time", None)
    return job_conf
def get_legacy_job_ids(jl, node_type=1):
    """Return the job_ids whose new_cluster uses a legacy node type.

    :param jl: list of job objects from /jobs/list.
    :param node_type: 1 selects "memory-optimized" jobs; any other value
        selects "compute-optimized" jobs.
    :return: list of matching job ids. Jobs without a new_cluster (e.g.
        jobs pinned to an existing cluster) are skipped.
    """
    legacy = "memory-optimized" if node_type == 1 else "compute-optimized"
    # BUG FIX: the old second filter indexed ['node_type_id'] directly and
    # raised KeyError when a new_cluster lacked that key; .get() skips it.
    return [
        job['job_id']
        for job in jl
        if job['settings'].get('new_cluster', {}).get('node_type_id') == legacy
    ]
#####################################
# Fix legacy instance types
# Update the token and url below
####################################

def _migrate_jobs(client, outfile, job_ids, node_type, label):
    """Reset each job onto new node types, logging old/new configs to outfile.

    :param label: 'memory' or 'compute' — used only in the log header line.
    """
    for jid in job_ids:
        outfile.write("\nOld job {0} config: \n".format(label))
        job_conf = client.get('/jobs/get?job_id={0}'.format(jid))
        json.dump(job_conf, outfile)
        new_job_conf = update_old_node_types(job_conf, node_type)
        outfile.write("\nNew job config: \n")
        json.dump(new_job_conf, outfile)
        resp = client.post('/jobs/reset', new_job_conf)
        pprint_j(resp)

# Get rest api credentials
token = "YOUR_TOKEN_HERE"
# Simple class to list versions and get active cluster list
client = DatabricksRestClient(token, 'https://YOUR_ENV.cloud.databricks.com')
# Grab the job list once and classify by legacy node type
job_list = client.get_jobs_list(False)
legacy_memory_job_ids = get_legacy_job_ids(job_list, 1)
legacy_compute_job_ids = get_legacy_job_ids(job_list, 2)
# Audit log of before/after configs; the context manager guarantees the
# file is closed even if an API call fails mid-run.
with open('old_cluster_configs.json', 'w') as outfile:
    _migrate_jobs(client, outfile, legacy_memory_job_ids, 1, "memory")
    _migrate_jobs(client, outfile, legacy_compute_job_ids, 2, "compute")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment