Example of how to retrieve BDR history using the Cloudera Manager API
#!/usr/bin/python
## ********************************************************************************
## get-bdr-history.py
##
## Example of how to retrieve BDR command history using the Cloudera Manager API
##
## Usage: ./get-bdr-history.py <limit>
##
## <limit> is the maximum number of replication commands to retrieve
## history for per scheduled replication; the default is 20
##
## For example: ./get-bdr-history.py 100
##
## The rest of the values are set in the script below; edit the settings
## below to connect to your Backup Cluster.
##
## The script assumes one HDFS and one Hive Service exist on the Backup Cluster.
##
## ********************************************************************************

## ** Imports ********************************
import sys
import time
from cm_api.api_client import ApiResource
## ** Settings to connect to Backup cluster **
cm_host = "BACKUP_CLUSTER_CM_HOST"
cm_port = "7180"
cm_login = "BACKUP_CLUSTER_CM_LOGIN"
cm_password = "BACKUP_CLUSTER_CM_PASSWORD"
backup_cluster_name = "BACKUP_CLUSTER_NAME"
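## (A sketch of an alternative, not used below: the same settings could be
## read from the environment rather than hard-coded, using the standard
## library -- CM_HOST here is a hypothetical variable name:
##   import os
##   cm_host = os.environ.get("CM_HOST", "BACKUP_CLUSTER_CM_HOST")
## )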
## ** Get command line args ******************
if len(sys.argv) == 1:
    limit = 20
elif len(sys.argv) == 2 and sys.argv[1].isdigit():
    limit = int(sys.argv[1])
else:
    print "Usage: ./get-bdr-history.py <limit>"
    quit(1)
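## (For reference, a minimal sketch of the same parsing with the standard
## library's argparse module, not used here:
##   import argparse
##   parser = argparse.ArgumentParser()
##   parser.add_argument('limit', nargs='?', type=int, default=20,
##                       help='max replication commands to retrieve per schedule')
##   limit = parser.parse_args().limit
## )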
## Used for formatting dates
fmt = '%Y-%m-%d %H:%M:%S %Z'

print "\n\nCloudera BDR Replication History for Hive and HDFS -- " + time.strftime(fmt)

## Connect to Cloudera Manager
print "\nConnecting to Cloudera Manager at " + cm_host + ":" + cm_port
api = ApiResource(server_host=cm_host, server_port=cm_port, username=cm_login, password=cm_password)
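## (If Cloudera Manager requires TLS, cm_api's ApiResource also accepts a
## use_tls flag; a sketch, assuming the usual TLS port of 7183:
##   api = ApiResource(server_host=cm_host, server_port="7183",
##                     username=cm_login, password=cm_password, use_tls=True)
## )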
## Get the Backup Cluster
backup_cluster = None
clusters = api.get_all_clusters()
for cluster in clusters:
    if cluster.displayName == backup_cluster_name:
        backup_cluster = cluster
        break
if backup_cluster is None:
    print "\nError: Cluster '" + backup_cluster_name + "' not found"
    quit(1)
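## (A shorter alternative to the loop above: cm_api's get_cluster looks a
## cluster up directly and raises an exception if it does not exist:
##   backup_cluster = api.get_cluster(backup_cluster_name)
## Note that it matches on the cluster's name, which in some CM versions can
## differ from its display name, so the loop above is kept.)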
## Get the HDFS Service
hdfs_service = None
service_list = backup_cluster.get_all_services()
for service in service_list:
    if service.type == "HDFS":
        hdfs_service = service
        break
if hdfs_service is None:
    print "Error: Could not locate HDFS Service"
    quit(1)

## Get the Hive Service (reusing the service list fetched above)
hive_service = None
for service in service_list:
    if service.type == "HIVE":
        hive_service = service
        break
if hive_service is None:
    print "Error: Could not locate Hive Service"
    quit(1)
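## (The two lookup loops above could be collapsed into a small helper; a
## minimal sketch -- find_service_by_type is a hypothetical name, not part
## of cm_api:
##   def find_service_by_type(cluster, service_type):
##       for service in cluster.get_all_services():
##           if service.type == service_type:
##               return service
##       return None
## )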
## Get the Hive replication schedules
schedules = hive_service.get_replication_schedules()

## Iterate through the Hive replication schedules
for schedule in schedules:
    print "\n\n******************************************************************************"
    print "** Hive Replication Schedule (ID = " + str(schedule.id) + ") ****************************************"
    print "******************************************************************************"

    ## Get the Hive replication arguments in order to read Hive-specific schedule settings
    hive_args = schedule.hiveArguments

    ## True if the Hive replication is configured to also copy table data from HDFS
    replicate_data = hive_args.replicateData

    ## If applicable, get the HDFS replication arguments for the Hive job
    if replicate_data:
        hdfs_args = hive_args.hdfsArguments

    ## Print details of the replication schedule
    print "Start Time: " + schedule.startTime.strftime(fmt)
    if schedule.interval is not None and schedule.nextRun is not None:
        print "Recurrence: " + str(schedule.interval) + " " + schedule.intervalUnit
        print "Next Scheduled: " + schedule.nextRun.strftime(fmt)
    print "Source Cluster: " + hive_args.sourceService.peerName
    print "Destination Cluster: " + backup_cluster_name

    ## Print the tables selected in the replication schedule
    tableFilters = hive_args.tableFilters
    if tableFilters is None:
        print "\nTables Specified for Replication: All Tables in All Databases"
    else:
        print "\nTables Specified for Replication:"
        for table in tableFilters:
            print " " + table.database + "." + table.tableName

    ## Get the history of replication commands for the Hive replication schedule
    command_history = hive_service.get_replication_command_history(schedule_id=schedule.id, limit=limit, view='full')
    print "\nReplications:"

    ## Print some details for each command
    for command in command_history:
        hive_result = command.hiveResult
        print "\n** Hive Replication (Command ID = " + str(command.id) + ") "
        print "Start Time: " + command.startTime.strftime(fmt)
        if command.active:
            ## A running command has no end time or result yet; skip to the next one
            print "Command is still running"
            continue
        print "End Time: " + command.endTime.strftime(fmt)
        if not command.success:
            print "Result Status: Failed"
        else:
            print "Result Status: Success"

        ## List the tables that were replicated
        print " Tables Replicated:"
        tables = hive_result.tables
        for table in tables:
            print " " + table.database + "." + table.tableName

        ## Print the tables' HDFS data replication details; only a few attributes
        ## are shown here -- there are many more in the ApiHdfsReplicationResult class
        if replicate_data:
            hdfs_result = hive_result.dataReplicationResult
            print " Table data -- HDFS replication"
            print " JobId: " + hdfs_result.jobId
            print " Number of Files Expected: " + str(hdfs_result.numFilesExpected)
            print " Number of Files Copied: " + str(hdfs_result.numFilesCopied)
            print " Number of Files Skipped: " + str(hdfs_result.numFilesSkipped)
            print " Number of Files Copy Failed: " + str(hdfs_result.numFilesCopyFailed)
## Get top-level HDFS replication schedules
schedules = hdfs_service.get_replication_schedules()

## Iterate through all HDFS replication schedules
for schedule in schedules:
    ## Get the HDFS replication arguments
    hdfs_args = schedule.hdfsArguments
    print "\n\n******************************************************************************"
    print "** HDFS Replication Schedule (ID = " + str(schedule.id) + ") ****************************************"
    print "******************************************************************************"

    ## Print details of the replication schedule
    print "Start Time: " + schedule.startTime.strftime(fmt)
    if schedule.interval is not None and schedule.nextRun is not None:
        print "Recurrence: " + str(schedule.interval) + " " + schedule.intervalUnit
        print "Next Scheduled: " + schedule.nextRun.strftime(fmt)
    print "Source Cluster: " + hdfs_args.sourceService.peerName
    print "Destination Cluster: " + backup_cluster_name
    print " Source Path: " + hdfs_args.sourcePath
    print " Destination Path: " + hdfs_args.destinationPath
    print "\nReplications:"

    ## Get the history of commands for the scheduled HDFS replication
    command_history = hdfs_service.get_replication_command_history(schedule_id=schedule.id, limit=limit, view='full')
    for command in command_history:
        hdfs_result = command.hdfsResult
        print "\n** HDFS Replication (Command ID = " + str(command.id) + ") "
        print "Start Time: " + command.startTime.strftime(fmt)
        if command.active:
            ## A running command has no end time or result yet; skip to the next one
            print "Command is still running"
            continue
        print "End Time: " + command.endTime.strftime(fmt)
        if not command.success:
            print "Result Status: Failed"
        else:
            print "Result Status: Success"

        ## Only a few attributes are shown here -- there are many more in the
        ## ApiHdfsReplicationResult class
        print " JobId: " + hdfs_result.jobId
        print " Source Path: " + hdfs_args.sourcePath
        print " Destination Path: " + hdfs_args.destinationPath
        print " Number of Files Expected: " + str(hdfs_result.numFilesExpected)
        print " Number of Files Copied: " + str(hdfs_result.numFilesCopied)
        print " Number of Files Skipped: " + str(hdfs_result.numFilesSkipped)
        print " Number of Files Copy Failed: " + str(hdfs_result.numFilesCopyFailed)

print "\n\n\n** End of Report **"