Finds an image on danbooru, writes tags as IPTC keywords, then places the image in the library
Last active
July 29, 2023 15:00
-
-
Save aNNiMON/6ba37e4d4084e858f917e271550ce5f6 to your computer and use it in GitHub Desktop.
PicSorter
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
__pycache__
.idea
input
library
logs
images.db
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pathlib import Path | |
import yaml | |
class Config:
    """Directory settings for the application, read from a YAML mapping.

    Recognised keys live under the top-level ``dirs`` mapping: ``tmp``,
    ``processed``, ``logs`` and ``library``. Missing keys fall back to the
    defaults used below.
    """

    @staticmethod
    def load(path='config.yml'):
        """Parse the YAML file at *path* and return a ``Config`` built from it."""
        with open(path, 'rt', encoding='utf8') as f:
            # NOTE(review): FullLoader can construct more than plain data;
            # only point this at config files you trust.
            config = yaml.load(f.read(), Loader=yaml.FullLoader)
        return Config(config)

    def __init__(self, config):
        """Store the configured directories and create the ones needed up front.

        :param config: parsed YAML document; ``None`` (an empty file) is
            treated as an empty mapping.
        """
        # An empty file parses to None and an empty ``dirs:`` key parses to
        # None as well; both must behave like "nothing configured".
        config = config or {}
        dirs = config.get('dirs') or {}
        self.dir_tmp = Path(dirs.get('tmp', '/tmp/'))
        self.dir_processed = Path(dirs.get('processed', './processed'))
        self.dir_logs = Path(dirs.get('logs', './logs'))
        self.dir_library = Path(dirs.get('library', './library'))
        self.__setup_folders()

    def __setup_folders(self):
        """Create the tmp, logs and library directories if they are missing."""
        # parents=True so that nested configured paths do not fail with
        # FileNotFoundError. dir_processed is not created here.
        for folder in (self.dir_tmp, self.dir_logs, self.dir_library):
            folder.mkdir(exist_ok=True, parents=True)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
dirs:
  tmp: /tmp/
  processed: ./processed
  logs: ./logs
  library: ./library
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import re | |
import requests | |
import time | |
# For every regular file in the current directory, take the word that follows
# the last "__" (optionally prefixed with "sample-") as an MD5 hash, query the
# Danbooru posts API for it and print the post URL. When nothing is found, or
# the request fails, the bare hash is printed instead.
files = [f for f in os.listdir('.') if os.path.isfile(f)]
for f in files:
    m = re.search(r'.*__(?:sample-)?(\w+)*.', f)
    # The capture group is optional in the pattern, so a match may still
    # carry no hash at all.
    md5 = m.group(1) if m else None
    if not md5:
        print('Warning: ', f)
        continue
    try:
        data = requests.get(
            'https://danbooru.donmai.us/posts.json?tags=md5%3A' + md5,
            timeout=10,
        ).json()
        found = (
            isinstance(data, list)
            and len(data) == 1
            and isinstance(data[0], dict)
            and "id" in data[0]
        )
        if found:
            print("https://danbooru.donmai.us/posts/" + str(data[0]['id']))
        else:
            print(md5)
    except (requests.RequestException, ValueError):
        # Network failure or a non-JSON body: fall back to the bare hash.
        print(md5)
    finally:
        # Keep one second between requests, including after a failed one.
        time.sleep(1)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sqlite3 | |
from datetime import datetime | |
class Database:
    """SQLite-backed record of the posts that were already imported."""

    def __init__(self, db_name='images.db'):
        """Open (creating if needed) the database file and ensure the schema.

        :param db_name: path of the SQLite file; defaults to ``images.db`` in
            the current working directory.
        """
        self.db_name = db_name
        self.__create_tables()

    def __create_tables(self):
        """Create the ``images`` table when it does not exist yet."""
        # NOTE(review): ``id`` alone is the primary key, so the same post id
        # coming from two different providers looks like it would violate the
        # key on insert -- confirm whether a composite key was intended.
        conn = sqlite3.connect(self.db_name)
        try:
            conn.executescript("""
                CREATE TABLE IF NOT EXISTS images (
                    id INTEGER PRIMARY KEY NOT NULL,
                    provider TEXT NOT NULL,
                    tags TEXT NOT NULL,
                    created_at TIMESTAMP,
                    UNIQUE(id, provider) ON CONFLICT REPLACE
                )
            """)
            conn.commit()
        finally:
            # Close even if the statement raised, so the handle never leaks.
            conn.close()

    def is_exists(self, provider, _id) -> bool:
        """Return True when a row with this id and provider is stored."""
        conn = sqlite3.connect(self.db_name)
        try:
            row = conn.execute(
                "SELECT EXISTS(SELECT 1 FROM images WHERE id=? AND provider=?)",
                (_id, provider),
            ).fetchone()
        finally:
            conn.close()
        return bool(row[0])

    def add(self, _id, provider, tags):
        """Insert a row for the post, stamped with the current local time."""
        sql = 'INSERT INTO images(id, provider, tags, created_at) VALUES (?,?,?,?)'
        # Same text the implicit datetime adapter produced; written out
        # because that adapter is deprecated as of Python 3.12.
        created_at = datetime.now().isoformat(' ')
        conn = sqlite3.connect(self.db_name)
        try:
            conn.execute(sql, (_id, provider, tags, created_at))
            conn.commit()
        finally:
            conn.close()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from typing import Optional | |
import requests | |
from bs4 import BeautifulSoup | |
class Iqdb:
    """Reverse image search through iqdb.org."""

    @staticmethod
    def search(file: str) -> Optional[str]:
        """Upload *file* to iqdb.org and return the first Danbooru post link.

        :param file: path of the image to look up.
        :return: an absolute ``https:`` URL of the first result link that
            points at ``danbooru.donmai.us/posts``, or ``None`` when the
            result page has no such link.
        """
        logging.info('Searching %s', file)
        # The context manager closes the handle once the upload is done.
        with open(file, 'rb') as image:
            resp = requests.post('https://iqdb.org/', files={'file': image}, timeout=10)
        doc = BeautifulSoup(resp.text, 'html.parser')
        for tag in doc.select(".image a"):
            url = tag.get("href")
            # Anchors without an href yield None; skip them rather than fail.
            if not url or "danbooru.donmai.us/posts" not in url:
                continue
            if url.startswith("//"):
                # Protocol-relative link: make it absolute.
                url = "https:" + url
            return url
        return None
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import os | |
import shutil | |
from pathlib import Path | |
from tags import Tags | |
class Library:
    """Places tagged images into a folder tree under a root directory.

    Layout produced by ``move``:
      * ``_orphan/``                      -- images handed to ``move_to_orphan``
      * ``_originals/[<artist>/]``        -- copyright tag is ``original``
      * ``<copyright>/[<character>|_multiple]/`` -- everything else
    """

    def __init__(self, dir_root: Path):
        """Remember the root and create ``<root>/_orphan`` if it is missing."""
        self.dir_root = dir_root
        self.dir_orphan = Path(dir_root, '_orphan')
        self.dir_orphan.mkdir(exist_ok=True, parents=True)

    def move_to_orphan(self, p: Path) -> None:
        """Move the file *p* into the ``_orphan`` folder."""
        logging.info("%s move to orphan", p)
        shutil.move(os.fspath(p), os.fspath(self.dir_orphan))

    def move(self, p: Path, tags: "Tags") -> str:
        """Move *p* into the folder derived from *tags*.

        :return: the new location as a string with forward slashes.
        """
        new_path = self.__compute_path(tags)
        new_path.mkdir(exist_ok=True, parents=True)
        logging.info("%s move to %s", p.name, new_path)
        shutil.move(os.fspath(p), os.fspath(new_path))
        return str(new_path).replace("\\", "/") + "/" + p.name

    def __compute_path(self, tags: "Tags") -> Path:
        """Return the destination folder for an image with the given tags."""
        p = self.dir_root
        if tags.copyrights == 'original':
            # Originals are grouped by artist, but only when a folder for
            # that artist already exists; no new artist folder is created.
            p = p / "_originals"
            if tags.artists != "":
                artist = self.__sanitize(tags.artists.split(" ")[0])
                if (p / artist).exists():
                    p = p / artist
            return p

        # Main section: the first copyright tag names the top-level folder.
        if tags.copyrights != "":
            _copyright = tags.copyrights.split(" ")[0]
            p = p / self.__sanitize(_copyright)
        # NOTE(review): the reviewed source had lost its indentation; this
        # guard is assumed to sit at function level -- confirm.
        if tags.characters == "":
            return p

        # Characters section: a single character, or two names where one
        # contains the other, gets its own folder; anything else is shared.
        characters = tags.characters_sanitized()
        num = len(characters)
        if num == 1:
            p = p / self.__sanitize(characters[0])
        elif num == 2 and characters[0] in characters[1]:
            p = p / self.__sanitize(characters[0])
        elif num == 2 and characters[1] in characters[0]:
            p = p / self.__sanitize(characters[1])
        else:
            p = p / "_multiple"
        return p

    @staticmethod
    def __sanitize(s: str) -> str:
        """Reduce a tag to a folder name.

        Keeps alphanumerics and ``._-()``, turns underscores into spaces and
        trims the ends. A result made only of dots and spaces becomes ``""``
        so that a tag can never resolve to ``.`` or ``..`` and leave the
        library tree.
        """
        s = "".join(x for x in s if x.isalnum() or x in "._-()")
        s = s.replace("_", " ").strip()
        if s.strip(". ") == "":
            return ""
        return s
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import re | |
import subprocess | |
from datetime import datetime | |
from pathlib import Path | |
from typing import Optional | |
import fluentpy as _ | |
import requests | |
from tags import Tags | |
class Metadata:
    """Downloads a post's image and embeds the post tags into the file.

    The external ``magick`` and ``exiftool`` commands are invoked through
    ``subprocess`` and must be available on PATH.
    """

    def __init__(self, dir_tmp: Path):
        """Remember the scratch directory and the temp file names inside it."""
        self.dir_tmp = dir_tmp
        self.tmp_image_file = Path(self.dir_tmp, "tmp.jpg")
        self.tmp_fallback_download_file = Path(self.dir_tmp, "dl.jpg")

    def process(self, url: str) -> "Optional[tuple[Path, Tags]]":
        """Fetch metadata for the post at *url*, download and tag its image.

        :return: ``(path of the tagged file, tags)`` or ``None`` when the
            image was skipped or the download failed.
        """
        logging.info("Retrieving metadata for %s", url)
        meta = self.__get_metadata(url)
        if not self.__download_file(meta):
            logging.warning("Download failed")
            return None
        return self.__write_tags(url, meta)

    @staticmethod
    def __get_metadata(url: str) -> dict:
        """Return the JSON document served at ``<url>.json``."""
        # A timeout keeps a stalled connection from hanging the whole run.
        return requests.get(url + ".json", timeout=30).json()

    def __download_file(self, r: dict) -> bool:
        """Validate the post metadata *r* and download its image.

        Returns False when the extension is not jpg/jpeg/png/webp, when a
        dimension is zero or missing, when there is no ``file_url``, or when
        the download command fails.
        """
        ext = r.get("file_ext", "")
        # ``or 0`` also covers an explicit null in the JSON.
        w = int(r.get("image_width") or 0)
        h = int(r.get("image_height") or 0)
        if (ext not in ["jpg", "jpeg", "png", "webp"]) or w == 0 or h == 0:
            logging.warning("Skipping due to unsupported extension: %s", ext)
            print("\033[93mSkipping due to unsupported extension:", ext, "\033[0m")
            return False
        file_url = r.get("file_url")
        if file_url is None:
            logging.warning("Skipping due to an empty file url")
            print("\033[93mSkipping due to an empty file url\033[0m")
            return False
        file_size_kb = int(r.get('file_size') or 0) / 1024
        logging.info("Downloading image")
        recompress = self.__need_recompress(ext, w, h, file_size_kb)
        return self.__download(file_url, recompress=recompress)

    @staticmethod
    def __need_recompress(ext, w, h, size_kb) -> bool:
        """True for a jpg over 1400 KB whose sides are both under 2500 px."""
        return ext == 'jpg' and size_kb > 1400 and w < 2500 and h < 2500

    def __download(self, img_url: str, recompress: bool = False) -> bool:
        """Let ImageMagick fetch *img_url* into the temp image file.

        The image is shrunk to fit 2500x2500 (``>`` = only if larger) and,
        when *recompress* is set, written with quality 80.

        :return: True when ``magick`` exits with status 0.
        """
        opt_args = ['-quality', "80"] if recompress else []
        # Output is discarded rather than piped: a pipe nobody reads can
        # fill up and block the child process.
        ret = subprocess.call([
            'magick', img_url,
            '-resize', '2500x2500>',
            *opt_args, str(self.tmp_image_file)
        ], stdout=subprocess.DEVNULL)
        return ret == 0

    def __write_tags(self, url: str, r: dict) -> "tuple[Path, Tags]":
        """Embed the tags and the post URL into the temp image, then rename it.

        Writes an exiftool argument file (``tags.txt``) with one IPTC keyword
        per tag plus the URL as Exif/IPTC/XMP description, runs exiftool on
        the temp image and renames it to a descriptive file name.

        :return: ``(renamed file path, tags)``.
        """
        tag_general = r.get('tag_string_general', "")
        tag_copyrights = r.get('tag_string_copyright', "")
        tag_characters = r.get('tag_string_character', "")
        tag_artists = r.get('tag_string_artist', "")
        tags = Tags(tag_general, tag_copyrights, tag_characters, tag_artists)

        content = "\n".join("-IPTC:keywords=" + s for s in tags.tags)
        content += "\n-Exif:ImageDescription=" + url
        content += "\n-Iptc:Caption-Abstract=" + url
        content += "\n-Xmp:Description=" + url
        tags_file = Path(self.dir_tmp, "tags.txt")
        with open(tags_file, "w") as f:
            f.write(content)

        logging.info("Writing tags")
        ret = subprocess.call([
            'exiftool', '-q', '-overwrite_original',
            '-@', str(tags_file),
            str(self.tmp_image_file)
        ], stdout=subprocess.DEVNULL)
        if ret != 0:
            # The image itself is still usable, so report and carry on.
            logging.warning("exiftool exited with status %d", ret)

        filename = self.__format_filename(tags)
        result_file = Path(self.tmp_image_file.parent, filename)
        self.tmp_image_file.rename(result_file)
        return result_file, tags

    @staticmethod
    def __format_filename(tags: "Tags"):
        """Build ``<copyright> <characters> by <artist> at <timestamp>.jpg``.

        Uses the first copyright, up to two characters and the first artist;
        drops every character that is not alphanumeric or one of `` ._-()``
        and collapses runs of whitespace.
        """
        filename = '{} {} by {} at {}.jpg'.format(
            tags.copyrights.split(" ")[0] or "",
            ", ".join(tags.characters_sanitized()[:2]),
            tags.artists.split(" ")[0] or "",
            datetime.now().strftime('%Y%m%d_%H%M%S')
        )
        filename = "".join(x for x in filename if x.isalnum() or x in " ._-()")
        return re.sub(r'\s+', ' ', filename).strip()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import logging | |
import os | |
import re | |
import shutil | |
import time | |
from datetime import datetime | |
from pathlib import Path | |
from typing import Optional | |
from config import Config | |
from database import Database | |
from iqdb import Iqdb | |
from library import Library | |
from metadata import Metadata | |
class PicSorter:
    """Command-line front end: find posts, tag the images, file them away."""

    @staticmethod
    def parse_args():
        """Parse the command line and process every given input, if any."""
        parser = argparse.ArgumentParser(
            description='Finds an image on danbooru, writes tags as IPTC keywords, than places the image in the library'
        )
        parser.add_argument('-c', '--config',
                            type=Path,
                            default='config.yml',
                            help='config.yml file path')
        parser.add_argument('input', nargs=argparse.REMAINDER)
        args = parser.parse_args()
        if len(args.input) >= 1:
            PicSorter(args.config).process(args.input)

    def __init__(self, config_file='config.yml'):
        """Load the config and wire up logging, library, metadata and database."""
        config = Config.load(config_file)
        self.config = config
        self.__setup_logging(config.dir_logs)
        self.library = Library(config.dir_library)
        self.metadata = Metadata(config.dir_tmp)
        self.db = Database()

    @staticmethod
    def __setup_logging(dir_logs: Path):
        """Log INFO and above to ``<dir_logs>/<YYYY-MM-DD>.log``."""
        filename = datetime.now().strftime('%Y-%m-%d.log')
        logfile = Path(dir_logs, filename)
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s %(levelname)s %(module)s: %(message)s',
            datefmt='%H:%M:%S',
            handlers=[
                logging.FileHandler(os.fspath(logfile))
            ]
        )

    def process(self, inputs: list[str]) -> None:
        """Dispatch each input to the URL, folder or file handler.

        Anything starting with ``http`` is a URL. Otherwise an existing
        directory or file is processed as such. Only then is the "contains
        three or more digits" heuristic applied, so that existing paths such
        as ``IMG_12345.jpg`` are no longer mistaken for URLs and skipped.
        """
        for item in inputs:
            if item.startswith("http"):
                print("Processing url", item)
                self.__process_url(item)
                continue
            p = Path(item)
            if p.is_dir():
                self.__process_folder(p)
            elif p.is_file():
                print("Processing file", item)
                self.__process_file(item)
            elif re.search(r"(\d{3,})", item):
                print("Processing url", item)
                self.__process_url(item)

    def __process_folder(self, dir_input: Path) -> None:
        """Process every ``.jpg``/``.png`` directly inside *dir_input*.

        Waits five seconds after each file. The first failure is logged and
        re-raised, which stops the run.
        """
        files = sorted(p for p in dir_input.iterdir()
                       if p.suffix.lower() in (".jpg", ".png"))
        for filename in files:
            print("Processing", filename)
            try:
                self.__process_file(filename)
            except Exception:
                # Record the failing file, then propagate with the original
                # traceback intact.
                logging.exception("Failed to process %s", filename)
                raise
            time.sleep(5)

    def __process_file(self, filename: str) -> bool:
        """Look the image up on IQDB, import the post, archive the input file.

        *filename* may be a ``str`` or a ``Path`` (the folder handler passes
        ``Path`` objects).

        :return: True when the post was imported and the input file was moved
            to the processed directory.
        """
        url = self.__search_iqdb(filename)
        if url is None:
            return False
        if not self.__process_url(url):
            return False
        self.config.dir_processed.mkdir(exist_ok=True, parents=True)
        from_path = os.fspath(filename)
        to_path = os.fspath(self.config.dir_processed)
        shutil.move(from_path, to_path)
        self.__show_path(to_path)
        return True

    def __search_iqdb(self, filename: str) -> Optional[str]:
        """Return the post URL for the image; move it to orphans if unknown."""
        url = Iqdb.search(filename)
        if url is None:
            logging.warning("%s not found", filename)
            self.library.move_to_orphan(Path(filename))
            return None
        return url

    def __process_url(self, url: str) -> bool:
        """Import the post behind *url* into the library.

        :return: False when the URL is not recognised, the provider is not
            danbooru/aibooru, the post is already in the database or the
            metadata step produced nothing; True once the image is stored.
        """
        m = re.search(r"(https://((?:dan|ai)booru|yande).*?(?:post(?:s|/show)/)?(\d{3,}))", url)
        if not m:
            return False
        provider = m.group(2)
        post_id = int(m.group(3))
        # The pattern also matches yande URLs, but only these two are handled.
        if provider not in ['danbooru', 'aibooru']:
            return False
        if self.db.is_exists(provider, post_id):
            logging.info("Skipping exists post %s %d", provider, post_id)
            return False

        meta_result = self.metadata.process(m.group(1))
        if meta_result is None:
            return False
        image_path, tags = meta_result
        to_path = self.library.move(image_path, tags)
        self.db.add(post_id, provider, tags.tags_string)
        self.__show_path(to_path)
        return True

    def __show_path(self, p: str) -> None:
        """Print *p* in green as a ``file://`` link with spaces escaped."""
        print("\033[92mSaved to", 'file://' + p.replace(' ', '%20'), "\033[0m")


if __name__ == '__main__':
    PicSorter.parse_args()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
beautifulsoup4==4.9.3
fluentpy>=2.0
PyYAML==5.4.1
requests>=2.24
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment