@glennklockwood
Last active December 23, 2020 02:01
File system fullness watcher
#!/usr/bin/env python3
"""Creates JSON representation of file system fullness
"""
import re
import time
import json
import subprocess
# NOTE: file systems matching this pattern are *excluded* from the output
WATCHED_FS = re.compile(r'(^(dev)?tmpfs|:/cvmfs(/|\s)|/dev/|overlay)')
INTEGER_COLS = {'1K-blocks', 'Used', 'Available', 'Inodes', 'IUsed', 'IFree'}
DELETE_COLS = {'Use%', "IUse%"}
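# For reference, `df -k` output has a header row followed by one row per mount,
# e.g. (illustrative):
#
#   Filesystem     1K-blocks     Used Available Use% Mounted on
#   server:/export 104857600 52428800  52428800  50% /mnt/data
#
# "Mounted on" splits into two column names, so the zip() below pairs the mount
# point with the key "Mounted" and silently drops the trailing "on".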
def parse_df(output_str):
    result = {}
    columns = None
    for line in [x.decode() for x in output_str.splitlines()]:
        # First line always defines column names
        if columns is None:
            columns = line.split()
            continue
        # Filter out uninteresting file systems
        match = WATCHED_FS.search(line)
        if match:
            continue
        # create list of [(colname0, colval0), (colname1, colval1), ...]
        keys_vals = list(zip(columns, line.split()))
        # first column is always the file system name - pop this off so we can
        # key df values by fsname
        _, fsname = keys_vals.pop(0)
        # transform list of tuples into a dictionary
        result_dict = dict(keys_vals)
        # convert certain columns into integers
        for integer_col in INTEGER_COLS:
            if integer_col in result_dict:
                result_dict[integer_col] = int(result_dict[integer_col])
        for delete_col in DELETE_COLS:
            if delete_col in result_dict:
                del result_dict[delete_col]
        # build a dict, keyed by fsname, containing fs fullness
        result[fsname] = result_dict
    return result
def main(argv=None):
    result = {
        '_timestamp': int(time.time())  # high precision timestamp is unnecessary and misleading
    }
    for df_opt in ['-k', '-i']:
        for key, value in parse_df(subprocess.check_output(['df', df_opt])).items():
            if key not in result:
                result[key] = {}
            result[key].update(value)
    return result

if __name__ == "__main__":
    print(json.dumps(main()))
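Each invocation of the watcher emits one JSON object on stdout. A hypothetical record is shown below (the file system name and numbers are illustrative); note that the `df -k` and `df -i` columns are merged under a single key per file system, and the `Use%`/`IUse%` columns are dropped:

    {"_timestamp": 1600000000, "server:/export": {"1K-blocks": 104857600, "Used": 52428800, "Available": 52428800, "Mounted": "/mnt/data", "Inodes": 6553600, "IUsed": 120000, "IFree": 6433600}}

Appending one such record per cron interval to a dated file produces the df_*.txt inputs consumed by the fullness module below.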
"""Estimates file system fill rate and date of fullness
"""
import json
import datetime
import numpy
import dask
import dask.bag
import scipy.stats
import tokio
try:
    import abcutils
    import matplotlib
    import matplotlib.pyplot
except ImportError:
    pass
def parse_line(line, file_system):
    """Parses a single line of the df output file

    Args:
        line (str): A single json-encoded line containing df data. This must
            match the format created by the df.py script running in crontab.
        file_system (str): Name of file system to extract from df file

    Returns:
        dict: Contains three keys:

            - timestamp (int): timestamp of measurement
            - used (int): capacity used in kibibytes
            - available (int): capacity remaining in kibibytes
    """
    record = json.loads(line)
    if file_system not in record:
        return {}
    return {
        'timestamp': record['_timestamp'],
        'used': record[file_system]['Used'],
        'available': record[file_system]['Available'],
    }
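# A minimal doctest-style sketch of parse_line (the record below is made up):
#
#   >>> parse_line('{"_timestamp": 1600000000, '
#   ...            '"server:/export": {"Used": 512, "Available": 1536}}',
#   ...            'server:/export')
#   {'timestamp': 1600000000, 'used': 512, 'available': 1536}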
def plot_capacity(plot_df, ax=None):
    """Plots capacity usage over time for a DataFrame

    Args:
        plot_df (pandas.DataFrame): DataFrame of format returned by
            load_df_files()
        ax (matplotlib.pyplot.axes.Axes or None): Axes on which plot should be
            drawn. If None, create a new axis.

    Returns:
        matplotlib.pyplot.axes.Axes: Axes on which plot was drawn
    """
    if ax is None:
        _, ax = matplotlib.pyplot.subplots()

    xval = plot_df['timestamp'].values
    yval = plot_df['used'].values * 1024 / 10**15
    fs_size_pb = (plot_df['used'] + plot_df['available']).values[-1] * 1024 / 10**15

    ax.plot(xval, yval, linewidth=2)
    ax.set_ylabel("Used Capacity (PB)")
    ax.set_xlabel("")
    ax.grid()
    ax.set_axisbelow(True)

    # choose x-tick density based on the width of the plotted interval
    delta_days = (xval[-1] - xval[0]) / 86400
    if delta_days <= 7:
        criteria = lambda x: True
    elif delta_days <= 28:
        criteria = lambda x: (x.toordinal() % 2) == 0
    else:
        criteria = lambda x: x.weekday() == 0
    abcutils.plot.fix_xticks_timeseries(ax=ax, format="%a %b %-d", criteria=criteria)

    # secondary y axis expresses fullness as a percent of total capacity
    def abs2rel(cap):
        return cap / fs_size_pb * 100.0

    def rel2abs(cap):
        return cap * fs_size_pb / 100.0

    secax = ax.secondary_yaxis('right', functions=(abs2rel, rel2abs))
    secax.set_ylabel("Used Capacity (%)")
    return ax
def plot_fit(plot_df, fit, ax, **kwargs):
    """Plots a line of best fit over a dataframe
    """
    xval = plot_df['timestamp'].values
    ax.plot(
        xval,
        (fit.slope * xval + fit.intercept),
        **kwargs)
    return ax
def load_df_files(start, end, template, file_system):
    """Loads df data into a dataframe using Dask

    Args:
        start (datetime.datetime): Load data starting on or after this date
        end (datetime.datetime): Do not load data after this date
        template (str): Path to pass to strftime that resolves to one or more
            input files corresponding to the start and end dates given.
        file_system (str): File system to load from the df files

    Returns:
        pandas.DataFrame: Contains the following columns:

            - timestamp (int): Seconds since epoch corresponding to a single
              fullness measurement
            - used (int): Capacity used expressed in units of kibibytes
            - available (int): Remaining capacity available, expressed as kibibytes

        Its index is named ``datetime`` and is a timestamp index.
    """
    df_files = tokio.tools.common.enumerate_dated_files(
        start=start,
        end=end,
        template=template,
        use_glob=True)
    # drop records that did not contain file_system (parse_line returns {})
    bag = dask.bag.read_text(df_files).map(parse_line, file_system).filter(bool)
    dataframe = bag.to_dataframe(meta={'timestamp': int, 'used': int, 'available': int})
    dataframe['datetime'] = dataframe['timestamp'].map(datetime.datetime.fromtimestamp)
    return dataframe.set_index('datetime').compute()
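# Hedged usage sketch (the template path and file system name are hypothetical;
# the template is strftime-expanded for each day in the interval and globbed by
# tokio.tools.common.enumerate_dated_files):
#
#   frame = load_df_files(
#       start=datetime.datetime(2020, 4, 1),
#       end=datetime.datetime(2020, 4, 8),
#       template='/data/%Y-%m-%d/df_*.txt',
#       file_system='server:/export')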
def fit_interval(dataframe, start=None, end=None):
    """Calculates linear fit from a subset of a DataFrame generated by load_df_files

    Args:
        dataframe (pandas.DataFrame): DataFrame of format returned by load_df_files()
        start (datetime.datetime or None): Fit to data starting on or after this
            date. If None, do not apply this filter.
        end (datetime.datetime or None): Do not fit to data after this
            date. If None, do not apply this filter.

    Returns:
        Same as scipy.stats.linregress() which includes the following attributes:

        - slope (float): expressed in units of base-10 petabytes per second
        - intercept (float): expressed as base-10 petabytes
        - rvalue (float): correlation coefficient
    """
    filt = numpy.ones(dataframe.index.shape[0], dtype=bool)
    if start:
        filt &= dataframe.index >= start
    if end:
        filt &= dataframe.index < end
    xval_fit = dataframe.loc[filt]['timestamp'].values
    yval_fit = dataframe.loc[filt]['used'] * 1024 / 10**15
    return scipy.stats.linregress(xval_fit, yval_fit)
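The command-line driver below turns the fit into a fill date by simple linear extrapolation. With the fit expressed as used(t) ≈ slope*t + intercept (base-10 petabytes as a function of seconds since the epoch), the line is first shifted by yshift so that it passes through the last measured point, then solved for the time at which usage reaches a target capacity:

    t_full = (target_pb - intercept - yshift) / slope

For example, a slope of 1.0e-8 PB/s corresponds to 1.0e-8 * 1000 * 86400 ≈ 0.86 TB/day, which is the same conversion the script applies when reporting fill rates (numbers here are illustrative).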
#!/usr/bin/env python3
import os
import sys
import argparse
import datetime
import matplotlib
matplotlib.use("Agg")
import cfsalib.fullness

DATE_FMT_PRINT = "YYYY-MM-DD"
DATE_FMT = "%Y-%m-%d"
KIB_TO_PB = 1024.0 / 10**15
# candidate fit windows; note that main() below only fits the --start/--end window
FIT_INTERVALS = [
    (None, None),
    (datetime.datetime(2020, 3, 1), datetime.datetime(2020, 3, 27)),
    (datetime.datetime(2020, 4, 7), None),
    (datetime.datetime.today() - datetime.timedelta(days=7), None),
    (datetime.datetime.today() - datetime.timedelta(days=14), None),
]
def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--start', type=str, default=None,
                        help='first timestamp (inclusive) when creating new output file,' +
                        ' in %s format (default: 7 days ago)' % DATE_FMT_PRINT)
    parser.add_argument('--end', type=str, default=None,
                        help='final timestamp (exclusive) when creating new output file,' +
                        ' in %s format (default: today)' % DATE_FMT_PRINT)
    parser.add_argument("--fill-pct", type=float, default=80,
                        help="Estimate date to reach this percent fullness (default: 80)")
    parser.add_argument("-o", "--plot-output", type=str, default=None,
                        help="Location to produce graphical plot (default: None)")
    parser.add_argument("fs", type=str, default=None,
                        help="Name of file system to load from input file(s)")
    parser.add_argument(
        "--template",
        type=str,
        default=os.path.join(os.environ.get("PYTOKIO_DAILY_DATA_DIR", os.getcwd()), DATE_FMT, "df_*.txt"),
        help="Template to pass to strftime to identify input files (default: $PYTOKIO_DAILY_DATA_DIR/%s/df_*.txt)" % DATE_FMT.replace("%", "%%"))
    args = parser.parse_args(argv)
    try:
        if args.end:
            end = datetime.datetime.strptime(args.end, DATE_FMT)
        else:
            end = datetime.datetime.today()
        if args.start:
            start = datetime.datetime.strptime(args.start, DATE_FMT)
        else:
            start = end - datetime.timedelta(days=7)
    except ValueError:
        sys.stderr.write("Start and end times must be in format %s\n" % DATE_FMT_PRINT)
        raise
# print("Loading from %s" % args.template)
dataframe = cfsalib.fullness.load_df_files(
start=start,
end=end,
template=args.template,
file_system=args.fs)
fit = cfsalib.fullness.fit_interval(dataframe, start, end)
# print("%s to %s: %.2f TB/day (R=%.4f)" % (
# start.strftime("%Y-%m-%d") if start else datetime.datetime.fromtimestamp(dataframe.iloc[0]['timestamp']).strftime("%Y-%m-%d"),
# end.strftime("%Y-%m-%d") if end else "today",
# fit.slope * 1000 * 86400,
# fit.rvalue))
slope_pbs_per_sec = fit.slope
intercept_pbs = fit.intercept
# yshift = difference between measured and predicted at end of measured timeseries
yshift = dataframe['used'].iloc[-1] * KIB_TO_PB - (dataframe['timestamp'].iloc[-1] * slope_pbs_per_sec + intercept_pbs)
max_pb = (dataframe['used'] + dataframe['available']).values[0] * KIB_TO_PB
most_pb = max_pb / 100.0 * args.fill_pct
    ax = None
    if args.plot_output:
        ax = cfsalib.fullness.plot_capacity(dataframe)
    for full_capacity in [(most_pb, args.fill_pct)]:  # could also append (max_pb, 100)
        full_timestamp = (full_capacity[0] - intercept_pbs - yshift) / slope_pbs_per_sec
        fill_date_str = "%s will fill to %4.1f PB (%3d%%) on %s based on rate of %.1f TB/day between %s and %s (R=%.4f)" % (
            args.fs,
            full_capacity[0],
            full_capacity[1],
            datetime.datetime.fromtimestamp(full_timestamp).date(),
            fit.slope * 1000 * 86400,
            start.strftime(DATE_FMT),
            end.strftime(DATE_FMT),
            fit.rvalue)
        print(fill_date_str)
        if ax:
            ax = cfsalib.fullness.plot_fit(
                dataframe,
                fit,
                ax,
                label="%.1f TB/day (R=%.4f)" % (fit.slope * 1000 * 86400, fit.rvalue))
            ax.legend()
    if ax:
        ax.get_figure().savefig(args.plot_output, bbox_inches='tight', transparent=True)

if __name__ == "__main__":
    main()
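A hypothetical invocation of the driver (the script name is a placeholder, and the file system name, dates, and numbers are illustrative):

    $ ./fullness_forecast.py --start 2020-04-01 --end 2020-04-08 --plot-output fullness.png server:/export
    server:/export will fill to 28.0 PB ( 80%) on 2021-03-15 based on rate of 12.3 TB/day between 2020-04-01 and 2020-04-08 (R=0.9876)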