|
""" Script for plotting a pie chart of file and directory sizes in a given |
|
target directory. The target directory (whose file sizes should be calculated),
output image path (in which the pie chart should be saved), and cache file (in
which previously calculated file and directory sizes are saved for later) can
all be configured using command-line arguments.
|
|
|
Usage examples: |
|
|
|
python ./plot_dir_contents.py |
|
|
|
python ./plot_dir_contents.py --target_dir "C:/Program Files/MATLAB" |
|
|
|
python ./plot_dir_contents.py --target_dir "C:/" |
|
|
|
For more information about available command-line arguments, use the following |
|
command: |
|
|
|
python ./plot_dir_contents.py -h |
|
|
|
""" |
|
|
|
import os |
|
from argparse import ArgumentParser |
|
from time import perf_counter |
|
from math import ceil |
|
from multiprocessing import Process, Queue |
|
import numpy as np |
|
import matplotlib.pyplot as plt |
|
from matplotlib.patches import Patch |
|
|
|
# Directory containing this script; the default cache file and the default
# output images are created inside it
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
# Name of the cache file used when no --cache_file argument is given
DEFAULT_CACHE_FILENAME = "file_and_dir_sizes.txt"
# Legend strings at least this long are shortened by trim_legend_str
LEGEND_STR_MAX_LEN = 30
# Number of leading characters kept (before "...") when a string is shortened
LEGEND_STR_TRIMMED_LEN = 25
# A wedge only gets its own label if its size is more than
# (total size / LEGEND_MAX_FRACTION_DISPLAY_LABEL)
LEGEND_MAX_FRACTION_DISPLAY_LABEL = 15
# Number of legend entries placed in each legend column
LEGEND_NUM_LABELS_PER_COL = 20
# Maximum number of legend columns; together with LEGEND_NUM_LABELS_PER_COL
# this caps how many items are shown in the pie chart
LEGEND_MAX_NUM_COLS = 5
|
|
|
def dict_to_str(d):
    """ Serialise a dictionary as text, one "key::value" entry per line, so
    that it can be written to a text file and parsed again later """
    lines = []
    for key in d:
        lines.append("%s::%s" % (key, d[key]))
    return "\n".join(lines)
|
|
|
def str_to_dict(s):
    """ Perform the inverse of dict_to_str: parse a string containing one
    "key::value" entry per line into a dictionary mapping each key (a string)
    to its value (converted to an integer).

    Blank lines (for example, one produced by a trailing newline) are ignored.
    Each line is split on its LAST "::", because the value is always a plain
    integer whereas the key is a path which may itself contain "::".

    Raises:
    - ValueError: if a non-blank line contains no "::" separator, or if the
        text after the separator is not a valid integer
    """
    d = dict()
    for line in s.split("\n"):
        if not line.strip():
            continue
        key, separator, value = line.rpartition("::")
        if not separator:
            # Raise ValueError (not TypeError) so that callers handling an
            # invalid cache file can catch a single exception type
            raise ValueError("Invalid line (missing \"::\"): %r" % line)
        d[key] = int(value)
    return d
|
|
|
def read_cache_from_file(cache_file_path):
    """ Load the dictionary mapping file and directory paths to their sizes in
    bytes from a formatted cache file. An empty dictionary is returned if the
    path is not an existing file, if the file is blank, or if reading or
    parsing it raises a ValueError (in which case a message is printed) """
    # Nothing to read if the cache file doesn't exist
    if not os.path.isfile(cache_file_path):
        return dict()

    sizes_by_path = dict()
    with open(cache_file_path, "r") as cache_file:
        try:
            contents = cache_file.read()
            # Only parse the file if it contains something besides whitespace
            if contents.rstrip():
                sizes_by_path = str_to_dict(contents)
        except ValueError:
            print(
                "Cache file \"%s\" is invalid; using a blank dictionary"
                % cache_file_path
            )

    return sizes_by_path
|
|
|
def write_cache_to_file(cache_file_path, cached_sizes_dict):
    """ Save the dictionary mapping file and directory paths to their sizes in
    bytes to a formatted cache file, overwriting any existing contents """
    with open(cache_file_path, "w") as cache_file:
        serialised_cache = dict_to_str(cached_sizes_dict)
        cache_file.write(serialised_cache)
|
|
|
def size_fmt_str(size):
    """ Convert a number representing a file size in bytes into a
    human-readable string expressed in bytes, KB, MB, GB, or TB """
    if size < 1024:
        return "%i bytes" % size

    scaled = size / 1024
    for unit in ("KB", "MB", "GB"):
        if scaled >= 1024:
            # Too big for this unit, so move on to the next one
            scaled /= 1024
            continue
        # Show one decimal place, unless the number already has 4 digits
        fmt = "%.1f %s" if scaled < 1000 else "%i %s"
        return fmt % (scaled, unit)

    return "%.1f TB" % scaled
|
|
|
def trim_legend_str(s):
    """ Shorten a string for use in the pie chart legend: strings of at least
    LEGEND_STR_MAX_LEN characters are cut down to their first
    LEGEND_STR_TRIMMED_LEN characters followed by "...", and shorter strings
    are returned unchanged """
    if len(s) >= LEGEND_STR_MAX_LEN:
        return s[:LEGEND_STR_TRIMMED_LEN] + "..."
    return s
|
|
|
def pie_chart_label_fmt(rel_path, size):
    """ Build the label used for a wedge and for its legend entry in the pie
    chart, given the relative path to a file or folder and its size on disk
    in bytes. The label has the form "(<formatted size>) <trimmed path>" """
    size_text = size_fmt_str(size)
    path_text = trim_legend_str(rel_path)
    return "(%s) %s" % (size_text, path_text)
|
|
|
def get_default_output_image_path(target_dir):
    """ Build the default path under which make_plot saves the pie chart for
    the given target directory. The filename embeds the target directory's
    path, with colons and path separators replaced by hyphens so that it is a
    single valid filename, and the image is placed in the same directory as
    this script """
    filename = "Directory content sizes in '%s'.png" % target_dir
    # Replace the characters which can't appear inside a single filename
    for unsafe_char in (":", "/", "\\"):
        filename = filename.replace(unsafe_char, "-")
    return os.path.join(CURRENT_DIR, filename)
|
|
|
def get_dir_size(full_path):
    """ Find the size in bytes of the directory with the given path by
    recursively summing the sizes of all files within it (at any depth). The
    os.stat function is used with follow_symlinks=False instead of
    os.path.getsize to avoid errors with symlink files.

    Files which cannot be stat'ed (for example because they were deleted
    after being listed, or because access is denied) are skipped, so that a
    single problematic file doesn't abort the scan of a whole directory tree.
    In that case the returned total excludes those files.

    Returns 0 if the directory is empty or cannot be listed.
    """
    dir_size = 0
    for dir_name, _, f_list in os.walk(full_path):
        for filename in f_list:
            file_path = os.path.join(dir_name, filename)
            try:
                dir_size += os.stat(file_path, follow_symlinks=False).st_size
            except OSError:
                # The file vanished or is inaccessible; leave it out of the
                # total rather than failing the whole calculation
                continue
    return dir_size
|
|
|
def get_dir_size_wrapper(full_path, result_queue):
    """ Run get_dir_size for the given path and put its result on the given
    Queue object, so that the result can be retrieved by the process which
    launched this function """
    result_queue.put(get_dir_size(full_path))
|
|
|
def get_dir_size_timeout(full_path, timeout, *args):
    """ Call the get_dir_size function in a subprocess to calculate the size of
    the directory with path full_path. If after timeout seconds get_dir_size
    hasn't returned, then terminate the subprocess and call get_file_dir_sizes
    to calculate and cache all the files and directories within the target
    directory, and then call make_plot to make a pie chart of the results.
    Finally, return the size in bytes of the target directory.

    Inputs:
    - full_path: path to the directory whose size should be calculated
    - timeout: number of seconds to wait for the subprocess before recursing
    - *args: remaining positional arguments, forwarded unchanged to
        get_file_dir_sizes (after full_path and timeout) if a timeout occurs

    Raises:
    - RuntimeError: if the subprocess exits with a non-zero exit code without
        timing out (e.g. because get_dir_size raised an exception), in which
        case no result is available

    TODO: accept dir_contents_size_dict in this function and pass it to
    get_file_dir_sizes, instead of writing it to file before calling this
    function and reading it again afterwards """
    # Initialise the Queue and Process objects
    result_queue = Queue()
    p = Process(target=get_dir_size_wrapper, args=[full_path, result_queue])
    # Start the subprocess and wait for timeout seconds
    p.start()
    p.join(timeout)
    if p.is_alive():
        # The function hasn't returned, so terminate the subprocess
        p.terminate()
        p.join()
        p.close()
        # Recurse by calling get_file_dir_sizes for the target directory
        print("\n*** Timeout; recursing... ***\n")
        dir_contents_size_dict = get_file_dir_sizes(full_path, timeout, *args)
        # Calculate the size of the directory and plot the results
        dir_size = sum(dir_contents_size_dict.values())
        output_image_path = get_default_output_image_path(full_path)
        make_plot(dir_contents_size_dict, output_image_path, full_path)
    else:
        try:
            # If the subprocess failed, it never put a result on the queue,
            # and waiting for one would block forever, so fail loudly instead
            if p.exitcode != 0:
                raise RuntimeError(
                    "Subprocess finding the size of \"%s\" failed with exit "
                    "code %s" % (full_path, p.exitcode)
                )
            # The process finished, so retrieve the result
            dir_size = result_queue.get()
        finally:
            # Release the resources of the finished process in all cases
            p.close()

    # Return the size of the directory
    return dir_size
|
|
|
def get_file_dir_sizes(
    target_dir,
    timeout,
    cache_file_path,
    use_cache,
    max_num_items_no_timeout,
):
    """ Find a dictionary mapping relative paths in the target directory to
    their integer size in bytes. Progress is printed for every item, and the
    cache file is updated with every size which is found.

    Inputs:
    - target_dir: string containing the absolute path to the target directory
    - timeout: if timeout is not None and it takes longer than timeout seconds
        to find the size of a subdirectory of the target directory, then this
        function is called recursively with this subdirectory as the target
        directory, and a pie chart is made of the contents of this subdirectory
        as well
    - cache_file_path: either a string containing the path to the cache file
        of previously calculated file sizes, or None, in which case a cache
        file in the same directory as this script called
        DEFAULT_CACHE_FILENAME is used. If the file doesn't exist yet, the
        cache starts out empty and the file is created when it is written
    - use_cache: boolean, if true, then use the cache to look up previously
        calculated sizes of files and directories; otherwise all file and
        directory sizes will be calculated and written to the cache
    - max_num_items_no_timeout: If there are more than
        max_num_items_no_timeout items (files or directories) in the target
        directory, then a timeout is not enabled for calculating the size of
        subdirectories of the target directory. This can make the calculation
        significantly faster if there are a large number of subdirectories,
        because there is a ~0.5 second overhead for launching the subprocess
        when calculating the size of each subdirectory (and using a subprocess
        is necessary if using a timeout)

    Returns:
    - dictionary mapping the name of each item directly inside target_dir
        (with a trailing "/" appended for directories) to its size in bytes

    Raises:
    - NotImplementedError: if an item is neither a file nor a directory, and
        its size cannot be found using os.stat
    """
    # Get start time
    t_start = perf_counter()

    # Use the default cache file if no cache file was specified
    if cache_file_path is None:
        cache_file_path = os.path.join(CURRENT_DIR, DEFAULT_CACHE_FILENAME)

    # Read the cache dictionary from disk
    cached_sizes_dict = read_cache_from_file(cache_file_path)

    # Get relative paths to directories and files in the target directory
    dir_contents_rel_paths = os.listdir(target_dir)

    # Decide whether or not to use a timeout
    num_items = len(dir_contents_rel_paths)
    no_timeout = (timeout is None) or (num_items > max_num_items_no_timeout)

    # Initialise results dictionary of file/directory sizes
    dir_contents_size_dict = dict()

    # Iterate through each file or directory in the target directory
    for i, rel_path in enumerate(dir_contents_rel_paths):
        t_start_f = perf_counter()
        full_path = os.path.abspath(os.path.join(target_dir, rel_path))
        rel_path_fmt = rel_path + "/" if os.path.isdir(full_path) else rel_path
        print("[%i/%i] Finding size of %r..." % (
            i + 1,
            num_items,
            full_path
        ))

        # If we have a cached result, use it
        if use_cache and (full_path in cached_sizes_dict):
            print("\t**Found cached result**")
            dir_contents_size_dict[rel_path_fmt] = cached_sizes_dict[full_path]

        # If it is a file, find its size
        elif os.path.isfile(full_path):
            dir_contents_size_dict[rel_path_fmt] = os.path.getsize(full_path)

        # If it is a directory, recursively find the size of all files within
        elif os.path.isdir(full_path):
            if no_timeout:
                # Find the size of the directory without a timeout
                dir_size = get_dir_size(full_path)
            else:
                # Write the cache to file, so that a recursive call to this
                # function (after a timeout) starts from the latest cache
                write_cache_to_file(cache_file_path, cached_sizes_dict)
                # Find the size of the directory using a timeout
                dir_size = get_dir_size_timeout(
                    full_path,
                    timeout,
                    cache_file_path,
                    use_cache,
                    max_num_items_no_timeout,
                )
                # Read the cache from file, to pick up any sizes which were
                # added to it by a recursive call
                cached_sizes_dict = read_cache_from_file(cache_file_path)

            # Store the size of the subdirectory in the results dictionary
            dir_contents_size_dict[rel_path_fmt] = dir_size

        # Unsupported type: symbolic link, mount, etc
        else:
            try:
                dir_contents_size_dict[rel_path_fmt] = os.stat(
                    full_path,
                    follow_symlinks=False,
                ).st_size
            except OSError as err:
                # Only filesystem errors are translated; anything else (e.g.
                # KeyboardInterrupt) propagates unchanged
                raise NotImplementedError(
                    "File \"%s\" has an unsupported file type" % full_path
                ) from err

        # Store the size in the cache dictionary, and print the result
        cached_sizes_dict[full_path] = dir_contents_size_dict[rel_path_fmt]
        print("\tSize = %i bytes, calculated in %.3f s\n" % (
            dir_contents_size_dict[rel_path_fmt],
            perf_counter() - t_start_f
        ))

    # Add total target directory size to cache, and save cache to disk
    cached_sizes_dict[target_dir] = sum(dir_contents_size_dict.values())
    write_cache_to_file(cache_file_path, cached_sizes_dict)

    # Print time taken, and return dictionary of directory content sizes
    time_taken = perf_counter() - t_start
    print(
        "Finished finding file sizes in \"%s\" in %.3f s"
        % (target_dir, time_taken)
    )
    return dir_contents_size_dict
|
|
|
def make_plot(dir_contents_size_dict, output_image_path, target_dir):
    """ Given a dictionary mapping relative paths in the target directory to
    their integer size in bytes, make a pie chart of these file sizes, and save
    it to disk, under the filename output_image_path. The name of the target
    directory target_dir is used in the plot title.

    At most LEGEND_NUM_LABELS_PER_COL * LEGEND_MAX_NUM_COLS items are shown
    (the biggest ones); the total size in the title always includes every
    item. If the dictionary is empty or the total size is zero, there is
    nothing to plot, so a message is printed and no image is saved.
    """
    # Calculate the total size of the target
    total_dir_size = sum(dir_contents_size_dict.values())

    # Without any non-zero sizes there are no meaningful wedges to draw (and
    # for an empty dictionary the number of legend columns would be zero)
    if total_dir_size <= 0:
        print(
            "Nothing to plot for \"%s\": total size is 0 bytes" % target_dir
        )
        return

    # Sort items from biggest to smallest, and if there are too many items,
    # then only use the biggest ones
    max_num_labels = LEGEND_NUM_LABELS_PER_COL * LEGEND_MAX_NUM_COLS
    total_num_labels = len(dir_contents_size_dict)
    trimmed_labels = total_num_labels > max_num_labels
    rel_path_list = sorted(
        dir_contents_size_dict,
        key=dir_contents_size_dict.get,
        reverse=True,
    )[:max_num_labels]
    size_list = [dir_contents_size_dict[rel_path] for rel_path in rel_path_list]

    # Make formatted labels for the legend (every item) and for the wedges
    # (only items which are big enough for the label to be readable)
    legend_label_list = [
        pie_chart_label_fmt(rel_path, size)
        for rel_path, size in zip(rel_path_list, size_list)
    ]
    min_wedge_label_size = total_dir_size / LEGEND_MAX_FRACTION_DISPLAY_LABEL
    wedge_label_list = [
        label if size > min_wedge_label_size else None
        for label, size in zip(legend_label_list, size_list)
    ]

    # Create figure, axes, and colour list; the figure is widened according
    # to the number of legend columns
    num_items_shown = len(rel_path_list)
    legend_ncol = ceil(num_items_shown / LEGEND_NUM_LABELS_PER_COL)
    fig_width = 4 * (legend_ncol + 2)
    fig, axes = plt.subplots(
        1,
        2,
        sharex=True,
        figsize=[fig_width, 6],
        gridspec_kw={
            "width_ratios": [2, legend_ncol],
            "wspace": 0,
            "left": 0,
            "right": 1,
        },
    )
    colours = plt.get_cmap("hsv")(
        np.linspace(0, 1, num_items_shown, endpoint=False)
    )
    # Plot pie chart
    axes[0].pie(
        size_list,
        labels=wedge_label_list,
        colors=colours,
        wedgeprops={"width": 1, "edgecolor": "k"},
        startangle=90,
        counterclock=False
    )
    # Format, save and close
    axes[1].legend(loc="center", ncol=legend_ncol, handles=[
        Patch(facecolor=c, edgecolor="k", label=label)
        for c, label in zip(colours, legend_label_list)
    ])
    axes[1].axis("off")
    title = "Directory content sizes in \"%s\"\nTotal size = %s" % (
        target_dir,
        size_fmt_str(total_dir_size),
    )
    if trimmed_labels:
        title += " (only showing %i biggest items out of %i)" % (
            max_num_labels,
            total_num_labels,
        )
    fig.suptitle(title)
    fig.savefig(output_image_path)
    plt.close(fig)
    print("Output image saved in \"%s\"" % output_image_path)
|
|
|
|
|
if __name__ == "__main__":
    # Build the command-line interface
    cli_parser = ArgumentParser(
        description="Script for plotting a pie chart of file and directory "
        "sizes in a given directory"
    )

    cli_parser.add_argument(
        "--target_dir",
        type=str,
        default=CURRENT_DIR,
        help="Name of the target directory whose file and directory sizes "
        "should be calculated and plotted in the pie chart. If not included, "
        "then plot the file sizes of the current directory",
    )
    cli_parser.add_argument(
        "--cache_file",
        type=str,
        default=None,
        help="Path of the file in which to store previously calculated file "
        "and directory sizes. If left blank, a cache file is created in the "
        "current directory called %r" % DEFAULT_CACHE_FILENAME,
    )
    cli_parser.add_argument(
        "--output_image_path",
        type=str,
        default=None,
        help="Path to the output file to save the pie-chart image. If left "
        "blank, a custom filename is created in the current directory, "
        "specific to the name of the target directory",
    )
    cli_parser.add_argument(
        "--no_cache",
        dest="use_cache",
        action="store_false",
        help="If this argument is included, then the cache will not be used "
        "to look up previously calculated sizes of files and directories (but "
        "newly calculated sizes will be added to the cache)",
    )
    cli_parser.add_argument(
        "--timeout",
        type=float,
        default=5,
        help="Maximum number of seconds to spend recursively finding the size "
        "of a subdirectory within the target directory, before recursively "
        "using the subdirectory as a new target directory until its size has "
        "been calculated, and plotting a pie chart of the file and directory "
        "sizes of that subdirectory. This argument is overridden if the "
        "no_timeout argument is used",
    )
    cli_parser.add_argument(
        "--no_timeout",
        action="store_true",
        help="If this argument is present, then do not recursively use a "
        "subdirectory as a new target directory if it is taking too long to "
        "find the file and directory sizes within that subdirectory. This "
        "also avoids finding the size of each subdirectory of the target "
        "directory in a subprocess, and can be significantly faster if there "
        "are a large number of subdirectories in the main target directory",
    )
    cli_parser.add_argument(
        "--max_num_items_no_timeout",
        type=int,
        default=500,
        help="If there are more than max_num_items_no_timeout items (files or "
        "directories) in the target directory, then a timeout is not enabled "
        "for calculating the size of subdirectories of the target directory. "
        "This can make the calculation significantly faster if there are a "
        "large number of subdirectories, because there is a ~0.5 second "
        "overhead for launching the subprocess when calculating the size of "
        "each subdirectory (and using a subprocess is necessary if using a "
        "timeout)",
    )

    # Parse the arguments and resolve the defaults which depend on them
    cli_args = cli_parser.parse_args()
    target_dir = os.path.abspath(cli_args.target_dir)

    # The no_timeout flag overrides any timeout value
    timeout = None if cli_args.no_timeout else cli_args.timeout

    output_image_path = cli_args.output_image_path
    if output_image_path is None:
        output_image_path = get_default_output_image_path(target_dir)

    # An explicitly specified cache file must already exist, whereas the
    # default cache file may be created later
    cache_file = cli_args.cache_file
    if cache_file is None:
        cache_file = os.path.join(CURRENT_DIR, DEFAULT_CACHE_FILENAME)
    elif not os.path.isfile(cache_file):
        raise ValueError("Could not find cache file \"%s\"" % cache_file)

    # Find the file and directory sizes, and plot them in a pie chart
    size_by_rel_path = get_file_dir_sizes(
        target_dir,
        timeout,
        cache_file,
        cli_args.use_cache,
        cli_args.max_num_items_no_timeout,
    )
    make_plot(size_by_rel_path, output_image_path, target_dir)