Skip to content

Instantly share code, notes, and snippets.

@kornypoet
Created August 7, 2014 19:56
Show Gist options
  • Save kornypoet/e95942dc16f40ba39dce to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import argparse
import subprocess
import sys
import time
from cm_api.api_client import ApiResource
from cm_api.endpoints.services import ApiServiceSetupInfo
# Command-line interface: an optional cluster name, the Cloudera Manager
# host, and a flag controlling whether the namenode gets formatted.
parser = argparse.ArgumentParser(
    description='Deploy a CDH5 Hadoop cluster using the Cloudera Manager API')
parser.add_argument(
    'cluster', nargs='?', default='cdh',
    help='Name of the cluster to interact with')
parser.add_argument(
    '--host', default='localhost',
    help="Hostname of the Cloudera Manager API. Default 'localhost'")
parser.add_argument(
    '-f', '--format-namenode', action='store_true',
    help="Format the namenode. Default 'False'")
args = parser.parse_args()

# NOTE(review): admin/admin are the Cloudera Manager out-of-the-box
# credentials -- parameterize before using outside a throwaway environment.
api = ApiResource(args.host, username='admin', password='admin')
print('Accessing cluster "{0}"'.format(args.cluster))
# Get-or-create keeps re-runs idempotent.  cm_api signals "not found" with an
# exception whose class moved between cm_api versions, so catch Exception
# here rather than a bare except (which would also swallow KeyboardInterrupt
# and SystemExit); the original's empty else/finally clauses are dropped.
try:
    cluster = api.get_cluster(args.cluster)
except Exception:
    cluster = api.create_cluster(args.cluster, 'CDH5')
print('Cluster "{0}" found'.format(cluster))
print('Adding hosts to "{0}"'.format(cluster.name))
domain = 'platform.infochimps'
# All four machines share the default rack and use their FQDN as both id and
# hostname; build the identical records from the short names instead of
# hand-repeating four dict literals.
hosts = [{'hostId': u'{0}.{1}'.format(shortname, domain),
          'hostname': u'{0}.{1}'.format(shortname, domain),
          'rackId': u'/default'}
         for shortname in (u'cdh0', u'cdh1', u'cdh2', u'cdhfoyer')]
# Positional aliases used when assigning roles below.
[h0, h1, h2, h3] = hosts
# Only register hosts on the first run; an already-populated cluster is left
# alone.  Compare against len(hosts) rather than the magic constant 4 so the
# check stays correct if the host list changes.
if len(cluster.list_hosts()) < len(hosts):
    cluster.add_hosts([h['hostId'] for h in hosts])
print('Hosts "{0}" added'.format(cluster.list_hosts()))
all_parcels = cluster.get_all_parcels()
parcel_product = u'CDH'
parcel_major_version = 5

# Select a CDH parcel at or above the required major version (the last match
# wins, as in the original).  NOTE(review): p.version looks like '5.x.y-...';
# inspecting only its first character assumes a single-digit major version.
parcel = None
parcel_version = None
for candidate in all_parcels:
    if (candidate.product == parcel_product
            and int(candidate.version[0]) >= parcel_major_version):
        parcel = candidate
        parcel_version = candidate.version
# BUG FIX: the original fell through with `parcel` unbound (NameError) when
# no suitable parcel existed; fail with an explicit message instead.
if parcel is None:
    raise Exception('No {0} parcel with major version >= {1} available'.format(
        parcel_product, parcel_major_version))


def _wait_for_parcel(done_stages, verb):
    """Poll the selected parcel every 5s until it reaches one of done_stages.

    Prints percent progress while waiting and raises if Cloudera Manager
    reports any errors.  Returns the refreshed parcel object.
    """
    while True:
        current = cluster.get_parcel(parcel_product, parcel_version)
        if current.stage in done_stages:
            return current
        if current.state.errors:
            raise Exception(str(current.state.errors))
        print(' {0:5.2f}% {1}'.format(
            100.0 * current.state.progress / current.state.totalProgress, verb))
        time.sleep(5)


print('Downloading "{0}" parcel version "{1}"'.format(parcel.product, parcel.version))
parcel.start_download()
# A parcel already past the target stage (re-run) also counts as done.
parcel = _wait_for_parcel(('DOWNLOADED', 'DISTRIBUTED', 'ACTIVATED'), 'downloaded')
print('Download "{0}" parcel version "{1}" complete'.format(parcel.product, parcel.version))

print('Distributing "{0}" parcel'.format(parcel.product))
parcel.start_distribution()
parcel = _wait_for_parcel(('DISTRIBUTED', 'ACTIVATED'), 'distributed')
print('Distribute "{0}" parcel complete'.format(parcel.product))

print('Activating "{0}" parcel'.format(parcel.product))
parcel.activate()
print('Activate "{0}" parcel complete'.format(parcel.product))
print('Creating services')
# Get-or-create for each service so re-runs are idempotent; cm_api raises
# when a service is missing (exception class varies across versions, hence
# the broad but non-bare catch).  The "created" messages print only when a
# service is actually created.
try:
    hdfs = cluster.get_service('hdfs')
except Exception:
    hdfs = cluster.create_service('hdfs', 'HDFS')
    print(' Service hdfs created')
try:
    yarn = cluster.get_service('yarn')
except Exception:
    yarn = cluster.create_service('yarn', 'YARN')
    print(' Service yarn created')
print('Services created')
print('Creating service roles')
cm = api.get_cloudera_manager()
# The Cloudera Management Service may already exist from a previous run.
try:
    mgmt = cm.get_service()
except Exception:
    mgmt = cm.create_mgmt_service(ApiServiceSetupInfo())

# All management roles live on the first host.  ROBUSTNESS FIX: the original
# wrapped all five creates in ONE try/except: pass, so the first failure
# (e.g. an already-existing role on re-run) silently skipped the rest.
# Create each role independently, best-effort.
for role_name, role_type in (('mgmt-sm', 'SERVICEMONITOR'),
                             ('mgmt-am', 'ACTIVITYMONITOR'),
                             ('mgmt-ev', 'EVENTSERVER'),
                             ('mgmt-hm', 'HOSTMONITOR'),
                             ('mgmt-ap', 'ALERTPUBLISHER')):
    try:
        mgmt.create_role(role_name, role_type, hosts[0]['hostId'])
    except Exception:
        pass  # role presumably exists already -- best-effort by design

# Activity Monitor database connection.  NOTE(review): credentials are
# hard-coded in source; move to configuration for anything non-throwaway.
am_config = {
    'firehose_database_host': hosts[0]['hostId'],
    'firehose_database_user': 'amon',
    'firehose_database_password': 'weqpAHyz9C',
    'firehose_database_type': 'postgresql',
    'firehose_database_name': 'amon',
    'firehose_heapsize': '268435456',
}
for group in mgmt.get_all_role_config_groups():
    if group.roleType == 'ACTIVITYMONITOR':
        group.update_config(am_config)
print(' Monitoring service roles created')
# HDFS role layout: namenode, secondary namenode, NFS gateway, HttpFS and
# balancer on the first host; one datanode on each of the three workers.
hdfs_role_specs = [
    ('hdfs-nn', 'NAMENODE', h0),
    ('hdfs-snn', 'SECONDARYNAMENODE', h0),
    ('hdfs-dn0', 'DATANODE', h0),
    ('hdfs-dn1', 'DATANODE', h1),
    ('hdfs-dn2', 'DATANODE', h2),
    ('hdfs-nfs_gate', 'NFSGATEWAY', h0),
    ('hdfs-httpfs', 'HTTPFS', h0),
    ('hdfs-bal', 'BALANCER', h0),
]


def _get_or_create_role(service, name, role_type, host):
    """Fetch an existing role by name, creating it on *host* if absent."""
    try:
        return service.get_role(name)
    except Exception:
        return service.create_role(name, role_type, host['hostId'])


# ROBUSTNESS FIX: the original fetched all eight roles in one try block and,
# on ANY failure, fell into an except that re-created ALL of them -- which
# itself fails for the roles that do exist.  Resolve each role independently
# so partial state and re-runs both work.
(role_nn0, role_snn0, role_dn0, role_dn1, role_dn2,
 role_gate0, role_hfs0, role_bal0) = [
    _get_or_create_role(hdfs, name, rtype, host)
    for name, rtype, host in hdfs_role_specs]
print(' Hdfs service roles created')
# YARN role layout: resource manager and job-history server on the first
# host; one node manager on each of the three workers.
yarn_role_specs = [
    ('yarn-rm', 'RESOURCEMANAGER', h0),
    ('yarn-nm0', 'NODEMANAGER', h0),
    ('yarn-nm1', 'NODEMANAGER', h1),
    ('yarn-nm2', 'NODEMANAGER', h2),
    ('yarn-hs', 'JOBHISTORY', h0),
]


def _yarn_get_or_create_role(name, role_type, host):
    """Fetch an existing yarn role by name, creating it on *host* if absent."""
    try:
        return yarn.get_role(name)
    except Exception:
        return yarn.create_role(name, role_type, host['hostId'])


# ROBUSTNESS FIX: as with the hdfs roles, the original re-created ALL five
# roles whenever fetching any one of them failed; resolve each independently
# so re-runs against partial state succeed.
role_rm0, role_nm0, role_nm1, role_nm2, role_hs0 = [
    _yarn_get_or_create_role(name, rtype, host)
    for name, rtype, host in yarn_role_specs]
print(' Yarn service roles created')
print('Service roles created')
print('Updating role config')

# HDFS: replication of 2 suits this 3-datanode cluster; dedicated dirs for
# namenode metadata, checkpoints and block data.
hdfs_service_config = {
    'dfs_replication': 2,
}
nn_config = {
    'dfs_name_dir_list': '/data/dfs/nn',
    'dfs_namenode_handler_count': 30,
}
snn_config = {
    'fs_checkpoint_dir_list': '/data/dfs/snn',
}
dn_config = {
    'dfs_data_dir_list': '/data/dfs/data',
    'dfs_datanode_failed_volumes_tolerated': 0,
}
hdfs.update_config(svc_config=hdfs_service_config)
role_nn0.update_config(nn_config)
role_snn0.update_config(snn_config)
for datanode_role in (role_dn0, role_dn1, role_dn2):
    datanode_role.update_config(dn_config)
print(' Hdfs role configured')

# YARN is wired to the hdfs service created above.
yarn_service_config = {
    'hdfs_service': 'hdfs',
}
rm_config = {}
nm_config = {
    'yarn_nodemanager_local_dirs': '/mapred/local',
}
# NOTE(review): gateway_config is built but never applied to any role or
# config group in this script -- preserved as-is; confirm whether it should
# be pushed to the YARN gateway role config group.
gateway_config = {
    'mapred_reduce_tasks': 10,
    'mapred_submit_replication': 2,
}
yarn.update_config(yarn_service_config)
role_rm0.update_config(rm_config)
for nodemanager_role in (role_nm0, role_nm1, role_nm2):
    nodemanager_role.update_config(nm_config)
print(' Yarn roles configured')
# BUG FIX: the original raised Exception('boom') here -- a debugging leftover
# that aborted the script and made everything below unreachable.  Removed so
# the deployment actually completes.
sys.stdout.write("Stopping services...")
# Services must be stopped before (optionally) formatting the namenode.
cluster.get_service('yarn').stop().wait()
cluster.get_service('hdfs').stop().wait()
print("done.")
# Seconds to wait for any single Cloudera Manager command to finish.
CMD_TIMEOUT = 300
# BUG FIX: the original tested the undefined name FORMAT_NAMENODE (NameError
# at runtime); the -f/--format-namenode flag is parsed into
# args.format_namenode by argparse.
if args.format_namenode:
    sys.stdout.write("Formatting namenode...")
    # format_hdfs returns a list of commands, one per namenode; this cluster
    # has exactly one.
    cmd = hdfs.format_hdfs('hdfs-nn')[0]
    if not cmd.wait(CMD_TIMEOUT).success:
        raise Exception("Failed to format HDFS")
    print("done.")
else:
    print("Skipping namenode formatting.")
# START HDFS
sys.stdout.write("Starting hdfs...")
cmd = hdfs.start()
if not cmd.wait(CMD_TIMEOUT).success:
    raise Exception("Failed to start HDFS")
print("done.")

# Push client configuration out before bringing up YARN so the node managers
# pick up the settings applied above.
sys.stdout.write("Deploying yarn config...")
cmd = yarn.deploy_client_config('yarn-rm')
if not cmd.wait(CMD_TIMEOUT).success:
    raise Exception("Failed to deploy YARN client config")
print("done.")
# CREATE HDFS DIRS FOR YARN
# Bootstrap the HDFS directories MapReduce/YARN expect, running each command
# as the local 'hdfs' superuser.
cmds = '''hdfs dfs -mkdir -p /user/history/done
hdfs dfs -chown -R mapred:hadoop /user/history
hdfs dfs -mkdir /tmp
hdfs dfs -chown -R hdfs:hadoop /tmp
hdfs dfs -mkdir -p /tmp/logs
hdfs dfs -chown -R mapred:hadoop /tmp/logs'''.split('\n')
for cmd in cmds:
    sys.stdout.write("Executing %s..." % cmd)
    # BUG FIX: subprocess.call does not raise on a nonzero exit status -- it
    # returns it -- so the original try/except could never fire.  Check the
    # return code explicitly to get the failure behavior the original
    # clearly intended.
    if subprocess.call(['su', '-', 'hdfs', '-c', cmd]) != 0:
        raise Exception("Failed to call cmd %(cmd)s" % locals())
    print("done.")
# START YARN
sys.stdout.write("Starting yarn...")
cmd = yarn.start()
if not cmd.wait(CMD_TIMEOUT).success:
    raise Exception("Failed to start YARN")
print("done.")
print("Installation completed successfully. Run your hdfs and yarn smoke tests.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment