Last active
February 26, 2024 07:29
-
-
Save journey-ad/14388855f7620755e21ecf239dbc47b1 to your computer and use it in GitHub Desktop.
P站批量下载脚本
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
Administrator privileges are required to create symbolic links.
ffmpeg must be available on the PATH environment variable.
Tested on Windows only.
'''
import os | |
import argparse | |
import math | |
import json | |
import threading | |
import requests | |
import zipfile | |
import subprocess | |
import shutil | |
import tempfile | |
import sqlite3 | |
from contextlib import contextmanager | |
# Version string printed by the -v/--version command-line flag.
SCRIPT_VERSION = '1.0'
# Base URL that every API request in this script is sent to.
API = "https://api.imjad.cn/pixiv/v1/"
# Directory the script was started from; pixiv.db and the illusts/ store live here.
CWD = os.getcwd()
def get_member_info(member_id):
    """Return the decoded API profile response for a member, cache first.

    The ``member_id2profile`` table of ``pixiv.db`` is consulted before the
    network. A response fetched from the API is stored in that table only
    when its ``status`` is ``'success'``; otherwise the API's error message
    is printed and the failed response is still returned to the caller.

    Args:
        member_id: Pixiv member id used both as cache key and API ``id``.

    Returns:
        The JSON-decoded response dictionary.

    Raises:
        Any exception from sqlite3, requests or JSON decoding propagates.
    """
    conn = sqlite3.connect(os.path.join(CWD, 'pixiv.db'))
    try:
        cursor = conn.cursor()
        cursor.execute(
            'SELECT profile FROM member_id2profile WHERE member_id=?;',
            (member_id,),
        )
        cached = cursor.fetchone()
        if cached:
            return json.loads(cached[0])

        params = {
            'type': 'member',
            'id': member_id,
        }
        r = requests.get(API, params=params, timeout=20)
        resp = json.loads(r.text)
        if resp['status'] == 'success':
            cursor.execute(
                'INSERT INTO member_id2profile (member_id, profile) VALUES (?, ?);',
                (member_id, r.text),
            )
            conn.commit()
        else:
            print(resp['errors']['system']['message'])
        return resp
    finally:
        # Always release the database handle, including on error paths.
        conn.close()
def get_img_url(illust_id, img_type='large'):
    """Resolve the downloadable URLs of one illustration, cache first.

    The ``illust_id2illust`` table of ``pixiv.db`` is consulted before the
    network; successful API responses are stored there.

    Args:
        illust_id: Id of the illustration to look up.
        img_type: Key selected from each ``image_urls`` mapping.

    Returns:
        ``(page_list, frames)``. ``page_list`` holds one URL for a work
        without metadata, the ``ugoira1920x1080`` zip URL for a work whose
        metadata has ``zip_urls`` (``frames`` is then the metadata's frame
        list), or one URL per entry of the metadata's ``pages``. ``frames``
        is an empty list in the non-zip cases. ``(None, None)`` is returned
        when the API reports a non-success status.

    Raises:
        Any exception is re-raised after the illustration id is printed.
    """
    try:
        conn = sqlite3.connect(os.path.join(CWD, 'pixiv.db'))
        try:
            cursor = conn.cursor()
            cursor.execute(
                'SELECT illust FROM illust_id2illust WHERE illust_id=?;',
                (illust_id,),
            )
            cached = cursor.fetchone()
            if cached:
                resp = json.loads(cached[0])
            else:
                params = {
                    'type': 'illust',
                    'id': illust_id,
                }
                r = requests.get(API, params=params, timeout=20)
                resp = json.loads(r.text)
                if resp['status'] != 'success':
                    print(resp['errors']['system']['message'])
                    return None, None
                cursor.execute(
                    'INSERT INTO illust_id2illust (illust_id, illust) VALUES (?, ?);',
                    (illust_id, r.text),
                )
                conn.commit()

            work = resp['response'][0]
            metadata = work['metadata']
            if not metadata:
                return [work['image_urls'][img_type]], []
            if 'zip_urls' in metadata:
                frames = metadata['frames']
                return [metadata['zip_urls']['ugoira1920x1080']], frames
            return [page['image_urls'][img_type] for page in metadata['pages']], []
        finally:
            # Runs for every exit path, including the early (None, None) return.
            conn.close()
    except Exception:
        # Identify the failing work before the error propagates.
        print(illust_id)
        raise
def get_user_fav(user_id):
    """Collect the ids of every work in a user's favorites.

    All result pages reported by the API's ``pagination.pages`` field are
    requested in turn. If anything fails, a message is printed and the whole
    listing is restarted from the first page. Retries are unbounded, as
    before, but are now done in a loop so a long outage cannot exhaust the
    recursion limit.

    Args:
        user_id: Pixiv id of the user whose favorites are listed.

    Returns:
        A list with the ``work.id`` of every favorite entry.
    """
    while True:
        try:
            params = {
                'type': 'favorite',
                'id': user_id,
                'page': 1,
                'per_page': 1000,
            }
            illust_id_list = []
            page = 1
            pages = 1  # replaced by the real page count after the first response
            while page <= pages:
                params['page'] = page
                r = requests.get(API, params=params, timeout=20)
                resp = json.loads(r.text)
                if page == 1:
                    pages = int(resp['pagination']['pages'])
                print('共 %s 页,正在处理第 %s 页' % (pages, page))
                illust_id_list.extend(
                    illust['work']['id'] for illust in resp['response']
                )
                page += 1
            return illust_id_list
        except Exception:
            print('获取收藏列表失败,正在重试…')
def get_user_illust(user_id, fav_limit=0):
    """Collect the ids of a member's works that meet a favorite threshold.

    A work is kept when its public plus private ``favorited_count`` is at
    least ``fav_limit``; skipped works are reported on stdout. If anything
    fails while listing, a message is printed and the listing restarts from
    the first page. Retries are unbounded, as before, but are now done in a
    loop so a long outage cannot exhaust the recursion limit.

    Args:
        user_id: Pixiv id of the member whose works are listed.
        fav_limit: Minimum favorite count; anything ``int()`` accepts.

    Returns:
        A list of the ids of the works that passed the threshold.

    Raises:
        ValueError: if ``fav_limit`` cannot be converted to an integer.
    """
    # Convert once, outside the retry loop, so bad input fails immediately
    # instead of being retried forever.
    threshold = int(fav_limit)
    while True:
        try:
            params = {
                'type': 'member_illust',
                'id': user_id,
                'page': 1,
                'per_page': 1000,
            }
            illust_id_list = []
            page = 1
            pages = 1  # replaced by the real page count after the first response
            while page <= pages:
                params['page'] = page
                r = requests.get(API, params=params, timeout=20)
                resp = json.loads(r.text)
                if page == 1:
                    pages = int(resp['pagination']['pages'])
                print('共 %s 页,正在处理第 %s 页' % (pages, page))
                for illust in resp['response']:
                    fav_counts = illust['stats']['favorited_count']
                    fav_count = int(fav_counts['public']) + int(fav_counts['private'])
                    if fav_count >= threshold:
                        illust_id_list.append(illust['id'])
                    else:
                        print('作品 %s 收藏数为 %s, 低于阈值 %s, 忽略' % (illust['id'], fav_count, fav_limit))
                page += 1
            return illust_id_list
        except Exception:
            print('获取作品列表失败,正在重试…')
def get_search_list(word, fav_limit=0):
    """Collect the ids of tag-search results that meet a favorite threshold.

    A work is kept when its public plus private ``favorited_count`` is at
    least ``fav_limit``. If anything fails while listing, a message is
    printed and the search restarts from the first page. Retries are
    unbounded, as before, but are now done in a loop so a long outage cannot
    exhaust the recursion limit.

    Args:
        word: Tag to search for. ``None`` prints a prompt and exits.
        fav_limit: Minimum favorite count; anything ``int()`` accepts.

    Returns:
        A list of the ids of the works that passed the threshold.

    Raises:
        SystemExit: if ``word`` is ``None``.
        ValueError: if ``fav_limit`` cannot be converted to an integer.
    """
    if word is None:
        print('请输入关键词')
        raise SystemExit
    # Convert once, outside the retry loop, so bad input fails immediately
    # instead of being retried forever.
    threshold = int(fav_limit)
    while True:
        try:
            # The first request carries no 'page' key, exactly as before.
            params = {
                'type': 'search',
                'mode': 'tag',
                'word': word,
                'per_page': 1000,
            }
            illust_id_list = []
            page = 1
            pages = 1  # replaced by the real page count after the first response
            while page <= pages:
                if page > 1:
                    params['page'] = page
                r = requests.get(API, params=params, timeout=20)
                resp = json.loads(r.text)
                if page == 1:
                    pages = int(resp['pagination']['pages'])
                print('共 %s 页,正在处理第 %s 页' % (pages, page))
                for illust in resp['response']:
                    fav_counts = illust['stats']['favorited_count']
                    fav_count = int(fav_counts['public']) + int(fav_counts['private'])
                    if fav_count >= threshold:
                        illust_id_list.append(illust['id'])
                page += 1
            return illust_id_list
        except Exception:
            print('获取搜索结果列表失败,正在重试…')
def get_file_id_list(path):
    """Return the illustration ids that are already present under ``path``.

    The id is the integer before the first ``_`` of a file name. A ``jpg``,
    ``png`` or ``gif`` file counts when it is non-empty. A ``zip`` counts
    only when a ``.webm`` with the same stem sits beside it. An id that has
    any zero-byte image file is excluded from the result.

    Files whose name does not start with an integer prefix are ignored
    instead of aborting the scan with ``ValueError``.

    Args:
        path: Directory to walk recursively.

    Returns:
        A list of ids (as ints) in no particular order.
    """
    present = set()
    empty = set()
    for dirpath, _dirnames, filenames in os.walk(path):
        for name in filenames:
            try:
                illust_id = int(name.split('_')[0])
            except ValueError:
                # Not one of our "<id>_..." files; leave it alone.
                continue
            ext = name.split('.')[-1]
            if ext in ('jpg', 'png', 'gif'):
                if os.path.getsize(os.path.join(dirpath, name)) > 0:
                    present.add(illust_id)
                else:
                    empty.add(illust_id)
            elif ext == 'zip':
                # An animation is finished only once its WEBM exists.
                webm_name = name.split('.')[0] + '.webm'
                if os.path.exists(os.path.join(dirpath, webm_name)):
                    present.add(illust_id)
    return list(present - empty)
def _place_link(target, link_path):
    """Create the symlink link_path -> target unless a usable one exists.

    A dangling link or zero-byte file left at link_path by an earlier run is
    removed first, so re-running the script does not fail with
    FileExistsError.
    """
    if os.path.lexists(link_path):
        if file_exists(link_path):
            return
        os.remove(link_path)
    os.symlink(target, link_path)


def download(list_, img_type='large', path='.'):
    """Download every work in ``list_`` and link the files into ``path``.

    Files are stored once under ``<CWD>/illusts`` and exposed in ``path``
    through symbolic links. For an animated work (``get_img_url`` returned
    frames) the downloaded zip is converted with ``ugoira2webm`` and the
    resulting WEBM is linked as well. That step is safe to repeat on a later
    run. When no URL can be resolved for a work, any copy already in the
    store is linked into ``path`` instead.

    Args:
        list_: Iterable of illustration ids.
        img_type: Image variant key passed to ``get_img_url``.
        path: Directory that receives the symbolic links.

    Raises:
        Any exception from the network, the file system or the conversion
        propagates to the caller.
    """
    HEADERS = {
        'Referer': 'https://www.pixiv.net'
    }
    illusts_path = os.path.join(CWD, 'illusts')
    for illust_id in list_:
        print('获取作品(%s) 分页列表…' % (illust_id))
        page_list, frames = get_img_url(illust_id, img_type)
        if page_list:
            for url in page_list:
                file_name = url.split('/')[-1]
                file_path = os.path.join(illusts_path, file_name)
                link_path = os.path.join(path, file_name)
                print('下载中,保存至 %s' % (link_path))
                os.makedirs(os.path.dirname(file_path), exist_ok=True)
                os.makedirs(os.path.dirname(link_path), exist_ok=True)
                if file_exists(link_path):
                    print('图片已存在')
                else:
                    if not file_exists(file_path):
                        # Fetch before opening the file so a failed request
                        # does not leave an empty file in the store.
                        content = requests.get(url, headers=HEADERS, timeout=30).content
                        with open(file_path, 'wb') as file:
                            file.write(content)
                    _place_link(file_path, link_path)
                if frames:
                    print('发现动图,转换为WEBM…')
                    # ugoira2webm skips the work when the WEBM already exists.
                    ugoira2webm(file_path, frames)
                    webm_filename = os.path.basename(file_path).split('.')[0] + ".webm"
                    _place_link(
                        os.path.join(illusts_path, webm_filename),
                        os.path.join(path, webm_filename),
                    )
        else:
            print('投稿已被删除(%s),尝试从库中恢复…' % (illust_id))
            if illust_id_exists(illust_id):
                print('发现备份文件(%s),正在恢复…' % (illust_id))
                os.makedirs(path, exist_ok=True)
                for file_name in os.listdir(illusts_path):
                    try:
                        stored_id = int(file_name.split('_')[0])
                    except ValueError:
                        # Ignore store entries that are not "<id>_..." files.
                        continue
                    if stored_id == illust_id:
                        _place_link(
                            os.path.join(illusts_path, file_name),
                            os.path.join(path, file_name),
                        )
def chunks(list_, num):
    """Split ``list_`` into at most ``num`` consecutive slices.

    Every slice has ``ceil(len(list_) / num)`` items except possibly the
    last one, so fewer than ``num`` slices may be produced.

    Args:
        list_: Sequence to split; it must support ``len`` and slicing.
        num: Desired maximum number of slices.

    Returns:
        A list of slices. An empty input yields an empty list instead of
        raising ``ValueError`` from a zero ``range`` step.
    """
    if not list_:
        return []
    size = int(math.ceil(len(list_) / float(num)))
    return [list_[start:start + size] for start in range(0, len(list_), size)]
def get_format_filename(input_filename):
    """Remove characters that are unsafe in file names from a string.

    The characters ``? * < > \\ ! /`` are removed as before. ``:``, ``"`` and
    ``|`` are now removed too, because Windows rejects them in file names.
    As before, surrounding whitespace is stripped only at the moment one of
    these characters is found, so a name without any of them is returned
    unchanged.

    Args:
        input_filename: Raw name, for example a member name or search word.

    Returns:
        The cleaned string.
    """
    for char in ('?', '*', '<', '>', '\\', '!', '/', ':', '"', '|'):
        if char in input_filename:
            # replace() drops every occurrence, so one pass per character is enough.
            input_filename = input_filename.strip().replace(char, '')
    return input_filename
def file_exists(path):
    """Return True only when ``path`` exists and is larger than zero bytes."""
    return os.path.exists(path) and os.path.getsize(path) > 0
def illust_id_exists(illust_id):
    """Tell whether the local store holds a non-empty file for a work.

    ``<CWD>/illusts`` is walked recursively. A file matches when the integer
    before the first ``_`` of its name equals ``illust_id`` and the file is
    larger than zero bytes. Names without an integer prefix are skipped
    instead of raising ``ValueError``.

    Args:
        illust_id: Illustration id as an int.

    Returns:
        ``True`` when a matching file exists, otherwise ``False``. The
        not-found case used to fall through and return ``None``.
    """
    store = os.path.join(CWD, 'illusts')
    for dirpath, _dirnames, filenames in os.walk(store):
        for name in filenames:
            try:
                stored_id = int(name.split('_')[0])
            except ValueError:
                continue
            if stored_id == illust_id and os.path.getsize(os.path.join(dirpath, name)) > 0:
                return True
    return False
@contextmanager
def cd(newdir):
    """Run the ``with`` body inside ``newdir`` and then return.

    ``newdir`` may start with ``~``. The directory that was current on entry
    is restored on exit, including when the body raises.
    """
    previous = os.getcwd()
    target = os.path.expanduser(newdir)
    os.chdir(target)
    try:
        yield
    finally:
        os.chdir(previous)
def ugoira2webm(file_path, frames):
    """Convert a downloaded ugoira zip into a lossless VP9 WEBM beside it.

    The zip is unpacked into a temporary directory, an ffconcat script is
    written from ``frames``, and ffmpeg (looked up on PATH) encodes the
    result. The WEBM is then moved next to the zip. Nothing is done when a
    non-empty WEBM is already there.

    ffmpeg is started with the temporary directory as its own working
    directory. The previous version changed the working directory of the
    whole process, which is shared by all download threads, and built the
    command as a shell string.

    Args:
        file_path: Path of the zip archive; the WEBM takes its name with the
            last extension replaced by ``.webm``.
        frames: Iterable of mappings with a ``file`` name and a
            ``delay_msec`` duration in milliseconds.

    Raises:
        SystemExit: with ffmpeg's exit status when the encoding fails.
    """
    path = os.path.dirname(file_path)
    name = '.'.join(file_path.split('.')[:-1])
    webm_filename = os.path.basename(name) + ".webm"
    target = os.path.join(path, webm_filename)
    if file_exists(target):
        return

    with tempfile.TemporaryDirectory(prefix="ugoira2webm") as d:
        with zipfile.ZipFile(file_path) as archive:
            archive.extractall(d)

        script_lines = ["ffconcat version 1.0\n"]
        for frame in frames:
            script_lines.append("file " + frame['file'] + '\n')
            script_lines.append("duration " + str(frame['delay_msec'] / 1000) + '\n')
        with open(os.path.join(d, "i.ffconcat"), "w") as script:
            script.write(''.join(script_lines))

        # Relative names are resolved by ffmpeg inside d because of cwd=d.
        result = subprocess.run(
            ["ffmpeg", "-n", "-v", "quiet", "-i", "i.ffconcat",
             "-c:v", "libvpx-vp9", "-lossless", "1", webm_filename],
            cwd=d,
        )
        if result.returncode != 0:
            raise SystemExit(result.returncode)
        shutil.move(os.path.join(d, webm_filename), target)
if __name__ == '__main__':
    # ---- command line -----------------------------------------------------
    PARSER = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
                                     description='Pixiv 批量下载脚本',
                                     epilog='(c) 2017 journey.ad')
    PARSER.add_argument('MEMBER_ID', type=str, nargs="?", help='用户的 Pixiv ID')
    PARSER.add_argument('GET_TYPE', type=str, nargs="?", help='获取类型 illust favorite search file')
    PARSER.add_argument('IMG_TYPE', type=str, nargs="?", help='图片类型')
    PARSER.add_argument('PATH', type=str, nargs="?", help='保存地址')
    PARSER.add_argument('FAV_LIMIT', type=str, nargs="?", help='收藏数阈值')
    PARSER.add_argument("-v", "--version", action="version",
                        version="pixiv-favorite-download-helper {}".format(SCRIPT_VERSION))
    ARGS = PARSER.parse_args()

    # ---- make sure the cache tables exist ----------------------------------
    print('连接数据库…')
    conn = sqlite3.connect(os.path.join(CWD, 'pixiv.db'))
    print('数据库连接成功')
    cursor = conn.cursor()
    try:
        cmd = '''
        CREATE TABLE IF NOT EXISTS illust_id2illust (
            illust_id INT PRIMARY KEY
                          NOT NULL,
            illust    TEXT NOT NULL
        );
        CREATE TABLE IF NOT EXISTS member_id2profile (
            member_id INT PRIMARY KEY
                          NOT NULL,
            profile   TEXT NOT NULL
        );
        '''
        cursor.executescript(cmd)
    finally:
        cursor.close()
        conn.commit()
        conn.close()

    # ---- resolve the run parameters ---------------------------------------
    if not ARGS.MEMBER_ID:
        print('请输入用户的 Pixiv ID')
        raise SystemExit
    MEMBER_ID = ARGS.MEMBER_ID
    IMG_TYPE = ARGS.IMG_TYPE if ARGS.IMG_TYPE else 'large'
    GET_TYPE = ARGS.GET_TYPE if ARGS.GET_TYPE else 'illust'
    if GET_TYPE not in ('illust', 'favorite', 'search', 'file'):
        # Previously an unknown type crashed later with NameError on json_name.
        PARSER.error('未知的获取类型: %s (illust favorite search file)' % GET_TYPE)

    # The first positional argument is a search word in 'search' mode, an
    # input file in 'file' mode and a member id otherwise.
    if GET_TYPE == 'search':
        KEY_WORD = MEMBER_ID
        DEFAULT_PATH = CWD
    elif GET_TYPE == 'file':
        INPUT_FILE = MEMBER_ID
        DEFAULT_PATH = CWD
    else:
        MEMBER_NAME = get_member_info(MEMBER_ID)['response'][0]['name']
        DEFAULT_PATH = os.path.join(CWD, '%s(%s)' % (get_format_filename(MEMBER_NAME), MEMBER_ID))
    # MEMBER_NAME exists only in the member modes. The search and file modes
    # fall back to the working directory instead of raising NameError when no
    # PATH argument is given.
    PATH = ARGS.PATH if ARGS.PATH else DEFAULT_PATH
    FAV_LIMIT = ARGS.FAV_LIMIT if ARGS.FAV_LIMIT else 0

    # ---- build the list of work ids ---------------------------------------
    if GET_TYPE == 'illust':
        PATH = os.path.join(CWD, '%s/%s(%s)' % (PATH, get_format_filename(MEMBER_NAME), MEMBER_ID))
        print('获取用户 %s(%s) 的作品列表…' % (MEMBER_NAME, MEMBER_ID))
        json_name = '%s/%s_%s.json' % (PATH, get_format_filename(MEMBER_ID), FAV_LIMIT)
        # Reuse the list saved by an earlier run when there is one.
        if os.path.exists(json_name):
            with open(json_name, 'r') as f:
                LIST = json.load(f)
        else:
            LIST = get_user_illust(MEMBER_ID, FAV_LIMIT)
    elif GET_TYPE == 'favorite':
        PATH = os.path.join(CWD, '%s/%s(%s)' % (PATH, get_format_filename(MEMBER_NAME), MEMBER_ID))
        print('获取用户 %s(%s) 的收藏列表…' % (MEMBER_NAME, MEMBER_ID))
        json_name = '%s/%s_%s.json' % (PATH, get_format_filename(MEMBER_ID), FAV_LIMIT)
        LIST = get_user_fav(MEMBER_ID)
    elif GET_TYPE == 'search':
        PATH = os.path.join(CWD, '%s/%s' % (PATH, get_format_filename(KEY_WORD)))
        print('获取关键词 %s 的结果列表…' % KEY_WORD)
        json_name = '%s/%s_%s.json' % (PATH, get_format_filename(KEY_WORD), FAV_LIMIT)
        if os.path.exists(json_name):
            with open(json_name, 'r') as f:
                LIST = json.load(f)
        else:
            LIST = get_search_list(KEY_WORD, FAV_LIMIT)
    else:  # GET_TYPE == 'file'
        print('从 %s 获取下载列表…' % INPUT_FILE)
        json_name = '%s/list_%s.json' % (PATH, FAV_LIMIT)
        with open(INPUT_FILE, 'r') as f:
            LIST = json.load(f)

    # Save the list next to the downloads so later runs can reuse it.
    os.makedirs(os.path.dirname(json_name), exist_ok=True)
    with open(json_name, 'w') as f:
        json.dump(LIST, f)

    # ---- drop works that are already on disk ------------------------------
    EXIST_LIST = get_file_id_list(PATH)
    print('获取成功, 共计 %s 件作品, 其中 %s 件已存在, 将自动跳过' % (len(LIST), len(EXIST_LIST)))
    LIST = list(set(LIST) - set(EXIST_LIST))
    if not LIST:
        print('所有图像都已被下载')
        raise SystemExit
    LIST = chunks(LIST, 5)

    # ---- download, one thread per chunk -----------------------------------
    JOBS = []
    for item in LIST:
        JOBS.append(threading.Thread(target=download, args=(item, IMG_TYPE, PATH)))
    for job in JOBS:
        job.start()
    for job in JOBS:
        job.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment