Skip to content

Instantly share code, notes, and snippets.

@datavudeja
Forked from DocDilbert/pload.py
Created February 5, 2025 20:09
Show Gist options
  • Save datavudeja/89e76cd0d2b77b29f62a46b5cc921540 to your computer and use it in GitHub Desktop.
Save datavudeja/89e76cd0d2b77b29f62a46b5cc921540 to your computer and use it in GitHub Desktop.
Plugin-based file operations
# coding=utf-8
import datetime
import os
import yaml
import click
import concurrent.futures
from tqdm import tqdm
import fnmatch
import re
import pathlib
class Plugin:
    """Base class for one processing step applied to a single file.

    Plugins are invoked in sequence for each file and exchange data through
    a shared, mutable dictionary.
    """

    def invoke(self, path: pathlib.Path, queue: dict) -> None:
        """Process *path*, reading from and writing to *queue*.

        The base implementation does nothing; subclasses override it.

        Args:
            path: The file currently being processed.
            queue: Per-file dictionary shared by every plugin in the
                pipeline (the subclasses in this file name it ``data_dic``).
        """
class PluginLoadContent(Plugin):
    """Plugin that reads a file as UTF-8 text into the shared dictionary."""

    def __init__(self, output: str) -> None:
        """Create the plugin.

        Args:
            output: Key under which the file content is stored.
        """
        super().__init__()
        self.__output = output

    def invoke(self, path: pathlib.Path, data_dic: dict) -> None:
        """Read *path* and store its text in ``data_dic[output]``.

        Args:
            path: File to read; it is opened in text mode as UTF-8.
            data_dic: Per-file dictionary that receives the content.

        Raises:
            OSError: If the file cannot be opened or read.
            UnicodeDecodeError: If the file is not valid UTF-8.
        """
        with open(path, "r", encoding="utf-8") as fp:
            data_dic[self.__output] = fp.read()
class PluginGetFileStamp(Plugin):
    """Plugin that records a file's modification and creation timestamps."""

    def invoke(self, path: pathlib.Path, data_dic: dict) -> None:
        """Store ``MODIFIED_TIME`` and ``CREATE_TIME`` for *path*.

        Both values are naive ``datetime`` objects in local time.

        Args:
            path: File whose metadata is queried via ``path.stat()``.
            data_dic: Per-file dictionary that receives the two entries.

        Raises:
            OSError: If the file cannot be stat'ed.
        """
        stats = path.stat()
        data_dic["MODIFIED_TIME"] = datetime.datetime.fromtimestamp(stats.st_mtime)
        # st_ctime is the metadata-change time on most Unix systems, not the
        # creation time. Use the real birth time where the platform exposes
        # it and fall back to st_ctime (the previous behaviour) otherwise.
        try:
            created = stats.st_birthtime
        except AttributeError:
            created = stats.st_ctime
        data_dic["CREATE_TIME"] = datetime.datetime.fromtimestamp(created)
class PluginQueueRemoveEntry(Plugin):
    """Plugin that removes one entry from the shared dictionary."""

    def __init__(self, input: str) -> None:
        """Create the plugin.

        Args:
            input: Key of the entry to remove. The name shadows the builtin
                but is kept because it is the keyword callers pass.
        """
        super().__init__()
        self.__input = input

    def invoke(self, path: pathlib.Path, data_dic: dict) -> None:
        """Remove the configured key from *data_dic* if it is present.

        Args:
            path: The file currently being processed (unused).
            data_dic: Per-file dictionary to remove the entry from.
        """
        # A default makes pop() a no-op when the key is absent.
        data_dic.pop(self.__input, None)
class PluginFilter(Plugin):
    """Plugin that discards a file's data unless a required key is present."""

    def __init__(self, input: str = "REGEX") -> None:
        """Create the plugin.

        Args:
            input: Key that must exist in the dictionary for the file's data
                to be kept. Defaults to ``"REGEX"``, the key that used to be
                hard-coded, so argument-less construction behaves as before.
        """
        super().__init__()
        self.__input = input

    def invoke(self, path: pathlib.Path, data_dic: dict) -> None:
        """Empty *data_dic* when the required key is missing.

        Args:
            path: The file currently being processed (unused).
            data_dic: Per-file dictionary; cleared in place when the
                required key is absent, left untouched otherwise.
        """
        if self.__input not in data_dic:
            data_dic.clear()
class PluginRegex(Plugin):
    """Plugin that runs a regular expression over one dictionary entry."""

    def __init__(self, input: str, regex: str, output: str) -> None:
        """Create the plugin.

        Args:
            input: Key of the entry to search.
            regex: Pattern source; compiled once at construction time.
            output: Key under which the list of matches is stored.

        Raises:
            re.error: If *regex* is not a valid pattern.
        """
        super().__init__()
        self.__input = input
        self.__regex = re.compile(regex)
        self.__output = output

    def invoke(self, path: pathlib.Path, data_dic: dict) -> None:
        """Store all matches of the pattern in ``data_dic[output]``.

        Nothing is written when the pattern does not match. A missing input
        entry (for example after an earlier plugin emptied the dictionary)
        is skipped instead of raising ``KeyError``.

        Args:
            path: The file currently being processed (unused).
            data_dic: Per-file dictionary holding the input entry and
                receiving the result of ``findall``.
        """
        element = data_dic.get(self.__input)
        if element is None:
            return
        matches = self.__regex.findall(element)
        if matches:
            data_dic[self.__output] = matches
def process_file(raw: tuple[pathlib.Path, list["Plugin"]]) -> dict:
    """Run every plugin on one file and return the collected data.

    The argument is a single tuple so the function can be passed directly
    to ``Executor.map``.

    Args:
        raw: ``(path, plugins)`` pair; the plugins are invoked in list order
            and all receive the same dictionary.

    Returns:
        The dictionary as left by the last plugin; it may be empty.
    """
    path, plugins = raw
    data_dic = {}
    # A plain loop: the calls are made purely for their side effects.
    for plugin in plugins:
        plugin.invoke(path, data_dic)
    return data_dic
def print_data(data_dic):
    """Print each entry of *data_dic* as an indented ``key : value`` line.

    Args:
        data_dic: Mapping whose entries are written to standard output in
            iteration order; an empty mapping prints nothing.
    """
    for key in data_dic:
        value = data_dic[key]
        print(f" {key} : {value}")
def _load_config(config_path):
    """Parse the YAML file at *config_path* and return it as a dict."""
    with open(config_path, "r", encoding="utf-8") as config_file:
        config = yaml.safe_load(config_file)
    if not isinstance(config, dict):
        raise click.ClickException(f"{config_path}: expected a mapping at top level")
    return config


def _compile_patterns(patterns):
    """Turn shell-style glob patterns into compiled regular expressions."""
    return [re.compile(fnmatch.translate(pattern)) for pattern in patterns or []]


def _collect_paths(root_dir, include_re, exclude_re):
    """Walk *root_dir* and return the files accepted by the include/exclude filters."""

    def is_file_eligible(path):
        return any(inc.match(path) for inc in include_re) and not any(
            exc.match(path) for exc in exclude_re
        )

    path_list = []
    for root, _dirs, files in os.walk(root_dir, topdown=False):
        # os.path.join avoids the doubled separator that plain string
        # concatenation produces when the root already ends with one.
        candidates = (os.path.join(root, name) for name in files)
        path_list.extend(
            pathlib.Path(candidate)
            for candidate in candidates
            if is_file_eligible(candidate)
        )
    return path_list


def _build_plugins(plugin_specs):
    """Instantiate the plugins described by the ``PLUGINS`` config entries."""
    plugins = []
    for spec in plugin_specs or []:
        name = spec["name"]
        plugin_cls = globals().get(name)
        # The name comes from a config file: only allow it to select Plugin
        # subclasses, never an arbitrary module-level callable.
        if not (isinstance(plugin_cls, type) and issubclass(plugin_cls, Plugin)):
            raise click.ClickException(f"Unknown plugin in configuration: {name!r}")
        # "args" may be absent or explicitly null in the YAML.
        plugins.append(plugin_cls(**(spec.get("args") or {})))
    return plugins


@click.command()
@click.option(
    "--count",
    default=1,
    help="Unused; accepted for backward compatibility.",
)
@click.option(
    "--config",
    "config_path",
    default="config.yml",
    show_default=True,
    help="Path of the YAML configuration file.",
)
@click.option(
    "--root",
    "root_dir",
    default="../omega/",
    show_default=True,
    help="Directory that is searched recursively for files.",
)
def main(count, config_path="config.yml", root_dir="../omega/"):
    """Run the configured plugin pipeline over all eligible files.

    The configuration provides ``INCLUDE`` and (optionally) ``EXCLUDE`` lists
    of shell-style patterns, plus a ``PLUGINS`` list whose entries have a
    ``name`` and optional ``args``. Files are processed in parallel by a
    process pool with a progress bar.

    Args:
        count: Unused; kept so existing invocations keep working.
        config_path: Path of the YAML configuration file.
        root_dir: Directory that is walked recursively.

    Returns:
        Dict mapping each file path to its non-empty plugin result. Nothing
        is printed; ``print_data`` can be used to dump an entry.

    Raises:
        click.ClickException: If the configuration is malformed, lacks
            ``INCLUDE``, or names an unknown plugin.
    """
    config = _load_config(config_path)
    if "INCLUDE" not in config:
        raise click.ClickException(f"{config_path}: missing required key 'INCLUDE'")

    include_re = _compile_patterns(config["INCLUDE"])
    exclude_re = _compile_patterns(config.get("EXCLUDE"))
    path_list = _collect_paths(root_dir, include_re, exclude_re)
    plugins = _build_plugins(config.get("PLUGINS"))

    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = list(
            tqdm(
                executor.map(process_file, ((path, plugins) for path in path_list)),
                total=len(path_list),
                leave=True,
            )
        )

    # Keep only files for which the pipeline produced some data.
    return {path: data for path, data in zip(path_list, results) if data}
# Run the CLI only when this file is executed as a script. The guard also
# matters for the process pool used by main(): with the "spawn" start method
# worker processes re-import this module, and must not re-run main().
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment