Last active
March 31, 2025 23:22
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import os | |
import sys | |
import subprocess | |
import json | |
import time | |
from collections import defaultdict | |
# Bounds of a signed 64-bit integer.  They serve as the fallback token range
# and as the starting values when per-run minimums/maximums are computed.
INT64_MAX = (1 << 63) - 1
INT64_MIN = -(1 << 63)
def get_sstable_tokens(summary):
    """Return the ``(first, last)`` token pair stored in an sstable summary.

    The tokens are read from ``summary['first_key']['token']`` and
    ``summary['last_key']['token']``.  If either lookup fails for any reason
    the whole 64-bit range ``(INT64_MIN, INT64_MAX)`` is returned instead.
    """
    try:
        token_range = (summary['first_key']['token'], summary['last_key']['token'])
    except Exception:
        # Best effort: without usable keys, assume the widest possible range.
        token_range = (INT64_MIN, INT64_MAX)
    return token_range
def get_run_id(scylla_metadata):
    """Return the run identifier of an sstable as a string.

    The identifier is read from ``scylla_metadata['run_identifier']``.  When
    the metadata has no such entry (or is not subscriptable by key), a fresh
    random UUID is returned, so the sstable ends up in a run of its own.

    Previously a stray ``exit(1)`` sat in front of the fallback, which made
    it unreachable and terminated the whole script without any message.
    """
    try:
        return str(scylla_metadata['run_identifier'])
    except (KeyError, TypeError):
        # generate random run id if none is found in an old Scylla.db
        import uuid  # local import: the module is not imported at file level
        return str(uuid.uuid4())
def get_run_size(run): | |
run_size = 0 | |
for sst in run[1]: | |
run_size += sst.size | |
return run_size | |
def get_sstable_origin(scylla_metadata):
    """Return ``scylla_metadata['sstable_origin']`` as a string.

    An empty string is returned when the entry cannot be read.
    """
    try:
        origin = str(scylla_metadata['sstable_origin'])
    except Exception:
        origin = ""
    return origin
def get_data_stats(sstable_stats, gc_before):
    """Extract the figures used by the report from an sstable's statistics.

    Parameters:
        sstable_stats: mapping holding the statistics of one sstable.
            ``estimated_partition_size`` is required; every other entry is
            optional and falls back to zero when it cannot be read.
        gc_before: threshold compared against each key of
            ``estimated_tombstone_drop_time`` (converted with ``float``).

    Returns:
        The tuple ``(partitions, rows, tombstones, expired, min_timestamp,
        max_timestamp, max_local_deletion_time)``.
    """
    # Partition count is the sum of the 'value' field over all entries.
    partitions = sum(bucket['value'] for bucket in sstable_stats['estimated_partition_size'])

    try:
        rows = sstable_stats['rows_count']
    except Exception:
        rows = 0

    # The three fields are all-or-nothing: one failed lookup zeroes them all.
    try:
        min_timestamp, max_timestamp, max_local_deletion_time = (
            sstable_stats['min_timestamp'],
            sstable_stats['max_timestamp'],
            sstable_stats['max_local_deletion_time'],
        )
    except Exception:
        min_timestamp = max_timestamp = max_local_deletion_time = 0

    tombstones = 0
    expired = 0
    try:
        drop_times = sstable_stats['estimated_tombstone_drop_time']
        for deletion_time, count in drop_times.items():
            tombstones += count
            # tombstone is expired if its deletion_time < gc_before
            if float(deletion_time) < gc_before:
                expired += count
    except Exception as err:
        print("exception: {}".format(err))
        # Discard partial counts so the two numbers stay consistent.
        tombstones = expired = 0

    return (partitions, rows, tombstones, expired,
            min_timestamp, max_timestamp, max_local_deletion_time)
class inclusive_range:
    """A closed interval ``[first, last]``; both end points belong to it."""

    def __init__(self, first, last):
        self.first, self.last = first, last

    def overlaps(self, other):
        """Tell whether this range and ``other`` share at least one point.

        The process is terminated through ``sys.exit`` when ``other`` is not
        an ``inclusive_range``.
        """
        if isinstance(other, inclusive_range):
            # Two closed ranges overlap exactly when one of them contains
            # the starting point of the other.
            other_starts_inside = self.first <= other.first <= self.last
            return other_starts_inside or other.first <= self.first <= other.last
        sys.exit("other is not inclusive_range")

    def fmt(self):
        """Render the range as ``[first, last]``."""
        return "[{}, {}]".format(self.first, self.last)
def sizeof_fmt(num, suffix='B'):
    """Format ``num`` with a binary (1024-based) unit prefix.

    The value is divided by 1024 until its magnitude drops below 1024, then
    printed with one decimal followed by the prefix and ``suffix``.  Values
    too large for ``Zi`` are reported in ``Yi``.
    """
    prefixes = ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi')
    scaled = num
    for prefix in prefixes:
        if abs(scaled) < 1024.0:
            return "%3.1f%s%s" % (scaled, prefix, suffix)
        scaled = scaled / 1024.0
    return "%.1f%s%s" % (scaled, 'Yi', suffix)
def expired_tombstone_pctg(expired, tombstones):
    """Return ``expired`` as a percentage of ``tombstones``.

    Yields ``0.0`` when there are no tombstones, which avoids a division by
    zero.
    """
    if tombstones != 0:
        return expired / float(tombstones) * 100.00
    return 0.0
class sstable:
    """Figures collected for one sstable, plus helpers that render them."""

    # Class-level defaults; __init__ assigns each of them on the instance.
    filename = ""
    size = 0
    origin = ""
    partitions = 0
    rows = 0
    tombstones = 0
    expired = 0
    min_ts = 0
    max_ts = 0
    mtime = 0
    first_token = 0
    last_token = 0
    max_ldt = 0
    fs = 0

    def __init__(self, f, s, origin, p, r, t, e, min_ts, max_ts, mtime, first_token, last_token, max_ldt, fs):
        """Store the given values.

        ``f`` (file name), ``s`` (size), ``origin`` and ``fs`` (shown as
        ``bf`` by ``describe``) are kept as passed; every other argument is
        converted with ``int``.
        """
        self.filename, self.size = f, s
        self.partitions = int(p)
        self.rows = int(r)
        self.tombstones = int(t)
        self.expired = int(e)
        self.min_ts = int(min_ts)
        self.max_ts = int(max_ts)
        self.mtime = int(mtime)
        self.first_token = int(first_token)
        self.last_token = int(last_token)
        self.max_ldt = int(max_ldt)
        self.fs, self.origin = fs, origin

    def mtime_desc(self):
        """Return "stale" when ``mtime`` is more than 86400 seconds in the past."""
        one_day_ago = time.time() - 86400
        return "stale" if self.mtime < one_day_ago else "not stale"

    def max_ldt_desc(self):
        """Classify ``max_ldt`` against 2147483647 (largest signed 32-bit value).

        NOTE(review): that value presumably marks data which never expires;
        confirm against the sstable statistics format.
        """
        return "not all TTLd" if self.max_ldt == 2147483647 else "all TTLd"

    def origin_desc(self):
        """Return the origin wrapped as `` (origin)``, or "" when it is empty."""
        return "" if self.origin == "" else " (" + self.origin + ")"

    def describe(self):
        """Return the single-line description used in the per-run listing."""
        template = "{ %s%s: size: %s, ps: %d, rows: %d, tomb: %d (expired: %d, %.2f%%), bf: %s, ts: [%d, %d], token: [%d, %d], mtime: %d (%s), max_ldt: %d (%s)"
        expired_share = expired_tombstone_pctg(self.expired, self.tombstones)
        fields = (
            self.filename, self.origin_desc(), sizeof_fmt(self.size),
            self.partitions, self.rows,
            self.tombstones, self.expired, expired_share,
            sizeof_fmt(self.fs),
            self.min_ts, self.max_ts,
            self.first_token, self.last_token,
            self.mtime, self.mtime_desc(),
            self.max_ldt, self.max_ldt_desc(),
        )
        return template % fields
class per_shard_info:
    """Groups the sstables of one shard by run and reports on them.

    ``dump`` prints the per-run listing and, as a side effect, computes the
    shard totals that ``summary`` prints afterwards.
    """

    # Class-level defaults.  ``runs_to_sstables`` is rebound to a fresh
    # defaultdict in __init__, so this list is not shared between instances
    # in practice.
    shard_id = 0
    runs_to_sstables = []
    size = 0
    partitions = 0
    rows = 0
    tombstones = 0
    expired = 0
    sstables = 0

    def __init__(self, shard_id):
        self.shard_id = shard_id
        self.runs_to_sstables = defaultdict(set)

    def add_sstable(self, run_id, sst):
        """Register ``sst`` as a member of the run identified by ``run_id``."""
        self.runs_to_sstables[run_id].add(sst)

    def dump(self, timestamp_overlaps):
        """Print every run of this shard, largest first, and rebuild the totals.

        When ``timestamp_overlaps`` is true, each run is additionally
        compared with the runs printed before it, and a line is emitted for
        every earlier run whose timestamp range overlaps with its own.
        """
        # Start from zero so that calling dump() more than once cannot
        # double-count; summary() reports what the latest dump() computed.
        self.size = self.partitions = self.rows = 0
        self.tombstones = self.expired = self.sstables = 0

        runs_to_timestamp_range = {}
        print("--- SHARD #{} ---".format(self.shard_id))
        for run_id, sstables in sorted(self.runs_to_sstables.items(), key=get_run_size, reverse=True):
            run_size = 0
            run_partitions = 0
            run_rows = 0
            run_tombstones = 0
            run_expired = 0
            # Seeded with the opposite extremes so the first sstable always
            # replaces them.
            run_min_ts = INT64_MAX
            run_max_ts = INT64_MIN
            run_first_token = INT64_MAX
            run_last_token = INT64_MIN
            sst_descriptions = []
            for sst in sstables:
                self.sstables += 1
                run_size += sst.size
                run_partitions += sst.partitions
                run_rows += sst.rows
                run_tombstones += sst.tombstones
                run_expired += sst.expired
                sst_descriptions.append("\n\t" + sst.describe())
                run_min_ts = min(run_min_ts, sst.min_ts)
                run_max_ts = max(run_max_ts, sst.max_ts)
                run_first_token = min(run_first_token, sst.first_token)
                run_last_token = max(run_last_token, sst.last_token)

            current_run_ts_range = inclusive_range(run_min_ts, run_max_ts)
            overlap_descriptions = []
            if timestamp_overlaps:
                for run_id_x, range_x in runs_to_timestamp_range.items():
                    if current_run_ts_range.overlaps(range_x):
                        overlap_descriptions.append("\n\t{{ overlaps in timestamp with run {} }}".format(run_id_x))
            runs_to_timestamp_range[run_id] = current_run_ts_range

            print("[Run %s: size: %s, partitions: %d rows: %d, tombstones: %d (expired: %d, %.2f%%), timestamp: [%d, %d], token: [%d, %d] %s%s\n]"
                  % (run_id, sizeof_fmt(run_size), run_partitions, run_rows, run_tombstones, run_expired,
                     expired_tombstone_pctg(run_expired, run_tombstones), run_min_ts, run_max_ts,
                     run_first_token, run_last_token, "".join(sst_descriptions), "".join(overlap_descriptions)))

            self.size += run_size
            self.partitions += run_partitions
            self.rows += run_rows
            self.tombstones += run_tombstones
            self.expired += run_expired

    def summary(self):
        """Print the shard totals computed by the most recent ``dump`` call.

        The "estimated droppable pctg" is the expired count divided by the
        row count, as a percentage; it is 0 when there are no rows.
        """
        estimated_droppable_pctg = 0
        if self.rows > 0:
            estimated_droppable_pctg = float(self.expired) / float(self.rows) * 100.0
        print("--- SHARD #{} ---".format(self.shard_id))
        # A single literal percent sign is spelled '%%'; the former '%%%%'
        # printed the sign twice after the droppable percentage.
        print("size: %s, sstables: %d, partitions: %d rows: %d, tombstones: %d (expired: %d, %.2f%%), estimated droppable pctg: %.2f%%"
              % (sizeof_fmt(self.size), self.sstables, self.partitions, self.rows, self.tombstones, self.expired,
                 expired_tombstone_pctg(self.expired, self.tombstones), estimated_droppable_pctg))
def load_json(output):
    """Parse ``output`` (``str`` or ``bytes``) as JSON.

    If parsing fails and ``output`` is a bytes object, it is decoded as
    UTF-8 with undecodable bytes dropped and parsed once more.  This lets
    output that contains stray non-UTF-8 bytes still be read.

    Raises:
        ValueError: ``output`` is not valid JSON (``json.JSONDecodeError``
            is a subclass of ``ValueError``).
    """
    try:
        return json.loads(output)
    except ValueError:
        # Both JSONDecodeError and UnicodeDecodeError derive from ValueError.
        # Only bytes can be re-decoded; for text, surface the real parse
        # error rather than an AttributeError from a missing .decode().
        if not isinstance(output, (bytes, bytearray)):
            raise
        return json.loads(output.decode("utf-8", errors="ignore"))
def _parse_bool(text):
    """Turn a command-line string into a bool.

    The empty string and the usual spellings of "false" give False; any
    other value gives True, as ``bool(text)`` did before.
    """
    return text.strip().lower() not in ('', '0', 'false', 'no', 'n', 'off')


def _dump_single_sstable(scylla_bin_path, operation, scylla_file):
    """Run ``scylla sstable <operation>`` and return the first entry under 'sstables'."""
    result = subprocess.run([scylla_bin_path, 'sstable', operation, scylla_file], stdout=subprocess.PIPE)
    data = load_json(result.stdout)
    return list(data['sstables'].values())[0]


def _shard_of(scylla_bin_path, scylla_file, shards):
    """Return the first shard that ``scylla sstable shard-of`` reports for the sstable."""
    command = [scylla_bin_path, 'sstable', 'shard-of', scylla_file]
    shard_args = ["--shards", str(shards)]
    result = subprocess.run(command + ["--vnodes"] + shard_args, stdout=subprocess.PIPE)
    if result.returncode != 0:
        # older versions don't know --vnodes (assumed to exit non-zero).
        # subprocess.run() does not raise on failure unless check=True, so
        # the exit status has to be inspected to trigger the retry.
        result = subprocess.run(command + shard_args, stdout=subprocess.PIPE)
    sstables = load_json(result.stdout)['sstables']
    if len(sstables) != 1:
        sys.exit("unexpected shard-of output for {}: expected exactly one sstable, got {}".format(scylla_file, len(sstables)))
    owners = list(sstables.values())[0]
    return owners[0]


def main():
    """Analyze the sstables of a table directory and print per-shard reports.

    For every ``*Scylla.db`` file that has a matching ``Data.db``, the scylla
    binary is asked for the owning shard, the statistics, the summary and
    the scylla metadata.  The sstables are grouped per shard and per run;
    the per-run listing of every shard is printed, followed by the per-shard
    summaries.
    """
    cmdline_parser = argparse.ArgumentParser()
    cmdline_parser.add_argument('table_dir', help='path to table dir')
    cmdline_parser.add_argument('shards', type=int, help='number of shards (cpus) used in this node')
    cmdline_parser.add_argument('--gc_grace_seconds', type=int, default=3600*24*7, help='gc grace seconds')
    # type=bool would turn every non-empty string, "False" included, into True.
    cmdline_parser.add_argument('--ts_overlaps', type=_parse_bool, default=False, help='shows whether a run overlaps with another in timestamp range')
    cmdline_parser.add_argument('--scylla', default='/usr/bin/scylla', help='path to scylla binary')

    args = cmdline_parser.parse_args()

    scylla_bin_path = args.scylla
    directory = args.table_dir
    shards = args.shards

    if not os.path.exists(scylla_bin_path):
        print("unable to find scylla binary in {}. specify correct one with --scylla option".format(scylla_bin_path))
        sys.exit(1)

    gc_grace_seconds = args.gc_grace_seconds
    gc_before = time.time() - gc_grace_seconds
    timestamp_overlaps = args.ts_overlaps

    print("scylla path: {}".format(scylla_bin_path))
    print("table dir: {}".format(directory))
    print("gc_grace_seconds = {}, gc_before = {}".format(gc_grace_seconds, gc_before))

    per_shard_info_set = {shard_id: per_shard_info(shard_id) for shard_id in range(shards)}

    component_suffix = "Scylla.db"
    for filename in os.listdir(directory):
        if not filename.endswith(component_suffix):
            continue
        # Derive sibling components from the file name only, so a directory
        # whose name happens to contain "Scylla.db" is never rewritten.
        prefix = filename[:-len(component_suffix)]
        scylla_file = os.path.join(directory, filename)
        data_file = os.path.join(directory, prefix + "Data.db")
        if not os.path.exists(data_file):
            continue
        data_stat = os.stat(data_file)
        size = data_stat.st_size
        mtime = data_stat.st_mtime

        # A missing Filter.db is reported as size 0 instead of aborting the run.
        filter_file = os.path.join(directory, prefix + "Filter.db")
        filter_s = os.stat(filter_file).st_size if os.path.exists(filter_file) else 0

        sstable_shard = _shard_of(scylla_bin_path, scylla_file, shards)

        stats = _dump_single_sstable(scylla_bin_path, 'dump-statistics', scylla_file)
        partitions, rows, tombstones, expired, min_ts, max_ts, max_ldt = get_data_stats(stats['stats'], gc_before)

        summary = _dump_single_sstable(scylla_bin_path, 'dump-summary', scylla_file)
        first_token, last_token = get_sstable_tokens(summary)

        scylla_metadata = _dump_single_sstable(scylla_bin_path, 'dump-scylla-metadata', scylla_file)
        sstable_origin = get_sstable_origin(scylla_metadata)
        run_id = get_run_id(scylla_metadata)

        sst = sstable(filename, size, sstable_origin, partitions, rows, tombstones, expired, min_ts, max_ts, mtime, first_token, last_token, max_ldt, filter_s)
        per_shard_info_set[sstable_shard].add_sstable(run_id, sst)

    # dump() has to run before summary(): it computes the totals summary() prints.
    for shard_id in range(shards):
        per_shard_info_set[shard_id].dump(timestamp_overlaps)

    print("NOTE: please take 'estimated droppable pctg' with a grain of salt. It's an estimation which results from dividing # of expired data by # of rows.")

    for shard_id in range(shards):
        per_shard_info_set[shard_id].summary()
# Run the analysis only when executed as a script, so importing this module
# has no side effects.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment