Skip to content

Instantly share code, notes, and snippets.

@smuthali
Created August 6, 2015 05:11
Show Gist options
  • Save smuthali/202f64992ad00c89f07e to your computer and use it in GitHub Desktop.
Save smuthali/202f64992ad00c89f07e to your computer and use it in GitHub Desktop.
Detect lock files in a given directory path (support for subdirectories included)
#!/usr/bin/env python
import argparse
import errno
import fcntl
import logging
import os
import sys

import psutil
# Configure the root logger at INFO level; with this setting the
# logger.debug(...) calls in this file are suppressed by default.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def check_dir_path(directory_path):
    """
    Tell whether the given path refers to an existing directory.

    :param directory_path: path to test
    :rtype: bool
    :return: True if directory_path is an existing directory, otherwise False
    """
    return os.path.isdir(directory_path)
def list_filepaths(directory_path):
    """
    Collect the paths of all files below a directory, recursing into
    subdirectories.

    :param directory_path: directory to walk
    :rtype: list
    :return: list of file paths (each joined onto the directory it was found
             in); an empty list when no files are found
    """
    # os.walk yields (current directory, subdirectory names, file names);
    # only the file names are needed, os.walk handles the recursion itself.
    return [
        os.path.join(current_dir, filename)
        for current_dir, _subdirs, filenames in os.walk(directory_path)
        for filename in filenames
    ]
def _lock_holder_pids(filepath):
    """
    Return the PIDs of other processes that currently have filepath open.

    A process must keep a file open to hold a lock on it, so these are the
    candidate lock holders. NOTE(review): a process that merely has the file
    open without locking it is listed too; processes that cannot be
    inspected (exited, access denied) are skipped.

    :param filepath: path of the locked file
    :rtype: list
    :return: list of integer PIDs, possibly empty
    """
    target = os.path.realpath(filepath)
    own_pid = os.getpid()
    holders = []
    for proc in psutil.process_iter():
        # This process has the file open only because it is probing it.
        if proc.pid == own_pid:
            continue
        try:
            open_paths = [entry.path for entry in proc.open_files()]
        except psutil.Error:
            continue
        if target in open_paths:
            holders.append(proc.pid)
    return holders


def list_pid_files(files):
    """
    Find which of the given files are locked by another process.

    Each file is opened read/write WITHOUT truncating it and probed with a
    non-blocking exclusive lock. A file counts as locked only when the probe
    fails with EACCES/EAGAIN. Files that cannot be opened for writing, or
    whose probe fails for any other reason, are skipped.

    :param files: iterable of file paths
    :rtype: list
    :return: list of dicts {'file_name': <path>, 'pid': <str>}, where 'pid'
             is a comma-separated list of candidate holder PIDs, or
             "unknown" when none could be determined
    """
    # getLogger returns the same logger object for the same name, so this is
    # the logger the rest of the module uses.
    log = logging.getLogger(__name__)
    pid_filename = []
    for item in files:
        try:
            # "r+b": an exclusive lockf needs a writable descriptor, but the
            # file content must not be touched (a "w" mode would truncate it).
            handle = open(item, "r+b")
        except IOError as exc:
            log.debug("skipping %s: cannot open for lock probing (%s)", item, exc)
            continue
        # Closing the handle also releases the probe lock if it was acquired.
        with handle:
            try:
                fcntl.lockf(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except IOError as exc:
                if exc.errno not in (errno.EACCES, errno.EAGAIN):
                    # Not a lock conflict (e.g. locking unsupported here).
                    log.debug("skipping %s: lock probe failed (%s)", item, exc)
                    continue
                holders = _lock_holder_pids(item)
                pid_text = ",".join(str(pid) for pid in holders) or "unknown"
                pid_filename.append({'file_name': "%s" % item, 'pid': pid_text})
            else:
                log.debug("not locked:%s", item)
    return pid_filename
def main():
    """
    Command-line entry point: scan a directory tree and report locked files.

    Exit status: 1 (after printing help) when no arguments are given,
    -1 when the directory is invalid or contains no files, 0 otherwise
    (including when no locked files are found). Argument errors are
    reported by argparse itself.

    :return: None
    """
    # Kept for backward compatibility: earlier versions published the parsed
    # --dir_path value as a one-element list in this module-level name.
    global path

    parser = argparse.ArgumentParser(prog='./lslock.py', usage='%(prog)s [options]')
    parser.add_argument('--dir_path', '-d', required=True, type=str,
                        help="absolute directory path;"
                             " example: /home/iamnuts")
    logger.debug("total args:%s", len(sys.argv))
    logger.debug("total cmd args:%s", sys.argv)
    # Only the "no arguments at all" case is handled here; counting argv
    # entries would wrongly reject valid forms such as --dir_path=/some/dir.
    if len(sys.argv) < 2:
        parser.print_help()
        sys.exit(1)
    args = vars(parser.parse_args())
    logger.debug("args=%s", args)

    # Take the path exactly as given on the command line.
    directory_path = args['dir_path']
    path = [directory_path]
    logger.debug("Directory path: %s", directory_path)

    if not check_dir_path(directory_path):
        logger.error("User provided directory path is not valid")
        sys.exit(-1)

    list_path = list_filepaths(directory_path)
    if not list_path:
        logger.error("No files found in: %s", directory_path)
        sys.exit(-1)
    logger.debug("list of all files in:%s%s", directory_path, list_path)

    # Report the locked files together with their PID information.
    pid_with_filename = list_pid_files(list_path)
    if not pid_with_filename:
        logger.info("No lock files found in %s", directory_path)
        sys.exit()
    logger.info("lock file(s) with the corresponding PID: \n\n %s", pid_with_filename)
if __name__ == "__main__":
    # main() has no return value, so a run that reaches the end exits with 0.
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment