archive your sadpanda favs
# usage: sadfavs.py [-h] [-f FILE] -u USERNAME -p PASSWORD [-d] [--port PORT]
#                   [-a ADDRESS] [-n NUMBER]
# download sadpanda galleries with tags. archives all your favorites or all
# links (separated by a newline) from a file. saves tags and general info for
# every gallery as a json file. to download galleries you need to run
# transmission with remote control enabled.
# optional arguments:
#   -h, --help            show this help message and exit
#   -f FILE, --file FILE  archive galleries from file
#   -u USERNAME, --username USERNAME
#                         your sadpanda username
#   -p PASSWORD, --password PASSWORD
#                         your sadpanda password
#   -d, --download        download images
#   --port PORT           transmission remote control port
#   -a ADDRESS, --address ADDRESS
#                         transmission remote control address
#   -n NUMBER, --number NUMBER
#                         number of threads
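# a file passed via -f is expected to contain one gallery url per line,
# e.g. (illustrative placeholder gids/tokens, not real galleries):
#   https://e-hentai.org/g/1234567/0123456789/
#   https://e-hentai.org/g/7654321/9876543210/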
# requirements
# beautifulsoup4==4.8.0
# decorator==4.4.0
# html5lib==1.0.1
# joblib==0.13.2
# lxml==4.3.4
# mechanize==0.4.2
# six==1.12.0
# soupsieve==1.9.2
# transmissionrpc==0.11
# urllib3==1.25.3
# webencodings==0.5.1
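# example invocations (username/password/filename are placeholders):
#   python sadfavs.py -u myname -p secret             # archive metadata only
#   python sadfavs.py -u myname -p secret -d -n 4     # also queue torrents, 4 threads
#   python sadfavs.py -u myname -p secret -f urls.txt # archive urls from a file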
import mechanize
from bs4 import BeautifulSoup
from http import cookiejar
import pathlib
import json
from multiprocessing.pool import ThreadPool
import argparse
from functools import partial, wraps
import re
import time
import logging
import transmissionrpc
transmission = None
DELAY = 2
DOMAIN = 'https://e-hentai.org'
HEADERS = [('User-agent',
            'Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.0.1) '
            'Gecko/2008071615 Fedora/3.0.1-1.fc9 Firefox/3.0.1')]
# always parse with lxml
BeautifulSoup = partial(BeautifulSoup, features='lxml')

logger = logging.getLogger("sadfavs")
logger.setLevel(logging.DEBUG)
# Create handlers: the console shows INFO and up, the log file keeps DEBUG
c_handler = logging.StreamHandler()
f_handler = logging.FileHandler('sadfavs.log', mode='w')
c_handler.setLevel(logging.INFO)
f_handler.setLevel(logging.DEBUG)
# Create formatters and add them to the handlers
c_format = logging.Formatter('%(levelname)s - %(message)s')
f_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
c_handler.setFormatter(c_format)
f_handler.setFormatter(f_format)
# Add handlers to the logger
logger.addHandler(c_handler)
logger.addHandler(f_handler)
class DeletedException(Exception):
    pass


def retry(exceptions, tries=4, delay=3, backoff=2, logger=None):
    """
    Retry calling the decorated function using an exponential backoff.

    http://www.saltycrane.com/blog/2009/11/trying-out-retry-decorator-python/
    original from: http://wiki.python.org/moin/PythonDecoratorLibrary#Retry

    Args:
        exceptions: The exception to check. May be a tuple of
            exceptions to check.
        tries: Number of times to try (not retry) before giving up.
        delay: Initial delay between retries in seconds.
        backoff: Backoff multiplier (e.g. a value of 2 will double the
            delay each retry).
        logger: Logger to use. If None, print.
    """
    def deco_retry(f):
        @wraps(f)
        def f_retry(*args, **kwargs):
            mtries, mdelay = tries, delay
            while mtries > 1:
                try:
                    return f(*args, **kwargs)
                except exceptions as e:
                    msg = '{}, Retrying in {} seconds...'.format(e, mdelay)
                    if logger:
                        logger.warning(msg)
                    else:
                        print(msg)
                    time.sleep(mdelay)
                    mtries -= 1
                    mdelay *= backoff
            # final attempt: exceptions now propagate to the caller
            return f(*args, **kwargs)
        return f_retry
    return deco_retry
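# a minimal usage sketch for the decorator above (fetch is hypothetical):
#
#   @retry(mechanize.HTTPError, tries=3, delay=1, logger=logger)
#   def fetch(url):
#       return br.open(url).read()
#
# the first two failures log a warning and back off for 1s, then 2s;
# if the third attempt also fails, the exception propagates.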
class Favorites:
    def __init__(self, br, threads, download, file):
        self.br = br
        self.file = file
        if file:
            self.pages = 1
        else:
            soup = BeautifulSoup(br.response().read())
            # the second-to-last link in the pagination table is the last page
            last_page = soup.find("table", {"class": "ptt"}).find_all('a')[-2]
            self.pages = int(last_page.getText())
        self.current_page = 0
        self.galleries = []
        self.threads = threads
        self.download = download
        logger.info(f"{self.pages} pages found!")

    def get_galleries(self):
        pool = ThreadPool(self.threads)
        while self.current_page < self.pages:
            logger.debug(f"looking for galleries on page {self.current_page}")
            logger.debug("looking for gids and tokens")
            if self.file:
                with open(self.file) as handle:
                    galleries = [line.strip() for line in handle
                                 if line.strip()]
                # gallery urls look like …/g/<gid>/<token>/
                args = [(g.rstrip('/').split('/')[-2],
                         g.rstrip('/').split('/')[-1])
                        for g in galleries]
            else:
                self.br.open(
                    f"{DOMAIN}/favorites.php?page={self.current_page}")
                soup = BeautifulSoup(self.br.response().read())
                galleries = soup.find_all("div", {"class": "gl1t"})
                args = []
                for gallery in galleries:
                    url = gallery.find('a')['href']
                    id = url.split('/')[-3]
                    token = url.split('/')[-2]
                    args.append((id, token))
            logger.debug(f"{len(galleries)} galleries found")
            self.galleries += pool.starmap(self.get_gallery, args)
            logger.debug(f"finished {len(self.galleries)} galleries")
            self.current_page += 1
            time.sleep(DELAY * 20)
            logger.debug("next page...")
        logger.debug("all pages finished")
        pool.close()
        pool.join()
    @retry(mechanize.HTTPError, logger=logger)
    def get_gallery(self, id, token):
        logger.info(f"starting gallery: {id}, {token}")
        gallery = Gallery(id, token)
        logger.debug(f"getting info for gallery: {id}, {token}")
        try:
            gallery.get_info(self.br)
        except DeletedException as err:
            logger.warning(str(err))
            return
        logger.debug(f"getting torrents for gallery: {id}, {token}")
        gallery.get_torrents(self.br)
        logger.debug(f"saving gallery: {id}, {token}")
        pathlib.Path(gallery.dir).mkdir(parents=True, exist_ok=True)
        with open(pathlib.PurePath(gallery.path + ".json"), 'w') as file:
            json.dump(gallery.get_json(), file)
        logger.debug(f"downloading gallery: {id}, {token}")
        if self.download:
            gallery.download()
        time.sleep(DELAY)
        return gallery
    def write_json(self):
        logger.debug("writing favorites.json")
        result = {
            # get_gallery returns None for deleted galleries, skip those
            "galleries": [f"{g.path}.json" for g in self.galleries if g]
        }
        with open("galleries/favorites.json", 'w') as file:
            json.dump(result, file)
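# with the layout built in Gallery.get_info, galleries/favorites.json ends up
# shaped roughly like this (illustrative paths; actual entries depend on each
# gallery's category, artist and name):
#
#   {"galleries": ["galleries/Doujinshi/Some Artist/Some Gallery.json",
#                  "galleries/Manga/Various/Another Gallery.json"]}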
class Gallery:
    def __init__(self, id, token):
        self.id = id
        self.token = token
        self.torrents = []
        self.url = f"{DOMAIN}/g/{self.id}/{self.token}/"

    def get_info(self, br):
        logger.info(f"getting gallery information from {self.url}")
        try:
            br.open(self.url)
        except mechanize.HTTPError:
            raise DeletedException(
                "This gallery has been removed or is unavailable.")
        soup = BeautifulSoup(br.response().read())
        try:
            table = soup.body.find("div", {"id": "gdd"}).find("table")
        except AttributeError:
            raise DeletedException("Offensive Content!")
        self.date = self.language = self.pages = None
        rows = table.find_all("tr")
        for row in rows:
            cols = row.find_all("td")
            attribute = cols[0].getText()
            value = cols[1].getText().strip()
            if attribute.startswith("Posted"):
                self.date = value
            elif attribute.startswith("Language"):
                self.language = value
            elif attribute.startswith("Length"):
                self.pages = value
        self.name = soup.find("h1", id="gn").getText().strip()
        japanese = soup.find("h1", id="gj")
        if japanese:
            self.name_jap = japanese.getText().strip()
        # the favorite link is only present when logged in
        fav = soup.find(id="favoritelink")
        self.fav_category = fav.getText().strip() if fav else None
        self.category = soup.find("div", {"id": "gdc"}).find(
            "div").getText().strip()
        self.uploader = soup.find("div", {"id": "gdn"}).getText().strip()
        table = soup.body.find("div", {"id": "taglist"}).find("table")
        rows = table.find_all("tr")
        self.tags = {}
        for row in rows:
            cols = row.find_all("td")
            attribute = cols[0].getText().strip().replace(':', '')
            values = cols[1].find_all('a')
            self.tags[attribute] = list(
                map(lambda v: v.getText().strip(), values))
        self.dir = f"galleries/{self.category}/"
        if "artist" in self.tags:
            if len(self.tags["artist"]) > 3:
                self.artist = "Various"
            else:
                self.artist = " & ".join(self.tags["artist"])
        else:
            # fall back to the [artist] prefix in the title, then the uploader
            m = re.search(r"\[.*?\]", self.name)
            if m:
                self.artist = m.group()
                self.artist = self.artist.replace('[', '').replace(']', '')
            else:
                self.artist = self.uploader
        self.dir += self.artist
        # name the json file after the gallery, not the artist, so galleries
        # by the same artist don't overwrite each other; keep the first 150
        # characters when truncating
        name_safe = re.sub(r'[/\|"?:]', '', self.name)
        if len(name_safe) > 150:
            name_safe = f"{name_safe[:150]}…"
        self.dir = re.sub(r'[\|"?:]', '', self.dir)
        self.path = f"{self.dir}/{name_safe}"
    def get_torrents(self, br):
        br.open(f"{DOMAIN}/gallerytorrents.php?gid={self.id}&t={self.token}")
        soup = BeautifulSoup(br.response().read())
        tables = soup.body.find_all("table")
        if not tables:
            logger.warning(f"no torrents found at {self.url}")
        for table in tables:
            self.torrents.append(Torrent(table))
    def get_json(self):
        result = {
            "id": self.id,
            "token": self.token,
            "name": self.name,
            # name_jap is only set when the gallery has a japanese title
            "name_jap": getattr(self, "name_jap", None),
            "url": self.url,
            "uploader": self.uploader,
            "date": self.date,
            "language": self.language,
            "category": self.category,
            "fav_cat": self.fav_category,
            "torrents": list(map(lambda t: t.get_json(), self.torrents)),
            "tags": self.tags
        }
        return result
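# a per-gallery json file ends up shaped roughly like this (illustrative
# values; the field contents are scraped straight from the gallery page):
#
#   {"id": "1234567", "token": "0123456789", "name": "…", "name_jap": null,
#    "url": "https://e-hentai.org/g/1234567/0123456789/", "uploader": "…",
#    "date": "…", "language": "English", "category": "Doujinshi",
#    "fav_cat": "…",
#    "torrents": [{"date": "…", "name": "…", "url": "…", "size": "…"}],
#    "tags": {"artist": ["…"], "language": ["english"]}}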
    def download(self):
        if not self.torrents:
            logger.warning(f"no torrents found at {self.url}")
            return
        logger.debug(f"downloading {self.url}")
        # queue the torrent with the most seeders
        max(self.torrents).download(self.dir)
class Torrent:
    def __init__(self, soup):
        self.date = None
        self.seeds = None
        self.size = None
        rows = soup.find_all("tr")
        cols = rows[0].find_all("td")
        for col in cols:
            if not col.find("span"):
                continue
            attribute = col.find("span").getText().strip()
            value = col.getText().strip()
            if attribute.startswith("Posted"):
                self.date = value
            if attribute.startswith("Seeds"):
                # keep only the numeric part so torrents compare by
                # seeder count instead of lexicographically
                digits = re.search(r'\d+', value)
                self.seeds = int(digits.group()) if digits else None
            if attribute.startswith("Size"):
                self.size = value
        self.uploader = rows[1].find("td").getText().strip()
        a = rows[2].find("a")
        self.name = a.getText().strip()
        self.url = a['href'].strip()

    def __eq__(self, other):
        if self.seeds is None or other.seeds is None:
            return NotImplemented
        return self.seeds == other.seeds

    def __lt__(self, other):
        if self.seeds is None or other.seeds is None:
            return NotImplemented
        return self.seeds < other.seeds

    def get_json(self):
        result = {
            "date": self.date,
            "name": self.name,
            "url": self.url,
            "size": self.size
        }
        return result
    def download(self, dir):
        logger.debug(f"adding torrent {self.url}")
        dir = pathlib.Path(dir).resolve()
        try:
            torrent = transmission.add_torrent(self.url,
                                               download_dir=str(dir))
            logger.info(f"started downloading: {torrent.name}")
        except transmissionrpc.error.TransmissionError as err:
            logger.warning(err)
def login_browser(username, password):
    cj = cookiejar.CookieJar()
    br = mechanize.Browser()
    br.set_handle_robots(False)
    br.set_cookiejar(cj)
    br.addheaders = HEADERS
    br.open(f"{DOMAIN}/bounce_login.php?b=d&bt=1-6")
    br.select_form(nr=0)
    br.form['UserName'] = username
    br.form['PassWord'] = password
    br.submit()
    return br
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="""
download sadpanda galleries with tags.
archives all your favorites or all links (separated by a newline) from a file.
saves tags and general info for every gallery as a json file.
to download galleries you need to run transmission with remote control enabled.
""")
    parser.add_argument("-f", "--file", help="archive galleries from file",
                        metavar="FILE")
    parser.add_argument("-u", "--username", help="your sadpanda username",
                        required=True)
    parser.add_argument("-p", "--password", help="your sadpanda password",
                        required=True)
    parser.add_argument("-d", "--download", help="download images",
                        action="store_true")
    parser.add_argument(
        "--port",
        type=int,
        help="transmission remote control port",
        default=9091)
    parser.add_argument(
        "-a",
        "--address",
        help="transmission remote control address",
        default='localhost')
    parser.add_argument("-n", "--number", type=int, help="number of threads",
                        default=1)
    args = parser.parse_args()

    pathlib.Path("galleries/").mkdir(exist_ok=True)
    if args.file:
        # no login needed when archiving from a list of links
        br = mechanize.Browser()
        br.addheaders = HEADERS
        br.set_handle_robots(False)
    else:
        br = login_browser(args.username, args.password)
    if args.download:
        transmission = transmissionrpc.Client(args.address, port=args.port)
    favorites = Favorites(br, args.number, args.download, args.file)
    favorites.get_galleries()
    favorites.write_json()
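# resulting on-disk layout (illustrative names, derived from Gallery.get_info):
#   galleries/favorites.json                            index of archived galleries
#   galleries/<Category>/<Artist>/<Gallery Name>.json   per-gallery tags and info
# torrents queued via -d are told to download into the same per-gallery folders.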