Last active
August 29, 2015 14:05
-
-
Save kornypoet/3c870128ed0f16c4137b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import logging | |
import subprocess | |
import sys | |
import time | |
# Command-line interface: cluster name plus Cloudera Manager connection
# details and behavior flags.
arg_parser = argparse.ArgumentParser(description='Deploy a CDH5 Hadoop cluster using the Cloudera Manager API')
arg_parser.add_argument('cluster', nargs='?', default='cdh', help='Name of the cluster to interact with')
arg_parser.add_argument('-c', '--host', default='localhost', help="Hostname of the Cloudera Manager API. Default 'localhost'")
arg_parser.add_argument('-t', '--timeout', default=300, type=int, help="Timeout for API command responses. Default '300'")
arg_parser.add_argument('-f', '--format-namenode', action='store_true', help="Format the namenode. Default 'False'")
arg_parser.add_argument('-v', '--verbose', action='store_true', help="Turn on verbose logging. Default 'False'")
args = arg_parser.parse_args()

# Root logger writes to stderr through a StreamHandler; -v raises the
# level from INFO to DEBUG for the whole run.
log = logging.getLogger()
log.setLevel(logging.DEBUG if args.verbose else logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('[%(levelname)-5s] %(message)s'))
log.addHandler(console_handler)
# Api methods | |
def find_or_create_cluster(api, name):
    """Return the cluster named *name*, creating a new CDH5 cluster via the
    API when no existing cluster matches."""
    log.info('Searching for cluster %s', name)
    existing = next((c for c in api.get_all_clusters() if c.name == name), None)
    if existing is not None:
        log.info(' Found existing cluster %s', existing)
        return existing
    log.info(' Creating new cluster %s', name)
    return api.create_cluster(name, 'CDH5')
def add_missing_hosts(cluster, host_info):
    """Assign to *cluster* any hosts from *host_info* it does not already have.

    host_info -- iterable of host identifiers; entries already present in the
                 cluster's host list (matched by hostId) are skipped.
    """
    log.info('Assigning hosts to %s', cluster.name)
    existing_hosts = [h.hostId for h in cluster.list_hosts()]
    log.debug(' Existing hosts %s', existing_hosts)
    # Set gives O(1) membership tests; the list above is kept for logging.
    existing_ids = set(existing_hosts)
    missing_hosts = [h for h in host_info if h not in existing_ids]
    log.debug(' Missing hosts %s', missing_hosts)
    if missing_hosts:  # truthiness instead of len(...) > 0
        cluster.add_hosts(missing_hosts)
    log.info(' Hosts assigned %s', [h.hostId for h in cluster.list_hosts()])
def select_parcel(cluster, name, version):
    """Return the first parcel whose product equals *name* and whose major
    version is at least *version*, or None when no parcel qualifies.

    cluster -- cm_api cluster object exposing get_all_parcels()
    name    -- parcel product name, e.g. 'CDH'
    version -- minimum major version number (int)
    """
    for p in cluster.get_all_parcels():
        # Parse the full major version component; the original code read only
        # the first character of the version string (int(p.version[0])),
        # which misreads any major version >= 10.
        major = int(p.version.split('.')[0])
        if p.product == name and major >= version:
            log.debug('Parcel found %s', p)
            return p
    return None
def wait_on_parcel_action(cluster, parcel, action, *conditions):
    """Invoke *action* and poll the parcel every 5 seconds until its stage
    reaches one of *conditions*; raise on any error the parcel reports."""
    log.info('Action %s called until parcel %s', action.__name__, ' or '.join(conditions))
    action()
    while True:
        current = cluster.get_parcel(parcel.product, parcel.version)
        if current.state.errors:
            raise Exception(str(current.state.errors))
        if current.stage in conditions:
            log.info(' Parcel in stage %s', current.stage)
            return
        log.debug(' Parcel in stage %s', current.stage)
        # Progress formula (including the halved totalProgress) kept exactly
        # as the original computed it.
        log.info(' %5.2f%% complete', 100.0 * current.state.progress / (current.state.totalProgress / 2))
        time.sleep(5)
def find_or_create_service(cluster, name):
    """Return the service named *name* on *cluster*, creating it (with its
    service type derived from the upper-cased name) when it does not exist."""
    log.info('Searching for service %s', name)
    found = next((s for s in cluster.get_all_services() if s.name == name), None)
    if found is not None:
        log.info(' Found existing service %s', found)
        return found
    log.info(' Creating new service %s', name)
    return cluster.create_service(name, name.upper())
def find_or_create_management(api, setup):
    """Return the Cloudera Manager management service, creating it from
    *setup* (an ApiServiceSetupInfo) when none is configured yet.

    cm_api raises when no management service exists; that exception is the
    signal to create one.
    """
    log.info('Searching for management service')
    cm = api.get_cloudera_manager()
    try:
        mgmt = cm.get_service()
    except Exception:
        # Was a bare `except:`, which would also swallow KeyboardInterrupt
        # and SystemExit; Exception keeps the intended catch-all for API
        # errors without trapping interpreter-control exceptions.
        log.info(' Creating new management service')
        return cm.create_mgmt_service(setup)
    log.info(' Found existing management service %s', mgmt)
    return mgmt
def find_or_create_role(service, name, role, host):
    """Return the role named *name* on *service*, creating it with role type
    *role* on *host* when it does not already exist."""
    log.info('Searching for role %s', name)
    for existing in service.get_all_roles():
        if existing.name != name:
            continue
        log.info(' Found existing role %s', existing)
        return existing
    log.info(' Creating new role %s %s on host %s', name, role, host)
    return service.create_role(name, role, host)
def update_service_config(service, role_config):
    """Push configuration overrides from *role_config* onto *service*.

    The optional 'service_wide' key holds service-level settings; every other
    key is matched against the roleType of the service's role config groups.
    """
    log.info('Updating service %s config', service)
    if 'service_wide' in role_config:
        service.update_config(role_config['service_wide'])
    for group in service.get_all_role_config_groups():
        if group.roleType not in role_config:
            continue
        log.debug(' Updating config for %s', group.roleType)
        group.update_config(role_config[group.roleType])
def wait_on_service_action(action, args=(), timeout=args.timeout, failure_message='Service action failed'):
    """Run a cm_api service command and block until it completes.

    action          -- bound cm_api method that starts the command
    args            -- positional arguments forwarded to *action*; the default
                       is an immutable tuple (was a mutable list default)
    timeout         -- seconds to wait for the command. The default is
                       evaluated once at definition time against the
                       module-level argparse namespace (deliberate: inside
                       the body the `args` parameter shadows that namespace)
    failure_message -- text of the Exception raised on failure or timeout

    Raises Exception(failure_message) when the command does not succeed.
    """
    cmd = action(*args)
    # Some cm_api actions return a list of command objects rather than a
    # single command; wait on the first one in that case.
    if not getattr(cmd, 'wait', None):
        cmd = cmd[0]
    if not cmd.wait(timeout).success:
        raise Exception(failure_message)
def format_namenode(service, do_format):
    """Format HDFS through the 'hdfs-nn' role when *do_format* is true;
    otherwise just log that formatting was skipped."""
    if not do_format:
        log.info('Skipping namenode formatting')
        return
    log.info('Formatting namenode')
    wait_on_service_action(service.format_hdfs, args=['hdfs-nn'], failure_message='Failed to format HDFS')
    log.info(' Namenode formatted')
def stop_services(*services):
    """Stop each given service, skipping any already in the STOPPED state."""
    log.info('Stopping services %s', [svc.name for svc in services])
    for svc in services:
        if svc.serviceState != 'STOPPED':
            wait_on_service_action(svc.stop, failure_message='Failed to stop %s' % svc.name)
        log.info(' Service %s is stopped', svc.name)
def start_services(*services):
    """Start each given service in order, waiting for each to come up."""
    log.info('Starting services %s', [svc.name for svc in services])
    for svc in services:
        wait_on_service_action(svc.start, failure_message='Failed to start %s' % svc.name)
        log.info(' Service %s started', svc.name)
def redeploy_client_config(*services):
    """Redeploy the client configuration for every role of each service."""
    for svc in services:
        log.info('Redeploying client config for %s', svc.name)
        for role in svc.get_all_roles():
            wait_on_service_action(
                svc.deploy_client_config, args=[role.name],
                failure_message='Failed to redeploy client config for %s' % role.name)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Fully-qualified names of the hosts expected to join the cluster.  The
# first entry hosts every singleton role in roleMap; the cdh0-cdh2 hosts
# also carry the worker roles.
hostList = [u'cdh%d.platform.infochimps' % i for i in range(3)] + [
    u'cdhfoyer.platform.infochimps',
]
# Role layout for the cluster, keyed by service name.  Each entry names a
# role instance, its Cloudera Manager role type, and the host it runs on.
# Every management role lives on the first host, so those entries are
# generated from (name, type) pairs; datanodes and nodemanagers are spread
# over the first three hosts.
roleMap = {
    'mgmt': [
        {'name': role_name, 'type': role_type, 'host': hostList[0]}
        for role_name, role_type in [
            ('mgmt-sm', 'SERVICEMONITOR'),
            ('mgmt-am', 'ACTIVITYMONITOR'),
            ('mgmt-ev', 'EVENTSERVER'),
            ('mgmt-hm', 'HOSTMONITOR'),
            ('mgmt-ap', 'ALERTPUBLISHER'),
        ]
    ],
    'hdfs': [
        {'name': 'hdfs-nn', 'type': 'NAMENODE', 'host': hostList[0]},
        {'name': 'hdfs-snn', 'type': 'SECONDARYNAMENODE', 'host': hostList[0]},
        {'name': 'hdfs-dn0', 'type': 'DATANODE', 'host': hostList[0]},
        {'name': 'hdfs-dn1', 'type': 'DATANODE', 'host': hostList[1]},
        {'name': 'hdfs-dn2', 'type': 'DATANODE', 'host': hostList[2]},
        {'name': 'hdfs-nfs', 'type': 'NFSGATEWAY', 'host': hostList[0]},
        {'name': 'hdfs-hfs', 'type': 'HTTPFS', 'host': hostList[0]},
        {'name': 'hdfs-bal', 'type': 'BALANCER', 'host': hostList[0]},
    ],
    'yarn': [
        {'name': 'yarn-rm', 'type': 'RESOURCEMANAGER', 'host': hostList[0]},
        {'name': 'yarn-nm0', 'type': 'NODEMANAGER', 'host': hostList[0]},
        {'name': 'yarn-nm1', 'type': 'NODEMANAGER', 'host': hostList[1]},
        {'name': 'yarn-nm2', 'type': 'NODEMANAGER', 'host': hostList[2]},
        {'name': 'yarn-hs', 'type': 'JOBHISTORY', 'host': hostList[0]},
    ],
}
# Configuration overrides, keyed by service name and then by role type
# (or 'service_wide' for service-level settings).  Each leaf dict maps
# cm_api configuration keys to their desired values; role types with an
# empty dict keep their defaults.
serviceConfig = {
    'mgmt': {
        'ACTIVITYMONITOR': {
            'firehose_database_host': hostList[0] + ':7432',
            'firehose_database_user': 'amon',
            # NOTE(review): database password hardcoded in source control;
            # consider loading it from an external secret instead.
            'firehose_database_password': '1Vq5C4uC1p',
            'firehose_database_type': 'postgresql',
            'firehose_database_name': 'amon',
            'firehose_heapsize': 268435456,  # 256 MiB
        },
        'NAVIGATOR': {},
        'ALERTPUBLISHER': {},
        'EVENTSERVER': {
            'event_server_heapsize': 268435456,
        },
        'REPORTSMANAGER': {},
        'SERVICEMONITOR': {
            'firehose_heapsize': 268435456,
        },
        'HOSTMONITOR': {
            'firehose_heapsize': 268435456,
        },
        'NAVIGATORMETASERVER': {},
    },
    'hdfs': {
        'service_wide': {
            'dfs_replication': 2,
        },
        'NAMENODE': {
            'dfs_name_dir_list': '/data/dfs/nn',
            'dfs_namenode_handler_count': 30,
        },
        'SECONDARYNAMENODE': {
            'fs_checkpoint_dir_list': '/data/dfs/snn',
        },
        'DATANODE': {
            'dfs_data_dir_list': '/data/dfs/data',
            'dfs_datanode_failed_volumes_tolerated': 0,
        },
        'FAILOVERCONTROLLER': {},
        'BALANCER': {
            'balancer_java_heapsize': 268435456,
        },
        'JOURNALNODE': {},
        'HTTPFS': {},
        'NFSGATEWAY': {},
        'GATEWAY': {
            'dfs_client_use_trash': True,
        },
    },
    'yarn': {
        'service_wide': {
            'hdfs_service': 'hdfs',
        },
        'RESOURCEMANAGER': {},
        'JOBHISTORY': {},
        'NODEMANAGER': {
            'yarn_nodemanager_local_dirs': '/mapred/local',
            'yarn_nodemanager_resource_memory_mb': '2048',
        },
        'GATEWAY': {
            'mapred_reduce_tasks': 10,
            'mapred_submit_replication': 2,
        },
    },
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from cm_api.api_client import ApiResource | |
from cm_api.endpoints.services import ApiServiceSetupInfo | |
from cdhConfigDict import hostList | |
from cdhConfigDict import roleMap | |
from cdhConfigDict import serviceConfig | |
from cdhApiMethods import * | |
# Main deployment sequence: ensure the cluster and its hosts exist, walk the
# CDH parcel through download/distribute/activate, define services, roles and
# configuration, then restart everything and redeploy client configs.
log.debug('Running script with args %s', args)
log.debug('Using configuration %s', serviceConfig)

api = ApiResource(args.host, username='admin', password='admin')
cluster = find_or_create_cluster(api, args.cluster)
add_missing_hosts(cluster, hostList)

# Each wait tolerates parcels already further along the pipeline by
# accepting any later stage as a terminal condition.
parcel = select_parcel(cluster, 'CDH', 5)
wait_on_parcel_action(cluster, parcel, parcel.start_download, 'DOWNLOADED', 'DISTRIBUTED', 'ACTIVATED')
wait_on_parcel_action(cluster, parcel, parcel.start_distribution, 'DISTRIBUTED', 'ACTIVATED')
wait_on_parcel_action(cluster, parcel, parcel.activate, 'ACTIVATED')

hdfs = find_or_create_service(cluster, 'hdfs')
yarn = find_or_create_service(cluster, 'yarn')
mgmt = find_or_create_management(api, ApiServiceSetupInfo())
services = [hdfs, yarn, mgmt]

# First create any missing roles, then apply configuration overrides.
for service in services:
    for role_spec in roleMap.get(service.name, []):
        find_or_create_role(service, role_spec['name'], role_spec['type'], role_spec['host'])
for service in services:
    if service.name in serviceConfig:
        update_service_config(service, serviceConfig[service.name])

stop_services(*services)
format_namenode(hdfs, args.format_namenode)
start_services(hdfs)

# HDFS must be up before its application directories can be created.
log.info('Creating hdfs application directories')
wait_on_service_action(hdfs.create_hdfs_tmp, failure_message='Failed to create hdfs /tmp')
wait_on_service_action(yarn.create_yarn_job_history_dir, failure_message='Failed to create hdfs /user/history')
wait_on_service_action(yarn.create_yarn_node_manager_remote_app_log_dir, failure_message='Failed to create hdfs /tmp/logs')

start_services(yarn, mgmt)
redeploy_client_config(hdfs, yarn)
log.info('Installation completed successfully. Run your hdfs and yarn smoke tests.')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment