Last active
October 24, 2023 22:15
-
-
Save CzBiX/76593e7f8f967d96f4e599d49e864a9c to your computer and use it in GitHub Desktop.
Force skip hash check in qBittorrent.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import sys | |
class BTFailure(Exception): | |
pass | |
def decode_int(x, f): | |
f += 1 | |
new_f = x.index(ord("e"), f) | |
n = int(x[f:new_f]) | |
if x[f] == ord("-"): | |
if x[f + 1] == ord("0"): | |
raise ValueError | |
elif x[f] == ord("0") and new_f != f + 1: | |
raise ValueError | |
return n, new_f + 1 | |
def decode_bytes(x, f): | |
colon = x.index(ord(":"), f) | |
n = int(x[f:colon]) | |
if x[f] == ord("0") and colon != f + 1: | |
raise ValueError | |
colon += 1 | |
return x[colon:colon + n], colon + n | |
def decode_string(x, f): | |
r, l = decode_bytes(x, f) | |
return str(r, "utf-8"), l | |
def decode_list(x, f): | |
r, f = [], f + 1 | |
while x[f] != ord("e"): | |
v, f = decode_func[x[f]](x, f) | |
r.append(v) | |
return r, f + 1 | |
def decode_dict(x, f): | |
r, f = {}, f + 1 | |
while x[f] != ord("e"): | |
k, f = decode_string(x, f) | |
r[k], f = decode_func[x[f]](x, f) | |
return r, f + 1 | |
decode_func = { | |
ord("l"): decode_list, | |
ord("d"): decode_dict, | |
ord("i"): decode_int, | |
ord("0"): decode_bytes, | |
ord("1"): decode_bytes, | |
ord("2"): decode_bytes, | |
ord("3"): decode_bytes, | |
ord("4"): decode_bytes, | |
ord("5"): decode_bytes, | |
ord("6"): decode_bytes, | |
ord("7"): decode_bytes, | |
ord("8"): decode_bytes, | |
ord("9"): decode_bytes | |
} | |
def bdecode(x): | |
try: | |
r, l = decode_func[x[0]](x, 0) | |
except (IndexError, KeyError, ValueError): | |
raise BTFailure("not a valid bencoded string") | |
if l != len(x): | |
raise BTFailure("invalid bencoded value (data after valid prefix)") | |
return r | |
def encode_int(x): | |
result = bytearray() | |
result.append(ord("i")) | |
result.extend(bytearray(str(x), "utf-8")) | |
result.append(ord("e")) | |
return result | |
def encode_bool(x): | |
if x: | |
return encode_int(1) | |
else: | |
return encode_int(0) | |
def encode_bytes(x): | |
result = bytearray() | |
result.extend(bytearray("{}:".format(len(x)), "utf-8")) | |
result.extend(x) | |
return result | |
def encode_string(x): | |
return encode_bytes(bytearray(x, "utf-8")) | |
def encode_list(x): | |
result = bytearray() | |
result.append(ord("l")) | |
for i in x: | |
result.extend(encode_func[type(i)](i)) | |
result.append(ord("e")) | |
return result | |
def encode_dict(x): | |
result = bytearray() | |
result.append(ord("d")) | |
for k, v in sorted(x.items()): | |
result.extend(encode_string(str(k))) | |
result.extend(encode_func[type(v)](v)) | |
result.append(ord("e")) | |
return result | |
encode_func = { | |
int: encode_int, | |
bool: encode_bool, | |
bytes: encode_bytes, | |
str: encode_string, | |
list: encode_list, | |
tuple: encode_list, | |
dict: encode_dict | |
} | |
def bencode(x): | |
return encode_func[type(x)](x) | |
class Main(object): | |
@staticmethod | |
def read_content(file_path): | |
with open(file_path, 'rb') as f: | |
return bdecode(f.read()) | |
@classmethod | |
def process(cls, base_path, content, force): | |
resume_path = "{}{}.fastresume".format(base_path, content) | |
resume_data = cls.read_content(resume_path) | |
save_path = resume_data['save_path'] | |
if force: | |
torrent_path = "{}{}.torrent".format(base_path, content) | |
torrent_data = cls.read_content(torrent_path) | |
if 'pieces' not in resume_data: | |
piece_length = len(torrent_data['info']['pieces']) / 20 | |
resume_data['pieces'] = [1 for _ in range(int(piece_length))] | |
if 'mapped_files' not in resume_data: | |
if resume_data['qBt-hasRootFolder']: | |
base_path = torrent_data['info']['name'] | |
else: | |
base_path = b'' | |
files = torrent_data['info'].get('files') | |
if not files: | |
files = [{ | |
'path': [torrent_data['info']['name']], | |
}] | |
resume_data['mapped_files'] = [ | |
os.path.join(base_path, os.path.join(*f['path'])) | |
for f in files | |
] | |
elif any(p == 0 for p in resume_data.get('pieces', [0])): | |
print('Do not support incomplete torrent.') | |
exit(1) | |
resume_data['seed_mode'] = 1 | |
resume_data['paused'] = 1 | |
def get_file_state(file_path): | |
size = os.path.getsize(file_path) | |
mtime = int(os.path.getmtime(file_path)) | |
return size, mtime | |
resume_data['file sizes'] = [ | |
get_file_state(os.path.join(save_path, f)) | |
for f in resume_data['mapped_files'] | |
] | |
return cls.write_content(resume_path, bencode(resume_data)) | |
@staticmethod | |
def write_content(file_path, content): | |
os.rename(file_path, file_path + '.bak') | |
with open(file_path, 'wb') as f: | |
f.write(content) | |
@classmethod | |
def main(cls): | |
if len(sys.argv) != 2: | |
print('%s torrent_hash' % sys.argv[0]) | |
exit(1) | |
base_path = os.path.expanduser('~/.local/share/data/qBittorrent/BT_backup/') | |
if os.path.exists(base_path + 'session.lock'): | |
print('Please quit qBittorrent at first.') | |
exit(1) | |
torrent_hash = sys.argv[1] | |
if torrent_hash[0] == '@': | |
torrent_hash = torrent_hash[1:] | |
force = True | |
else: | |
force = False | |
cls.process(base_path, torrent_hash, force) | |
if __name__ == '__main__': | |
Main.main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment