Crawl and Worker
# Popen the find command and start worker threads to distribute the load
from errno import ENOENT
import logging
import os
import subprocess
import sys
from threading import Thread, RLock
import time

outfile = None
crawl_ended = False
# Shared lock so that only one worker at a time reads a batch of lines
# from the find output file (a lock created inside each worker would not
# synchronize anything, since every thread would hold its own lock)
read_lock = RLock()

BATCH_SIZE = 10
NUM_WORKERS = 4
OUTPUT_FILE = "./find_output.txt"

logger = logging.getLogger(__name__)
def _log(typ, desc, **kwargs):
    msg = desc
    values = ["{0}={1}".format(k, v) for k, v in kwargs.items()]
    if values:
        msg += "\t" + " ".join(values)

    if typ == "INFO":
        logger.info(msg)
    else:
        logger.debug(msg)


def log_info(desc, **kwargs):
    _log("INFO", desc, **kwargs)


def log_debug(desc, **kwargs):
    _log("DEBUG", desc, **kwargs)
def setup_logger():
    """
    Logging initialization. Log level is set to DEBUG and messages are
    sent to stderr via a StreamHandler.
    """
    logger.setLevel(logging.DEBUG)

    # create the logging stream handler (swap in the FileHandler below
    # to log to a file instead)
    # fh = logging.FileHandler("test.log")
    fh = logging.StreamHandler()
    formatter = logging.Formatter("[%(asctime)s] %(levelname)s "
                                  "[%(module)s - %(lineno)s:%(funcName)s] "
                                  "- %(message)s")
    fh.setFormatter(formatter)

    # add handler to logger object
    logger.addHandler(fh)
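
# For illustration only: with the formatter above, a worker's debug
# message looks roughly like the line below (timestamp, module name and
# line number are made up; note that %(funcName)s reports "_log", since
# that is where logger.debug() is actually called):
#
#   [2017-05-29 11:09:00,123] DEBUG [crawl - 31:_log] - Processed job	path=f1 worker=w0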
def find_process(path, output_file):
    # Build a find command that lists regular files under "path",
    # skipping gluster's internal .glusterfs, .shard and .trashcan
    # directories, and writes the path of each file relative to the
    # starting point (%P) to output_file, one per line
    cmd = ["find", path,
           "-type", "f",
           "!", "-path", os.path.join(path, ".glusterfs"),
           "!", "-path", os.path.join(path, ".glusterfs/*"),
           "!", "-path", os.path.join(path, ".shard"),
           "!", "-path", os.path.join(path, ".shard/*"),
           "!", "-path", os.path.join(path, ".trashcan"),
           "!", "-path", os.path.join(path, ".trashcan/*"),
           "-fprintf", output_file, "%P\n"]
    return subprocess.Popen(cmd, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
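
# For illustration, with a hypothetical brick path /bricks/b1 the command
# built above is equivalent to running:
#
#   find /bricks/b1 -type f \
#       ! -path /bricks/b1/.glusterfs ! -path '/bricks/b1/.glusterfs/*' \
#       ! -path /bricks/b1/.shard     ! -path '/bricks/b1/.shard/*' \
#       ! -path /bricks/b1/.trashcan  ! -path '/bricks/b1/.trashcan/*' \
#       -fprintf ./find_output.txt '%P\n'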
def worker(worker_id):
    while True:
        files = []
        done_flag = False

        # Hold the shared lock so other workers can't read from the
        # output file while this worker collects its batch of files
        with read_lock:
            if outfile is not None:
                for i in range(BATCH_SIZE):
                    line = outfile.readline().strip("\n")
                    if line:
                        files.append(line)

                    # An empty readline means EOF; if the crawl has also
                    # finished, no more lines will ever arrive
                    if not line and crawl_ended:
                        done_flag = True
                        break

        # Process the list of files collected in this batch
        for f in files:
            log_debug("Processed job", path=f, worker=worker_id)

        if done_flag:
            return

        # Throttle between batches
        time.sleep(1)
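
# Note: "find" may still be appending to the output file while workers
# read it, so readline() can return a partial line that does not yet end
# with a newline and would be processed as a truncated path. A minimal
# sketch of a stricter reader (hypothetical helper, not used above) that
# rewinds when it sees an incomplete line:
def read_complete_line(f):
    pos = f.tell()
    line = f.readline()
    if line and not line.endswith("\n"):
        # Incomplete line: rewind so a later call can retry once the
        # writer has finished the line
        f.seek(pos)
        return ""
    return line.rstrip("\n")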
def main(path):
    global outfile, crawl_ended

    setup_logger()

    # The current directory is the working directory; find writes its
    # output to "./find_output.txt". To be on the safe side, delete the
    # output file if it already exists
    try:
        os.remove(OUTPUT_FILE)
    except OSError as e:
        if e.errno != ENOENT:
            logger.error("Unable to delete the output file "
                         "\"{0}\": {1}".format(OUTPUT_FILE, e))
            sys.exit(1)

    p = find_process(path, OUTPUT_FILE)

    # Give find a moment to start and create the output file
    time.sleep(1)

    # If the status of find is None it is still running; zero means it
    # completed successfully. Proceed only on None or zero, else exit
    # with an error
    st = p.poll()
    if st is None or st == 0:
        outfile = open(OUTPUT_FILE)
    else:
        _, err = p.communicate()
        log_info("Crawler exited", status=st, error=err)
        sys.exit(1)

    workers = []
    for i in range(NUM_WORKERS):
        wt = Thread(target=worker, args=("w{0}".format(i),))
        wt.start()
        workers.append(wt)

    find_process_status = p.poll()
    while find_process_status is None:
        # Crawling is not complete yet, wait for some time
        time.sleep(1)
        find_process_status = p.poll()

    # The find subprocess has finished crawling; now wait for all
    # worker threads to drain the remaining entries and exit
    crawl_ended = True
    _, err = p.communicate()

    if find_process_status == 0:
        log_info("Crawling is complete, now waiting for "
                 "workers to finish")
    else:
        log_info("Crawling exited, now waiting for "
                 "workers to finish",
                 return_code=find_process_status,
                 error=err)

    for w in workers:
        w.join()

    outfile.close()
    log_info("All workers done! Exiting...")


if __name__ == "__main__":
    main(sys.argv[1])
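
# Example invocation (the script name and brick path are hypothetical):
#
#   python crawl_and_worker.py /bricks/b1
#
# find's output goes to ./find_output.txt in the current directory, and
# NUM_WORKERS (4) threads consume it in batches of BATCH_SIZE (10) paths,
# logging one debug line per processed file.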