Skip to content

Instantly share code, notes, and snippets.

@maxfischer2781
Last active April 29, 2017 08:28
Show Gist options
  • Save maxfischer2781/507389c43cc66116e6eae68413c440b6 to your computer and use it in GitHub Desktop.
crawler for ALICE xrootd namespaces
from __future__ import print_function
import subprocess
import os
import argparse
import ast
import multiprocessing
import sys
import threading
import errno
# Mapping of metadata field name to a GNU find ``-printf`` directive.
# Directives wrapped in double quotes (e.g. ``"%p"``) make find emit a
# Python string literal, so every field can be parsed with
# ``ast.literal_eval`` in ``SearchDir.search``.
FIND_KEYS = {
    'inode': '%i',    # inode number
    'name': '"%p"',   # file path, quoted for literal_eval
    'size_B': '%s',   # file size in bytes
    'disk_kB': '%k',  # disk usage in 1K blocks
    'a_time': '%A@',  # last access time, seconds since epoch
    'c_time': '%C@',  # last status change time, seconds since epoch
    'm_time': '%T@',  # last modification time, seconds since epoch
    'type': '"%Y"',   # file type, quoted for literal_eval
    'uid': '%U',      # numeric user id of the owner
    'gid': '%G',      # numeric group id of the owner
    'perm': '%m',     # permission bits (octal)
}
# Fields reported when the caller does not select any explicitly
DEFAULT_KEYS = ('name', 'inode', 'size_B', 'm_time')
class SearchDir(object):
    """
    A directory to search for file metadata

    :param path: directory (or single file) to inspect
    :param recurse: whether to descend into subdirectories
    """
    def __init__(self, path, recurse=True):
        self.path = path
        self.recurse = recurse

    def search(self, find_keys=None):
        """
        Yield metadata for every file below :py:attr:`path`

        :param find_keys: keys of ``FIND_KEYS`` selecting which metadata
                          fields to report; defaults to ``DEFAULT_KEYS``
        :return: iterator of lists of metadata values, one list per file
        """
        find_keys = find_keys if find_keys is not None else DEFAULT_KEYS
        cmd = [
            # ``-printf`` requires GNU find; on macOS it is installed as gfind
            'find' if sys.platform != 'darwin' else 'gfind',
            '-L', self.path,
        ]
        if not self.recurse:
            cmd += ['-maxdepth', '0']
        cmd += [
            '-type', 'f',
            '-printf', '\t'.join(FIND_KEYS[key] for key in find_keys) + '\n'
        ]
        finder = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
        )
        try:
            for line in finder.stdout:
                line = line.decode()
                values = line.split('\t')
                # literal_eval turns quoted fields into str and bare
                # numeric fields into int/float
                yield [ast.literal_eval(val.strip()) for val in values]
        finally:
            # Stop ``find`` if the consumer abandons the generator early.
            # BUGFIX: also close the pipe and reap the child — the original
            # only called terminate(), leaking a file descriptor and a
            # zombie process for every search.
            finder.terminate()
            finder.stdout.close()
            finder.wait()

    def __repr__(self):
        return '%s(%r, recurse=%s)' % (self.__class__.__name__, self.path, self.recurse)
def find_searchdirs(basepath, max_recurse=1):
    """Find all sub-directories to search inside a basepath"""
    basepath = basepath.rstrip(os.sep)
    targets = []
    pending = [basepath]
    while pending:
        path = pending.pop()
        abs_path = os.path.abspath(path)
        rel_path = os.path.relpath(abs_path, basepath)
        depth = 0 if rel_path == '.' else rel_path.count(os.sep) + 1
        if depth != max_recurse:
            # not deep enough yet: descend another level
            try:
                entries = os.listdir(abs_path)
            except OSError as err:
                if err.errno != errno.ENOTDIR:
                    raise
                # a plain file at an intermediate level: reap it without recursion
                targets.append(SearchDir(abs_path, recurse=False))
            else:
                pending.extend(os.path.join(path, entry) for entry in entries)
        elif os.path.isfile(path):
            # recursed deeply enough, but hit a file: search it alone
            targets.append(SearchDir(abs_path, recurse=False))
        else:
            # recursed deeply enough: the whole directory becomes one task
            print('adding path %r, depth %d for inspection' % (abs_path, depth), file=sys.stderr)
            targets.append(SearchDir(abs_path))
    return targets
def search_dir(target, push_every=100, find_keys=None):
    """
    Search all paths inside ``target`` and push metadata batches to the queue

    Results are buffered and pushed to ``search_dir.queue`` (attached by
    :py:func:`pinit`) in batches of ``push_every`` records.
    """
    if find_keys is None:
        records = target.search()
    else:
        records = target.search(find_keys=find_keys)
    batch = []
    for record in records:
        batch.append(record)
        if len(batch) >= push_every:
            search_dir.queue.put(batch)
            batch = []
    # flush any leftover partial batch
    if batch:
        search_dir.queue.put(batch)
def pmap(args):
    """Adapter for ``Pool.imap_unordered``: unpack one argument tuple into :py:func:`search_dir`"""
    return search_dir(*args)
def pinit(queue):
    """Pool initializer: attach the shared result *queue* to the ``search_dir`` function object in each worker"""
    setattr(search_dir, 'queue', queue)
def preduce(queue, find_keys):
    """
    Consume metadata batches from ``queue`` and print them as CSV

    Prints a header of the field names, then one comma-separated line per
    file, until a ``None`` sentinel is received from the queue.
    """
    if find_keys is None:
        find_keys = DEFAULT_KEYS
    # header row with the selected field names
    print(','.join(find_keys))
    while True:
        batch = queue.get()
        if batch is None:
            break
        for record in batch:
            print(','.join(str(field) for field in record))
def reap_from_path(path, max_recurse=1, nprocesses=None, push_every=100, find_keys=None):
    """
    Search ``path`` in parallel and print one CSV line of metadata per file

    :param path: base directory from which to start searching
    :param max_recurse: depth at which subdirectories become independent search tasks
    :param nprocesses: number of worker processes (``None`` uses the Pool default)
    :param push_every: batch size for pushing results onto the output queue
    :param find_keys: metadata fields to report (keys of ``FIND_KEYS``)
    """
    dirs = find_searchdirs(path, max_recurse=max_recurse)
    print('Searching %d directories...' % len(dirs), file=sys.stderr)
    queue = multiprocessing.Queue()
    # workers receive the queue via the initializer; pinit attaches it to search_dir
    pool = multiprocessing.Pool(processes=nprocesses, initializer=pinit, initargs=(queue,))
    procs = pool.imap_unordered(
        pmap, [
            (target, push_every, find_keys) for target in dirs
        ],
        chunksize=1,
    )
    # a single daemon thread drains the queue and prints while workers search
    reaper_thread = threading.Thread(target=preduce, args=(queue, find_keys))
    reaper_thread.daemon = True
    reaper_thread.start()
    # consuming the imap iterator drives the pool; one progress line per finished task
    for count, _ in enumerate(procs):
        print('%d/%d' % (count + 1, len(dirs)), file=sys.stderr)
    # sentinel: all workers are done, tell preduce to finish
    queue.put(None)
    try:
        reaper_thread.join()
    except KeyboardInterrupt:
        pass
if __name__ == '__main__':
    # command-line entry point: crawl a namespace and emit CSV on stdout
    CLI = argparse.ArgumentParser()
    CLI.add_argument(
        '--path',
        default='.',
        help='Path from which to start searching files'
    )
    CLI.add_argument(
        '-r',
        '--max-recurse',
        default=2,
        type=int,
        help='Maximum recursion depth for parallelization'
    )
    CLI.add_argument(
        '--max-output',
        default=float('inf'),
        type=int,
        help='Maximum output to produce, in bytes'
    )
    options = CLI.parse_args()
    # NOTE(review): --max-output is parsed but never used below — confirm whether
    # output truncation was ever implemented or the option should be dropped
    reap_from_path(path=options.path, max_recurse=options.max_recurse)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment