import argparse
import logging
import subprocess
import sys
import time
# Command-line args
parser = argparse.ArgumentParser(description='Deploy a CDH5 Hadoop cluster using the Cloudera Manager API')
parser.add_argument('cluster', nargs='?', default='cdh', help='Name of the cluster to interact with')
parser.add_argument('-c', '--host', default='localhost', help="Hostname of the Cloudera Manager API. Default 'localhost'")
parser.add_argument('-t', '--timeout', default=300, type=int, help="Timeout for API command responses. Default '300'")
parser.add_argument('-f', '--format-namenode', action='store_true', help="Format the namenode. Default 'False'")
parser.add_argument('-v', '--verbose', action='store_true', help="Turn on verbose logging. Default 'False'")
args = parser.parse_args()
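# Example invocation (script name and hostname are illustrative only):
#   python deploy_cdh5.py my-cluster --host cm.example.com --timeout 600 --verbose --format-namenode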
# Logging
log = logging.getLogger()
log.setLevel(logging.DEBUG if args.verbose else logging.INFO)
stderr = logging.StreamHandler()
stderr.setFormatter(logging.Formatter('[%(levelname)-5s] %(message)s'))
log.addHandler(stderr)
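# With this formatter a message is rendered like: [INFO ] Searching for cluster cdh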
# API helper methods
def find_or_create_cluster(api, name):
    log.info('Searching for cluster %s', name)
    for c in api.get_all_clusters():
        if c.name == name:
            log.info(' Found existing cluster %s', c)
            return c
    log.info(' Creating new cluster %s', name)
    return api.create_cluster(name, 'CDH5')
def add_missing_hosts(cluster, host_info):
    log.info('Assigning hosts to %s', cluster.name)
    existing_hosts = [h.hostId for h in cluster.list_hosts()]
    log.debug(' Existing hosts %s', existing_hosts)
    missing_hosts = [h for h in host_info if h not in existing_hosts]
    log.debug(' Missing hosts %s', missing_hosts)
    if len(missing_hosts) > 0:
        cluster.add_hosts(missing_hosts)
    log.info(' Hosts assigned %s', [h.hostId for h in cluster.list_hosts()])
def select_parcel(cluster, name, version):
    for p in cluster.get_all_parcels():
        if p.product == name and int(p.version[0]) >= version:
            log.debug('Parcel found %s', p)
            return p
def wait_on_parcel_action(cluster, parcel, action, *conditions):
    log.info('Action %s called until parcel %s', action.__name__, ' or '.join(conditions))
    action()
    while True:
        status = cluster.get_parcel(parcel.product, parcel.version)
        if status.state.errors:
            raise Exception(str(status.state.errors))
        elif status.stage in conditions:
            log.info(' Parcel in stage %s', status.stage)
            break
        else:
            log.debug(' Parcel in stage %s', status.stage)
            log.info(' %5.2f%% complete', 100.0 * status.state.progress / (status.state.totalProgress / 2))
        time.sleep(5)
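# Cloudera Manager parcels move through roughly
# DOWNLOADING -> DOWNLOADED -> DISTRIBUTING -> DISTRIBUTED -> ACTIVATING -> ACTIVATED,
# so callers pass every later stage as an extra success condition, e.g.:
#   wait_on_parcel_action(cluster, parcel, parcel.start_download, 'DOWNLOADED', 'DISTRIBUTED', 'ACTIVATED')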
def find_or_create_service(cluster, name):
    log.info('Searching for service %s', name)
    for s in cluster.get_all_services():
        if s.name == name:
            log.info(' Found existing service %s', s)
            return s
    log.info(' Creating new service %s', name)
    return cluster.create_service(name, name.upper())
def find_or_create_management(api, setup):
    log.info('Searching for management service')
    cm = api.get_cloudera_manager()
    try:
        mgmt = cm.get_service()
        log.info(' Found existing management service %s', mgmt)
        return mgmt
    except Exception:
        log.info(' Creating new management service')
        return cm.create_mgmt_service(setup)
def find_or_create_role(service, name, role, host):
    log.info('Searching for role %s', name)
    for r in service.get_all_roles():
        if r.name == name:
            log.info(' Found existing role %s', r)
            return r
    log.info(' Creating new role %s %s on host %s', name, role, host)
    return service.create_role(name, role, host)
def update_service_config(service, role_config):
    log.info('Updating service %s config', service)
    if 'service_wide' in role_config:
        service.update_config(role_config['service_wide'])
    for g in service.get_all_role_config_groups():
        if g.roleType in role_config:
            log.debug(' Updating config for %s', g.roleType)
            g.update_config(role_config[g.roleType])
def wait_on_service_action(action, args=[], timeout=args.timeout, failure_message='Service action failed'):
    # The default timeout is bound from the parsed command-line args at import time.
    # Some cm_api service commands return a list of command objects rather than a
    # single command, so fall back to the first element when there is no wait().
    cmd = action(*args)
    if not getattr(cmd, 'wait', None):
        cmd = cmd[0]
    if not cmd.wait(timeout).success:
        raise Exception(failure_message)
def format_namenode(service, do_format):
    if do_format:
        log.info('Formatting namenode')
        wait_on_service_action(service.format_hdfs, args=['hdfs-nn'], failure_message='Failed to format HDFS')
        log.info(' Namenode formatted')
    else:
        log.info('Skipping namenode formatting')
def stop_services(*services):
    log.info('Stopping services %s', [s.name for s in services])
    for s in services:
        if s.serviceState != 'STOPPED':
            wait_on_service_action(s.stop, failure_message='Failed to stop %s' % s.name)
        log.info(' Service %s is stopped', s.name)
def start_services(*services):
    log.info('Starting services %s', [s.name for s in services])
    for s in services:
        wait_on_service_action(s.start, failure_message='Failed to start %s' % s.name)
        log.info(' Service %s started', s.name)
def redeploy_client_config(*services):
    for s in services:
        log.info('Redeploying client config for %s', s.name)
        for r in s.get_all_roles():
            wait_on_service_action(s.deploy_client_config, args=[r.name], failure_message='Failed to redeploy client config for %s' % r.name)
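# --- end of the helper module (imported below as cdhApiMethods) ---
# --- start of the configuration module (imported below as cdhConfigDict) ---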
# List of expected hosts
hostList = [
    u'cdh0.platform.infochimps',
    u'cdh1.platform.infochimps',
    u'cdh2.platform.infochimps',
    u'cdhfoyer.platform.infochimps',
]
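# Note: cluster.add_hosts() can only attach hosts that have already registered with
# Cloudera Manager (i.e. the cm agent is installed and heartbeating), and the
# comparison in add_missing_hosts() assumes each host's CM hostId matches its FQDN.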
# Role mapping for the cluster
roleMap = {
    'mgmt': [
        {'name': 'mgmt-sm', 'type': 'SERVICEMONITOR', 'host': hostList[0]},
        {'name': 'mgmt-am', 'type': 'ACTIVITYMONITOR', 'host': hostList[0]},
        {'name': 'mgmt-ev', 'type': 'EVENTSERVER', 'host': hostList[0]},
        {'name': 'mgmt-hm', 'type': 'HOSTMONITOR', 'host': hostList[0]},
        {'name': 'mgmt-ap', 'type': 'ALERTPUBLISHER', 'host': hostList[0]},
    ],
    'hdfs': [
        {'name': 'hdfs-nn', 'type': 'NAMENODE', 'host': hostList[0]},
        {'name': 'hdfs-snn', 'type': 'SECONDARYNAMENODE', 'host': hostList[0]},
        {'name': 'hdfs-dn0', 'type': 'DATANODE', 'host': hostList[0]},
        {'name': 'hdfs-dn1', 'type': 'DATANODE', 'host': hostList[1]},
        {'name': 'hdfs-dn2', 'type': 'DATANODE', 'host': hostList[2]},
        {'name': 'hdfs-nfs', 'type': 'NFSGATEWAY', 'host': hostList[0]},
        {'name': 'hdfs-hfs', 'type': 'HTTPFS', 'host': hostList[0]},
        {'name': 'hdfs-bal', 'type': 'BALANCER', 'host': hostList[0]},
    ],
    'yarn': [
        {'name': 'yarn-rm', 'type': 'RESOURCEMANAGER', 'host': hostList[0]},
        {'name': 'yarn-nm0', 'type': 'NODEMANAGER', 'host': hostList[0]},
        {'name': 'yarn-nm1', 'type': 'NODEMANAGER', 'host': hostList[1]},
        {'name': 'yarn-nm2', 'type': 'NODEMANAGER', 'host': hostList[2]},
        {'name': 'yarn-hs', 'type': 'JOBHISTORY', 'host': hostList[0]},
    ],
}
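# The role name 'hdfs-nn' defined above is the same identifier that
# format_namenode() passes to format_hdfs in the helper module.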
# Configuration overrides, keyed by service name and then role type (or 'service_wide')
serviceConfig = {
    'mgmt': {
        'ACTIVITYMONITOR': {
            'firehose_database_host': hostList[0] + ':7432',
            'firehose_database_user': 'amon',
            'firehose_database_password': '1Vq5C4uC1p',
            'firehose_database_type': 'postgresql',
            'firehose_database_name': 'amon',
            'firehose_heapsize': 268435456,
        },
        'NAVIGATOR': {},
        'ALERTPUBLISHER': {},
        'EVENTSERVER': {
            'event_server_heapsize': 268435456,
        },
        'REPORTSMANAGER': {},
        'SERVICEMONITOR': {
            'firehose_heapsize': 268435456,
        },
        'HOSTMONITOR': {
            'firehose_heapsize': 268435456,
        },
        'NAVIGATORMETASERVER': {},
    },
    'hdfs': {
        'service_wide': {
            'dfs_replication': 2,
        },
        'NAMENODE': {
            'dfs_name_dir_list': '/data/dfs/nn',
            'dfs_namenode_handler_count': 30,
        },
        'SECONDARYNAMENODE': {
            'fs_checkpoint_dir_list': '/data/dfs/snn',
        },
        'DATANODE': {
            'dfs_data_dir_list': '/data/dfs/data',
            'dfs_datanode_failed_volumes_tolerated': 0,
        },
        'FAILOVERCONTROLLER': {},
        'BALANCER': {
            'balancer_java_heapsize': 268435456,
        },
        'JOURNALNODE': {},
        'HTTPFS': {},
        'NFSGATEWAY': {},
        'GATEWAY': {
            'dfs_client_use_trash': True,
        },
    },
    'yarn': {
        'service_wide': {
            'hdfs_service': 'hdfs',
        },
        'RESOURCEMANAGER': {},
        'JOBHISTORY': {},
        'NODEMANAGER': {
            'yarn_nodemanager_local_dirs': '/mapred/local',
            'yarn_nodemanager_resource_memory_mb': '2048',
        },
        'GATEWAY': {
            'mapred_reduce_tasks': 10,
            'mapred_submit_replication': 2,
        },
    },
}
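# --- main deployment script: ties the helper and configuration modules above together ---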
#!/usr/bin/env python
from cm_api.api_client import ApiResource
from cm_api.endpoints.services import ApiServiceSetupInfo
from cdhConfigDict import hostList
from cdhConfigDict import roleMap
from cdhConfigDict import serviceConfig
from cdhApiMethods import *
# Main
log.debug('Running script with args %s', args)
log.debug('Using configuration %s', serviceConfig)
api = ApiResource(args.host, username='admin', password='admin')
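# (Assumption: the CM server listens on the default port without TLS; the cm_api
# client's ApiResource also accepts server_port, use_tls, and version keyword
# arguments if those defaults need overriding.)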
cluster = find_or_create_cluster(api, args.cluster)
add_missing_hosts(cluster, hostList)
parcel = select_parcel(cluster, 'CDH', 5)
wait_on_parcel_action(cluster, parcel, parcel.start_download, 'DOWNLOADED', 'DISTRIBUTED', 'ACTIVATED')
wait_on_parcel_action(cluster, parcel, parcel.start_distribution, 'DISTRIBUTED', 'ACTIVATED')
wait_on_parcel_action(cluster, parcel, parcel.activate, 'ACTIVATED')
hdfs = find_or_create_service(cluster, 'hdfs')
yarn = find_or_create_service(cluster, 'yarn')
mgmt = find_or_create_management(api, ApiServiceSetupInfo())
services = [hdfs, yarn, mgmt]
for s in services:
    if s.name in roleMap:
        for r in roleMap[s.name]:
            find_or_create_role(s, r['name'], r['type'], r['host'])

for sv in services:
    if sv.name in serviceConfig:
        update_service_config(sv, serviceConfig[sv.name])
stop_services(*services)
format_namenode(hdfs, args.format_namenode)
start_services(hdfs)
log.info('Creating hdfs application directories')
wait_on_service_action(hdfs.create_hdfs_tmp, failure_message='Failed to create hdfs /tmp')
wait_on_service_action(yarn.create_yarn_job_history_dir, failure_message='Failed to create hdfs /user/history')
wait_on_service_action(yarn.create_yarn_node_manager_remote_app_log_dir, failure_message='Failed to create hdfs /tmp/logs')
start_services(yarn, mgmt)
redeploy_client_config(hdfs, yarn)
log.info('Installation completed successfully. Run your hdfs and yarn smoke tests.')