Skip to content

Instantly share code, notes, and snippets.

@aravindavk
Created May 29, 2017 11:09
Show Gist options
  • Save aravindavk/ac03c02c219df4177def81301f9917e3 to your computer and use it in GitHub Desktop.
Crawl and Worker
# Popen find command and start a thread to distribute the load
from errno import ENOENT
import logging
import os
import subprocess
import sys
from threading import Thread, RLock
import time
outfile = None
crawl_ended = False
BATCH_SIZE = 10
NUM_WORKERS = 4
OUTPUT_FILE = "./find_output.txt"
logger = logging.getLogger(__name__)
def _log(typ, desc, **kwargs):
msg = desc
values = ["{0}={1}".format(k, v) for k, v in kwargs.items()]
if values:
msg += "\t" + " ".join(values)
if typ == "INFO":
logger.info(msg)
else:
logger.debug(msg)
def log_info(desc, **kwargs):
_log("INFO", desc, **kwargs)
def log_debug(desc, **kwargs):
_log("DEBUG", desc, **kwargs)
def setup_logger():
"""
Logging initialization, Log level by default will be INFO, once config
file is read, respective log_level will be set.
"""
global logger
logger.setLevel(logging.DEBUG)
# create the logging file handler
# fh = logging.FileHandler("test.log")
fh = logging.StreamHandler()
formatter = logging.Formatter("[%(asctime)s] %(levelname)s "
"[%(module)s - %(lineno)s:%(funcName)s] "
"- %(message)s")
fh.setFormatter(formatter)
# add handler to logger object
logger.addHandler(fh)
def find_process(path, output_file):
cmd = ["find", path,
"-type", "f",
"!", "-path", os.path.join(path, ".glusterfs"),
"!", "-path", os.path.join(path, ".glusterfs/*"),
"!", "-path", os.path.join(path, ".shard"),
"!", "-path", os.path.join(path, ".shard/*"),
"!", "-path", os.path.join(path, ".trashcan"),
"!", "-path", os.path.join(path, ".trashcan/*"),
"-fprintf", output_file, '%P\n']
return subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
def worker(worker_id, output_file):
lock = RLock()
while True:
files = []
done_flag = False
# This lock helps that other workers won't read the
# output file while collecting list of files for this
# worker
with lock:
if outfile is not None:
for i in range(BATCH_SIZE):
line = outfile.readline().strip("\n")
if line:
files.append(line)
if not line and crawl_ended:
done_flag = True
# Process the list of files collected in this batch
for f in files:
log_debug("Processed job", path=f, worker=worker_id)
if done_flag:
return
time.sleep(1)
def main(path):
global outfile, crawl_ended
setup_logger()
# current directory is working directory. Find command's output
# file is "./find_output.txt"
# To be on safer side, delete the output file if exists
try:
os.remove(OUTPUT_FILE)
except OSError as e:
if e.errno != ENOENT:
logger.error("Unable to delete the output file "
"\"{0}\": {1}".format(OUTPUT_FILE, e))
sys.exit(1)
p = find_process(path, OUTPUT_FILE)
# Wait for Find process to become stable
time.sleep(1)
# If Status of Find is None then it is running, zero means completed
# and success. Proceed only if None or zero, else exit with Error
st = p.poll()
if st is None or st == 0:
outfile = open(OUTPUT_FILE)
else:
_, err = p.communicate()
log_info("Crawler exited", status=st, error=err)
sys.exit(1)
workers = []
for i in range(NUM_WORKERS):
wt = Thread(target=worker, args=("w{0}".format(i), OUTPUT_FILE))
wt.start()
workers.append(wt)
find_process_status = p.poll()
while find_process_status is None:
# Crawling is not complete yet, wait for sometime
time.sleep(1)
find_process_status = p.poll()
# Subprocess is now completed its crawling, now watch
# for all threads to complete and finish
crawl_ended = True
_, err = p.communicate()
if find_process_status == 0:
log_info("Crawling is completed now waiting for "
"workers to complete")
else:
log_info("Crawling exited now waiting for "
"workers to complete",
return_code=find_process_status,
error=err)
for w in workers:
w.join()
log_info("All Workers done! Exiting...")
if __name__ == "__main__":
main(sys.argv[1])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment