Last active
December 23, 2020 02:01
-
-
Save glennklockwood/755131fd6939f399232959005d94a98d to your computer and use it in GitHub Desktop.
File system fullness watcher
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
"""Creates JSON representation of file system fullness | |
""" | |
import re | |
import time | |
import json | |
import subprocess | |
WATCHED_FS = re.compile(r'(^(dev)?tmpfs|:/cvmfs(/|\s)|/dev/|overlay)')
INTEGER_COLS = {'1K-blocks', 'Used', 'Available', 'Inodes', 'IUsed', 'IFree'}
DELETE_COLS = {'Use%', "IUse%"}

def parse_df(output_str):
    """Parse raw ``df`` output into a dict keyed by file system name.

    Args:
        output_str (bytes): Raw stdout captured from a ``df`` invocation.

    Returns:
        dict: Maps each file system name to a dict of that file system's
        df columns.  Columns listed in INTEGER_COLS are converted to int
        and columns listed in DELETE_COLS are dropped.
    """
    result = {}
    header = None
    for raw_line in output_str.splitlines():
        line = raw_line.decode()
        # The first line of df output always names the columns
        if header is None:
            header = line.split()
            continue
        # Skip uninteresting pseudo/virtual file systems (tmpfs, cvmfs,
        # overlay, /dev entries) matched by WATCHED_FS
        if WATCHED_FS.search(line):
            continue
        fields = line.split()
        # First column is always the file system name; the remaining
        # values are keyed positionally by the header columns
        fsname = fields[0]
        row = dict(zip(header[1:], fields[1:]))
        # Convert certain columns into integers
        for col in INTEGER_COLS & row.keys():
            row[col] = int(row[col])
        # Drop percentage columns that are derivable from the others
        for col in DELETE_COLS & row.keys():
            del row[col]
        result[fsname] = row
    return result
def main(argv=None):
    """Collect df capacity and inode data and merge them into one dict.

    Runs ``df -k`` and ``df -i`` and merges the parsed outputs so that each
    file system entry carries both block and inode statistics.

    Returns:
        dict: Keyed by file system name, plus a '_timestamp' entry holding
        the integer epoch time of the measurement.
    """
    # Whole-second resolution is deliberate: higher precision would imply
    # accuracy these measurements do not have
    merged = {'_timestamp': int(time.time())}
    for option in ('-k', '-i'):
        raw = subprocess.check_output(['df', option])
        for fsname, stats in parse_df(raw).items():
            merged.setdefault(fsname, {}).update(stats)
    return merged

if __name__ == "__main__":
    print(json.dumps(main()))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Estimates file system fill rate and date of fullness | |
""" | |
import json | |
import datetime | |
import dask | |
import dask.bag | |
import scipy.stats | |
import tokio | |
try: | |
import abcutils | |
import matplotlib | |
import matplotlib.pyplot | |
except ImportError: | |
pass | |
def parse_line(line, file_system):
    """Parse one json-encoded line of df data for a single file system.

    Args:
        line (str): A single json-encoded line containing df data. This must
            match the format created by the df.py script running in crontab.
        file_system (str): Name of file system to extract from df file

    Returns:
        dict: Empty when ``file_system`` is absent from the record;
        otherwise contains three keys:
            - timestamp (int): timestamp of measurement
            - used (int): capacity used in kibibytes
            - available (int): capacity remaining in kibibytes
    """
    record = json.loads(line)
    if file_system not in record:
        return {}
    fs_data = record[file_system]
    return {
        'timestamp': record['_timestamp'],
        'used': fs_data['Used'],
        'available': fs_data['Available'],
    }
def plot_capacity(plot_df, ax=None):
    """Plots capacity usage over time for a DataFrame

    Args:
        plot_df (pandas.DataFrame): DataFrame of format returned by
            load_df_files()
        ax (matplotlib.pyplot.axes.Axes or None): Axes on which plot should be
            drawn. If None, create a new axis.

    Returns:
        matplotlib.pyplot.axes.Axes: Axes on which plot was drawn
    """
    if ax is None:
        _, ax = matplotlib.pyplot.subplots()

    xval = plot_df['timestamp'].values
    # Convert used kibibytes to base-10 petabytes
    yval = plot_df['used'].values * 1024 / 10**15
    # Total file system size in PB, taken from the most recent measurement
    fs_size_pb = (plot_df['used'] + plot_df['available']).values[-1] * 1024 / 10**15
    ax.plot(xval, yval, linewidth=2)
    ax.set_ylabel("Used Capacity (PB)")
    ax.set_xlabel("")
    ax.grid()
    ax.set_axisbelow(True)

    # Thin out x-axis tick labels as the plotted interval grows
    delta_days = (xval[-1] - xval[0]) / 86400
    if delta_days <= 7:
        criteria = lambda x: True
    elif delta_days <= 28:
        criteria = lambda x: (x.toordinal() % 2) == 0
    else:
        criteria = lambda x: x.weekday() == 0
    abcutils.plot.fix_xticks_timeseries(ax=ax, format="%a %b %-d", criteria=criteria)

    def abs2rel(cap):
        """Convert absolute capacity (PB) to percent of file system size"""
        return cap / fs_size_pb * 100.0

    def rel2abs(cap):
        """Convert percent of file system size to absolute capacity (PB)"""
        # BUG FIX: secondary_yaxis requires this to be the exact
        # mathematical inverse of abs2rel; the original multiplied by
        # 100 instead of dividing, producing a wrong secondary scale
        return cap * fs_size_pb / 100.0

    secax = ax.secondary_yaxis('right', functions=(abs2rel, rel2abs))
    secax.set_ylabel("Used Capacity (%)")
    return ax
def plot_fit(plot_df, fit, ax, **kwargs):
    """Overlay a line of best fit on an existing capacity plot.

    Args:
        plot_df (pandas.DataFrame): DataFrame of format returned by
            load_df_files(); only its 'timestamp' column is used.
        fit: Result of scipy.stats.linregress() (provides slope/intercept).
        ax (matplotlib.pyplot.axes.Axes): Axes on which line is drawn.
        **kwargs: Passed through to ax.plot().

    Returns:
        matplotlib.pyplot.axes.Axes: Axes on which line was drawn
    """
    timestamps = plot_df['timestamp'].values
    predicted = fit.slope * timestamps + fit.intercept
    ax.plot(timestamps, predicted, **kwargs)
    return ax
def load_df_files(start, end, template, file_system):
    """Loads df data into a dataframe using Dask

    Args:
        start (datetime.datetime): Load data starting on or after this date
        end (datetime.datetime): Do not load data after this date
        template (str): Path to pass to strftime that resolves to one or more
            input files corresponding to the start and end dates given.
        file_system (str): File system to load from the df files

    Returns:
        pandas.DataFrame: Contains the following columns:
            - timestamp (int): Seconds since epoch corresponding to a single
              fullness measurement
            - used (int): Capacity used expressed in units of kibibytes
            - available (int): Remaining capacity available, expressed as
              kibibytes
        Its index is named ``datetime`` and is a timestamp index.
    """
    # Resolve the strftime template into the concrete set of daily files
    input_files = tokio.tools.common.enumerate_dated_files(
        start=start,
        end=end,
        template=template,
        use_glob=True)
    # Each line is parsed independently; file_system is forwarded as the
    # second positional argument to parse_line
    parsed = dask.bag.read_text(input_files).map(parse_line, file_system)
    frame = parsed.to_dataframe(meta={'timestamp': int, 'used': int, 'available': int})
    frame['datetime'] = frame['timestamp'].map(datetime.datetime.fromtimestamp)
    return frame.set_index('datetime').compute()
def fit_interval(dataframe, start=None, end=None):
    """Calculate linear fit from a subset of a DataFrame generated by load_df_files

    Args:
        dataframe (pandas.DataFrame): DataFrame of format returned by
            load_df_files(), indexed by datetime.
        start (datetime.datetime or None): Fit to data starting on or after
            this date. If None, do not apply this filter.
        end (datetime.datetime or None): Do not fit to data on or after this
            date (exclusive). If None, do not apply this filter.

    Returns:
        Same as scipy.stats.linregress() which includes the following
        attributes:
            - slope (float): expressed in units of base-10 petabytes per second
            - intercept (float): expressed as base-10 petabytes
            - rvalue (float): correlation coefficient
    """
    # Filter rows by the datetime index before fitting.  (The original
    # version computed unfiltered xval_fit/yval_fit first and then
    # unconditionally overwrote them -- dead code, removed here.)
    subset = dataframe
    if start is not None:
        subset = subset[subset.index >= start]
    if end is not None:
        subset = subset[subset.index < end]
    xval_fit = subset['timestamp'].values
    # Convert used kibibytes to base-10 petabytes
    yval_fit = subset['used'] * 1024 / 10**15
    return scipy.stats.linregress(xval_fit, yval_fit)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import sys | |
import argparse | |
import datetime | |
import matplotlib | |
matplotlib.use("Agg") | |
from . import fullness | |
# Human-readable rendering of DATE_FMT for --help text.
# BUG FIX: was "YY-MM-DD", but DATE_FMT uses a four-digit year (%Y).
DATE_FMT_PRINT = "YYYY-MM-DD"
DATE_FMT = "%Y-%m-%d"
# Conversion factor from kibibytes to base-10 petabytes
KIB_TO_PB = 1024.0 / 10**15
# (start, end) windows over which linear fits may be calculated; None means
# unbounded on that side.  Evaluated once at import time, so the "days ago"
# entries are relative to process start.
FIT_INTERVALS = [
    (None, None),
    (datetime.datetime(2020, 3, 1), datetime.datetime(2020, 3, 27)),
    (datetime.datetime(2020, 4, 7), None),
    (datetime.datetime.today() - datetime.timedelta(days=7), None),
    (datetime.datetime.today() - datetime.timedelta(days=14), None),
]
def main(argv=None):
    """Estimate file system fill rate and project the date of fullness.

    Loads df measurements for one file system, fits a line to used capacity,
    prints the projected date at which --fill-pct fullness is reached, and
    optionally renders a plot.

    Args:
        argv (list of str or None): CLI arguments; None uses sys.argv.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--start', type=str, default=None,
                        help='first timestamp (inclusive) when creating new output file,' +
                             ' in %s format (default: 7 days ago)' % DATE_FMT_PRINT)
    parser.add_argument('--end', type=str, default=None,
                        help='final timestamp (exclusive) when creating new output file,' +
                             ' in %s format (default: today)' % DATE_FMT_PRINT)
    parser.add_argument("--fill-pct", type=float, default=80,
                        help="Estimate date to reach this percent fullness (default: 80)")
    # BUG FIX: the original registered the short option as a bare "-",
    # which argparse cannot match; "-o" is the conventional short form
    parser.add_argument("-o", "--plot-output", type=str, default=None,
                        help="Location to produce graphical plot (default None)")
    parser.add_argument("fs", type=str, default=None,
                        help="Name of file system to load from input file(s)")
    parser.add_argument(
        "--template",
        type=str,
        default=os.path.join(os.environ.get("PYTOKIO_DAILY_DATA_DIR", os.getcwd()), DATE_FMT, "df_*.txt"),
        help="Template to pass to strftime to identify input files (default: $PYTOKIO_DAILY_DATA_DIR/%s/df_*.txt)" % DATE_FMT.replace("%", "%%"))
    args = parser.parse_args(argv)

    # Resolve the analysis window; default is the most recent 7 days
    try:
        if args.end:
            end = datetime.datetime.strptime(args.end, DATE_FMT)
        else:
            end = datetime.datetime.today()
        if args.start:
            start = datetime.datetime.strptime(args.start, DATE_FMT)
        else:
            start = end - datetime.timedelta(days=7)
    except ValueError:
        sys.stderr.write("Start and end times must be in format %s\n" % DATE_FMT_PRINT)
        raise

    # BUG FIX: the original referenced cfsalib.fullness, which is never
    # imported (NameError); this module imports "from . import fullness"
    dataframe = fullness.load_df_files(
        start=start,
        end=end,
        template=args.template,
        file_system=args.fs)
    fit = fullness.fit_interval(dataframe, start, end)

    slope_pbs_per_sec = fit.slope
    intercept_pbs = fit.intercept
    # yshift = difference between measured and predicted at the end of the
    # measured timeseries; anchors the projection to the latest measurement
    yshift = dataframe['used'].iloc[-1] * KIB_TO_PB - (dataframe['timestamp'].iloc[-1] * slope_pbs_per_sec + intercept_pbs)

    # Total file system capacity (PB) and the --fill-pct target capacity
    max_pb = (dataframe['used'] + dataframe['available']).values[0] * KIB_TO_PB
    most_pb = max_pb / 100.0 * args.fill_pct

    ax = None
    if args.plot_output:
        ax = fullness.plot_capacity(dataframe)

    for full_pb, full_pct in [(most_pb, args.fill_pct)]:
        # NOTE(review): a zero slope raises ZeroDivisionError and a negative
        # slope yields a date in the past; assumes the file system is filling
        full_timestamp = (full_pb - intercept_pbs - yshift) / slope_pbs_per_sec
        fill_date_str = "%s will fill to %4.1f PB (%3d%%) on %s based on rate of %.1f TB/day between %s and %s (R=%.4f)" % (
            args.fs,
            full_pb,
            full_pct,
            datetime.datetime.fromtimestamp(full_timestamp).date(),
            fit.slope * 1000 * 86400,
            start.strftime(DATE_FMT),
            end.strftime(DATE_FMT),
            fit.rvalue)
        print(fill_date_str)
        if ax:
            ax = fullness.plot_fit(
                dataframe,
                fit,
                ax,
                label="%.1f TB/day (R=%.4f)" % (fit.slope * 1000 * 86400, fit.rvalue))
            ax.legend()
    if ax:
        ax.get_figure().savefig(args.plot_output, bbox_inches='tight', transparent=True)

if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment