Last active
February 22, 2019 16:04
-
-
Save ibspoof/40edc3d7dc0b73e5c108a3728df7e10c to your computer and use it in GitHub Desktop.
Restore a single node's SSTables from OpsCenter's S3 backup location using multi-threaded downloads
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[s3] | |
#s3 bucket name | |
bucket_name = my_backups | |
download_threads = 6 | |
# other s3 access is defined in the default aws cli settings file | |
[opscenter] | |
backup_job_uuid = # get this from s3 bucket | |
[node] | |
host_id = # get this from nodetool status | |
restore_dir = ./restore/ | |
# should be the /var/lib/cassandra/data or similar parent dir | |
cassandra_data_dir = ./cassandra/ | |
invalid_keyspaces = OpsCenter, system, system_traces |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ConfigParser
import errno
import json
import logging
import os
import re
import shutil
import subprocess
import time
from glob import glob
from multiprocessing.pool import ThreadPool
# enable logging to console (INFO) and to ./output.log (DEBUG)
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
file_out = logging.FileHandler("./output.log")
file_out.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s'))
file_out.setLevel(logging.DEBUG)
logging.getLogger().addHandler(file_out)

# Read settings from the ini file sitting next to this script.
Config = ConfigParser.ConfigParser()
Config.read("./restore_node_from_opscenter_backups.ini")

conf = {
    's3': {
        'bucket_name': "s3://" + Config.get('s3', 'bucket_name'),
        'download_threads': Config.getint('s3', 'download_threads')
    },
    'opscenter': {
        'backup_job_uuid': Config.get('opscenter', 'backup_job_uuid')
    },
    'node': {
        'host_id': Config.get('node', 'host_id'),
        'restore_dir': Config.get('node', 'restore_dir'),
        'cassandra_data_dir': Config.get('node', 'cassandra_data_dir'),
        # BUG FIX: Config.items('node', 'invalid_keyspaces') returned every
        # (key, value) tuple of the [node] section (the 2nd argument is the
        # `raw` flag), so the `keyspace in conf['node']['invalid_keyspaces']`
        # membership test downstream could never match and system keyspaces
        # were restored anyway.  Parse the comma-separated option value into
        # a clean list of keyspace names instead.
        'invalid_keyspaces': [ks.strip() for ks in
                              Config.get('node', 'invalid_keyspaces').split(',')]
    }
}
def aws_cmd(*args):
    """Run an ``aws s3`` subcommand and return its combined stdout/stderr."""
    command = ['aws', 's3'] + list(args)
    logging.debug("Command run: %s", " ".join(command))
    # shell=False with an argv list avoids any shell-quoting issues.
    return subprocess.check_output(command, shell=False, stderr=subprocess.STDOUT)
def bucket_path(trailing=False, *args):
    """
    Join the given components with the S3 path separator.

    :param trailing: when True, append a trailing separator
    :return: the joined path string
    """
    sep = "/"
    joined = sep.join(args)
    return joined + sep if trailing else joined
def get_host_ids_s3_snapshot_path(trailing=True):
    """Return the S3 path that holds this node's OpsCenter snapshots."""
    return bucket_path(
        trailing,
        conf['s3']['bucket_name'],
        'snapshots',
        conf['node']['host_id'],
    )
def get_latest_backup_json():
    """
    Find the most recent backup for the configured job uuid in S3 and
    download its backup.json manifest into the restore directory.

    :return: the parsed backup.json manifest as a dict
    """
    backups = aws_cmd('ls', get_host_ids_s3_snapshot_path())
    backup_list_clean = []
    for backup in backups.split("\n"):
        # `aws s3 ls` prints directory entries; keep only lines that mention
        # the configured backup job uuid.
        # NOTE(review): find(...) > 0 assumes the uuid never starts the line,
        # and strip(' \rPRE/\n') strips a *character set*, so it would also
        # eat any trailing 'P'/'R'/'E' characters of the entry name — confirm
        # backup directory names end with the uuid.
        if backup.find(conf['opscenter']['backup_job_uuid']) > 0:
            backup_list_clean.append(backup.strip(' \rPRE/\n'))
    # Listing output is lexicographically ordered, so the last kept entry is
    # treated as the newest backup (presumably timestamp-prefixed names —
    # TODO confirm).
    last_backup = backup_list_clean[-1]
    logging.info("Last backup is: %s", last_backup)
    local_backup_file = conf['node']['restore_dir'] + 'backup.json'
    remote_backup_file = bucket_path(False, get_host_ids_s3_snapshot_path(False), last_backup, 'backup.json')
    aws_cmd('cp', remote_backup_file, local_backup_file, '--profile', 'cassandra')
    return json.load(open(local_backup_file))
def get_keyspace_tables_to_restore(backup_json):
    """
    Build a {keyspace: {table: [sstable filename prefixes]}} mapping from the
    backup.json manifest, skipping keyspaces listed as invalid in the config.

    :param backup_json: parsed backup.json manifest
    :return: nested dict of sstable prefixes to restore
    """
    to_restore = {}
    for keyspace in backup_json['keyspaces']:
        if keyspace in conf['node']['invalid_keyspaces']:
            continue
        for table in backup_json['keyspaces'][keyspace]:
            to_restore.setdefault(keyspace, {})
            prefixes = []
            for sstable in backup_json['sstables']:
                if sstable['type'] != "Data":
                    continue
                if sstable['keyspace'] == keyspace and sstable['cf'] == table:
                    # Drop the last "-" component (the file suffix) and keep
                    # the shared "...-" prefix of this sstable's files.
                    parts = sstable['name'].split("-")[:-1]
                    prefixes.append("-".join(parts) + '-')
            # Tables with no Data sstables are omitted entirely.
            if prefixes:
                to_restore[keyspace][table] = prefixes
    return to_restore
def create_dir(path):
    """
    Create *path* (including missing parents) if it does not already exist.

    The original exists-then-makedirs sequence raced: this script downloads
    with a thread pool, so another thread could create the directory between
    the check and the makedirs call, raising OSError.  Attempt the creation
    and tolerate EEXIST instead.

    :param path: directory path to create
    :return: None
    :raises OSError: for any failure other than the directory already existing
    """
    try:
        os.makedirs(path)
    except OSError as e:
        # EEXIST for an existing *directory* is fine; anything else
        # (permissions, a file in the way) is re-raised.
        if e.errno != errno.EEXIST or not os.path.isdir(path):
            raise
def download_all_sstables_from_s3(keyspace_tables_to_restore):
    """
    Download every backed-up SSTable file into the local restore directory,
    using one thread pool of `download_threads` workers per keyspace.

    Fixes over the original: the ThreadPool is now created *after* the
    empty-keyspace skip (previously a pool was constructed and then leaked,
    never closed, for every skipped keyspace); the unused per-table timer was
    removed; the loop variable no longer shadows the builtin `type`; and the
    docstring documents the actual parameter name.

    :param keyspace_tables_to_restore: {keyspace: {table: [sstable prefixes]}}
        as produced by get_keyspace_tables_to_restore()
    :return: None
    """
    base_sstable_s3_dir = bucket_path(True, get_host_ids_s3_snapshot_path(False), 'sstables')
    # Component files that make up one SSTable; each prefix is combined with
    # each suffix to form the S3 object name.
    sstable_files = ['CompressionInfo.db', 'Data.db', 'Filter.db', 'Index.db', 'Statistics.db', 'Summary.db']
    all_downloads_start = time.time()
    for keyspace in keyspace_tables_to_restore:
        if len(keyspace_tables_to_restore[keyspace]) < 1:
            logging.debug("Skipping keyspace %s since there are no tables to restore", keyspace)
            continue
        pool = ThreadPool(processes=conf['s3']['download_threads'])
        keyspace_files_to_get = []
        keyspace_dir = conf['node']['restore_dir'] + keyspace
        create_dir(keyspace_dir)
        for table in keyspace_tables_to_restore[keyspace]:
            table_dir = keyspace_dir + "/" + table + "/"
            create_dir(table_dir)
            if len(keyspace_tables_to_restore[keyspace][table]) < 1:
                logging.debug("No SSTables found for %s:%s skipping download.", keyspace, table)
                continue
            for sstable in keyspace_tables_to_restore[keyspace][table]:
                for suffix in sstable_files:
                    keyspace_files_to_get.append((base_sstable_s3_dir + sstable + suffix, table_dir))
        keyspace_time_start = time.time()
        logging.info("Downloading %d files for keyspace %s using %d threads", len(keyspace_files_to_get),
                     keyspace, conf['s3']['download_threads'])
        pool.map(download, keyspace_files_to_get)
        logging.info("Completed downloading all SSTables for keyspace %s in %.2fs", keyspace,
                     (time.time() - keyspace_time_start))
        pool.close()
        pool.join()
    logging.info("Complete downloading all backed up SSTables in %.2fs", time.time() - all_downloads_start)
def download(to_download):
    """Fetch a single (s3_source, local_destination) pair via the aws CLI."""
    source, destination = to_download
    logging.debug("Downloading %s to: %s", source, destination)
    aws_cmd('cp', source, destination, '--profile', 'cassandra')
def get_restore_table_paths_mapped_to_casssandra(cassandra_keyspace_dir):
    """
    Map plain table names to their on-disk directory names (which carry a
    "<table>-<id>" suffix) inside a Cassandra keyspace data directory.

    :param cassandra_keyspace_dir: path to the keyspace directory
    :return: {table_name: directory_basename}
    """
    mapping = {}
    for entry in glob(cassandra_keyspace_dir + "/*"):
        # Normalise Windows backslashes so the split below works everywhere.
        basename = re.sub(r"\\", "/", entry).split("/")[-1]
        mapping[basename.split("-")[0]] = basename
    return mapping
def move_downloaded_sstables_to_cassandra_location(keyspace_tables_to_restore):
    """
    Copy the downloaded SSTable files from the restore directory into the
    live Cassandra data directory, renaming them to the local naming scheme.

    Fix over the original: the inner loop variable no longer shadows the
    builtin `type`.

    :param keyspace_tables_to_restore: {keyspace: {table: [sstable prefixes]}}
        as produced by get_keyspace_tables_to_restore()
    :return: None
    """
    moving_start_time = time.time()
    cassandra_base_dir = conf['node']['cassandra_data_dir']
    logging.info("Moving downloaded files to Cassandra dir %s", cassandra_base_dir)
    # Component files that make up one SSTable (must match the download list).
    sstable_files = ['CompressionInfo.db', 'Data.db', 'Filter.db', 'Index.db', 'Statistics.db', 'Summary.db']
    for keyspace in keyspace_tables_to_restore:
        if len(keyspace_tables_to_restore[keyspace]) < 1:
            logging.debug("Skipping keyspace %s since there are no tables to restore", keyspace)
            continue
        keyspace_dir = conf['node']['restore_dir'] + keyspace
        cassandra_keyspace_dir = cassandra_base_dir + keyspace + "/"
        # Table name -> "<table>-<id>" directory basename on the local node.
        path_mapping = get_restore_table_paths_mapped_to_casssandra(cassandra_keyspace_dir)
        for table in keyspace_tables_to_restore[keyspace]:
            table_dir = keyspace_dir + "/" + table + "/"
            cassandra_table_path = cassandra_keyspace_dir + path_mapping[table] + "/"
            for sstable in keyspace_tables_to_restore[keyspace][table]:
                sstable_split = sstable.split("-")
                # Drop the leading "-" component that the downloaded names
                # carry but the local Cassandra filenames do not
                # (NOTE(review): presumably a backup-side tag — confirm
                # against the backup.json naming scheme).
                sstable_split.pop(0)
                cassandra_filename_prefix = "-".join(sstable_split)
                for suffix in sstable_files:
                    downloaded_sstable_path = table_dir + sstable + suffix
                    cassandra_sstable_path = cassandra_table_path + cassandra_filename_prefix + suffix
                    logging.debug("Copying downloaded file from %s to %s", downloaded_sstable_path,
                                  cassandra_sstable_path)
                    shutil.copy(downloaded_sstable_path, cassandra_sstable_path)
    logging.info("Complete moving SSTables to Cassandra dir in %.2fs", time.time() - moving_start_time)
# Script entry point: fetch the newest backup manifest from S3, work out
# which SSTables belong to restorable keyspaces/tables, download them in
# parallel, then copy them into the live Cassandra data directories.
backup_json = get_latest_backup_json()
keyspace_tables_to_restore = get_keyspace_tables_to_restore(backup_json)
download_all_sstables_from_s3(keyspace_tables_to_restore)
move_downloaded_sstables_to_cassandra_location(keyspace_tables_to_restore)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment