Crawler for ALICE XRootD namespaces
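"""
Crawler for ALICE XRootD namespaces: split a directory tree into parallel
``find`` tasks and emit file metadata as CSV on stdout (progress on stderr).
"""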
from __future__ import print_function
import subprocess
import os
import argparse
import ast
import multiprocessing
import sys
import threading
import errno

# find(1) -printf directives for each metadata field; names and types are
# quoted so each field can be parsed back with ast.literal_eval
FIND_KEYS = {
    'inode': '%i',
    'name': '"%p"',
    'size_B': '%s',
    'disk_kB': '%k',
    'a_time': '%A@',
    'c_time': '%C@',
    'm_time': '%T@',
    'type': '"%Y"',
    'uid': '%U',
    'gid': '%G',
    'perm': '%m',
}
DEFAULT_KEYS = ('name', 'inode', 'size_B', 'm_time')
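# Example (illustrative values): with DEFAULT_KEYS, find prints one
# tab-separated line per file, e.g.
#   "/data/alice/run1/file.root"<TAB>42424242<TAB>1048576<TAB>1490000000.0000000000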

class SearchDir(object):
    """
    A directory to search for file metadata
    """
    def __init__(self, path, recurse=True):
        self.path = path
        self.recurse = recurse

    def search(self, find_keys=None):
        """Yield one list of metadata values per file, in ``find_keys`` order"""
        find_keys = find_keys if find_keys is not None else DEFAULT_KEYS
        cmd = [
            # macOS ships BSD find without -printf; expect GNU find as 'gfind' there
            'find' if sys.platform != 'darwin' else 'gfind',
            '-L', self.path,
        ]
        if not self.recurse:
            cmd += ['-maxdepth', '0']
        cmd += [
            '-type', 'f',
            '-printf', '\t'.join(FIND_KEYS[key] for key in find_keys) + '\n'
        ]
        finder = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
        )
        # print('"' + '" "'.join(cmd) + '"')
        try:
            for line in finder.stdout:
                line = line.decode()
                values = line.split('\t')
                yield [ast.literal_eval(val.strip()) for val in values]
        finally:
            finder.terminate()

    def __repr__(self):
        return '%s(%r, recurse=%s)' % (self.__class__.__name__, self.path, self.recurse)
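
# Example (a sketch): list files directly, without the multiprocessing pipeline
#   for name, inode, size, mtime in SearchDir('/data/alice').search():
#       print(name, size)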

def find_searchdirs(basepath, max_recurse=1):
    """Find all sub-directories to search inside a basepath"""
    basepath = basepath.rstrip(os.sep)
    dirs = []
    _path_stack = [basepath]
    while _path_stack:
        current_path = _path_stack.pop()
        dirpath = os.path.abspath(current_path)
        relpath = os.path.relpath(dirpath, basepath)
        if relpath == '.':
            depth = 0
        else:
            depth = relpath.count(os.sep) + 1
        # once we have recursed deeply enough, have the entire directory searched as a task
        if depth == max_recurse:
            if os.path.isfile(current_path):
                dirs.append(SearchDir(dirpath, recurse=False))
            else:
                print('adding path %r, depth %d for inspection' % (dirpath, depth), file=sys.stderr)
                dirs.append(SearchDir(dirpath))
        else:
            try:
                for subpath in os.listdir(dirpath):
                    _path_stack.append(os.path.join(current_path, subpath))
            except OSError as err:
                # if there are files at intermediate levels, reap them without recursion
                if err.errno == errno.ENOTDIR:
                    dirs.append(SearchDir(dirpath, recurse=False))
                else:
                    raise
    return dirs
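
# For example, with max_recurse=1 a tree /data/{a,b}/... is split into one
# recursive SearchDir task per child directory, while a plain file /data/f
# becomes a non-recursive task of its own.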

def search_dir(target, push_every=100, find_keys=None):
    """Search all paths inside target"""
    if find_keys is not None:
        dir_iter = target.search(find_keys=find_keys)
    else:
        dir_iter = target.search()
    file_buffer = []
    for file_data in dir_iter:
        file_buffer.append(file_data)
        # push results in batches to reduce queue overhead
        if len(file_buffer) >= push_every:
            search_dir.queue.put(file_buffer)
            file_buffer = []
    if file_buffer:
        search_dir.queue.put(file_buffer)


def pmap(args):
    # Pool.imap_unordered passes a single argument; unpack the task tuple
    return search_dir(*args)


def pinit(queue):
    # a multiprocessing.Queue cannot be passed as a task argument through a
    # Pool, so each worker receives it via the initializer and stores it as
    # a function attribute
    search_dir.queue = queue

def preduce(queue, find_keys):
    """Print the CSV header, then buffered results from the queue until a None sentinel arrives"""
    find_keys = find_keys if find_keys is not None else DEFAULT_KEYS
    print(','.join(find_keys))
    while True:
        file_buffer = queue.get()
        if file_buffer is None:
            return
        for file_data in file_buffer:
            line = ','.join(str(item) for item in file_data)
            print(line)

def reap_from_path(path, max_recurse=1, nprocesses=None, push_every=100, find_keys=None):
    dirs = find_searchdirs(path, max_recurse=max_recurse)
    print('Searching %d directories...' % len(dirs), file=sys.stderr)
    queue = multiprocessing.Queue()
    pool = multiprocessing.Pool(processes=nprocesses, initializer=pinit, initargs=(queue,))
    procs = pool.imap_unordered(
        pmap, [
            (target, push_every, find_keys) for target in dirs
        ],
        chunksize=1,
    )
    reaper_thread = threading.Thread(target=preduce, args=(queue, find_keys))
    reaper_thread.daemon = True
    reaper_thread.start()
    for count, _ in enumerate(procs):
        print('%d/%d' % (count + 1, len(dirs)), file=sys.stderr)
    pool.close()
    # all workers are done; tell the reaper thread to finish
    queue.put(None)
    try:
        reaper_thread.join()
    except KeyboardInterrupt:
        pass
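
# Example (a sketch): crawl /data with 8 workers, emitting only name and size
#   reap_from_path('/data', max_recurse=2, nprocesses=8, find_keys=('name', 'size_B'))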

if __name__ == '__main__':
    CLI = argparse.ArgumentParser()
    CLI.add_argument(
        '--path',
        default='.',
        help='Path from which to start searching files'
    )
    CLI.add_argument(
        '-r',
        '--max-recurse',
        default=2,
        type=int,
        help='Maximum recursion depth for parallelization'
    )
    CLI.add_argument(
        '--max-output',
        default=float('inf'),
        type=int,
        help='Maximum output to produce, in bytes'
    )
    options = CLI.parse_args()
    # note: --max-output is parsed but not yet wired into reap_from_path
    reap_from_path(path=options.path, max_recurse=options.max_recurse)
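
# Example invocation (the script file name is not part of the gist):
#   python crawler.py --path /srv/xrootd/namespace --max-recurse 2 2>progress.log > files.csv
# Progress messages go to stderr, so stdout carries only the CSV output.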