Last active
October 15, 2023 08:16
-
-
Save zhaoweizhong/aeab8a487aea1dec683bc186380f5ff4 to your computer and use it in GitHub Desktop.
A simple TMP.LINK (钛盘) downloader
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import os | |
import time | |
from datetime import datetime | |
from urllib.parse import unquote | |
from tqdm import tqdm | |
from requests.exceptions import HTTPError | |
# Base URL that every API endpoint name is appended to.
API_URL = "https://tmp-api.vx-cdn.com/api_v2/"

# How many times a failing API request is attempted before giving up.
MAX_RETRIES = 5

# Pause between two attempts, in seconds.
RETRY_DELAY = 5
def post_request(endpoint, payload):
    """POST ``payload`` as form data to an API endpoint and return the decoded JSON.

    HTTP error statuses, connection failures and timeouts are treated as
    transient: the request is attempted up to ``MAX_RETRIES`` times, sleeping
    ``RETRY_DELAY`` seconds between attempts.

    Args:
        endpoint: Endpoint name appended to ``API_URL``.
        payload: Mapping sent as the form-encoded request body.

    Returns:
        The parsed JSON body of the first successful response.

    Raises:
        requests.exceptions.RequestException: The error from the final
            attempt once every retry has failed.
        Exception: Any other error (for example a body that is not valid
            JSON) is reported and re-raised immediately, without retrying.
    """
    url = f"{API_URL}{endpoint}"
    # Without a timeout, requests waits indefinitely on a stalled connection.
    timeout_seconds = 30

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            response = requests.post(url, data=payload, timeout=timeout_seconds)
            response.raise_for_status()  # Turn 4xx/5xx statuses into HTTPError
            return response.json()
        except (
            HTTPError,
            requests.exceptions.ConnectionError,
            requests.exceptions.Timeout,
        ) as request_err:
            label = "HTTP error" if isinstance(request_err, HTTPError) else "Network error"
            print(f"{label} occurred: {request_err} - Attempt {attempt} of {MAX_RETRIES}")
            if attempt == MAX_RETRIES:
                print("Maximum number of retries reached. Raising exception.")
                raise  # Propagate the last failure to the caller
            print(f"Retrying in {RETRY_DELAY} seconds...")
            time.sleep(RETRY_DELAY)
        except Exception as err:
            # Anything else is not expected to fix itself on retry.
            print(f"An error occurred: {err}")
            raise
def get_directory_details(token, mr_id):
    """Send the ``details`` action for ``mr_id`` to the ``meetingroom`` endpoint.

    Returns whatever ``post_request`` returns (the decoded JSON response).
    """
    return post_request(
        'meetingroom',
        dict(action='details', token=token, mr_id=mr_id),
    )
def get_total_files(token, mr_id):
    """Send the ``total`` action for ``mr_id`` to the ``meetingroom`` endpoint.

    Returns whatever ``post_request`` returns (the decoded JSON response).
    """
    return post_request(
        'meetingroom',
        dict(action='total', token=token, mr_id=mr_id),
    )
def get_file_list(token, mr_id):
    """Send the ``file_list_page`` action for ``mr_id`` to the ``meetingroom`` endpoint.

    The request asks for page ``'all'`` with an empty search string and fixed
    ``photo``/``sort_by``/``sort_type`` values. Returns whatever
    ``post_request`` returns (the decoded JSON response).
    """
    # Fixed listing options sent with every request.
    listing_options = dict(page='all', photo=0, sort_by=1, sort_type=1, search='')
    form = dict(action='file_list_page', token=token, mr_id=mr_id)
    form.update(listing_options)
    return post_request('meetingroom', form)
def get_captcha_token():
    """Send the ``challenge`` action to the ``token`` endpoint and return the decoded response."""
    challenge_form = dict(action='challenge')
    return post_request('token', challenge_form)
def get_file_url(token, ukey, captcha):
    """Send the ``download_req`` action for ``ukey`` to the ``file`` endpoint.

    ``captcha`` is forwarded unchanged in the request. Returns whatever
    ``post_request`` returns (the decoded JSON response).
    """
    return post_request(
        'file',
        dict(action='download_req', ukey=ukey, token=token, captcha=captcha),
    )
def download_file(url, dest_path, start_path):
    """Download ``url`` to ``dest_path`` while showing a progress bar.

    A timestamp and the destination path relative to ``start_path`` are
    printed first. If ``dest_path`` already exists the download is skipped.

    The body is streamed into ``<dest_path>.part`` and only renamed to
    ``dest_path`` once it has been written completely, so an interrupted or
    failed download never leaves a truncated file that a later run would
    mistake for a finished one.

    Args:
        url: Direct download URL.
        dest_path: Local path the file is stored at.
        start_path: Directory used only to shorten the printed path.

    Returns:
        None.

    Raises:
        requests.exceptions.RequestException: On connection problems,
            timeouts, or an unsuccessful HTTP status.
        OSError: If the local file cannot be written or renamed.
    """
    now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    relative_path = os.path.relpath(dest_path, start_path)
    print(f"{now} | {relative_path}")

    # Check if the file already exists
    if os.path.exists(dest_path):
        print("File already exists. Skipping download.")
        return

    partial_path = f"{dest_path}.part"
    block_size = 64 * 1024  # Larger chunks mean far fewer write/update calls
    try:
        with requests.get(url, stream=True, timeout=30) as response:
            # Fail loudly instead of saving an HTTP error page as the file.
            response.raise_for_status()
            total_size = int(response.headers.get('content-length', 0))
            with open(partial_path, "wb") as file, tqdm(
                total=total_size, unit='iB', unit_scale=True
            ) as progress_bar:
                for data in response.iter_content(block_size):
                    file.write(data)
                    progress_bar.update(len(data))
        # Publish the file under its final name only after a complete write.
        os.replace(partial_path, dest_path)
    finally:
        # Still present only if the download did not complete.
        if os.path.exists(partial_path):
            os.remove(partial_path)
def _safe_name(name):
    """Turn a server-supplied name into a single path component that stays inside its parent."""
    cleaned = str(name).replace("/", "_").replace("\\", "_")
    return "_" if cleaned in ("", ".", "..") else cleaned


def process_directory(token, mr_id, path, start_path):
    """Download every file of a remote directory, then recurse into its sub-directories.

    File and directory names come from the API response and are therefore
    untrusted: each one is reduced to a single path component before it is
    joined onto ``path``, so a name containing path separators or ``..``
    cannot write outside the download tree.

    Args:
        token: Access token forwarded to every API call.
        mr_id: Identifier of the remote directory to process.
        path: Existing local directory that receives this directory's files.
        start_path: Root of the download, forwarded to ``download_file``.

    Raises:
        Whatever the API helpers or ``download_file`` raise; ``KeyError`` if
        an entry lacks an expected key (``ukey``, ``fname``, ``name``,
        ``mr_id``, ``data``).
    """
    details = get_directory_details(token, mr_id)
    total = get_total_files(token, mr_id)

    # NOTE(review): the response schema is not visible here; "data" is treated
    # as optional and type-checked, matching the existing sub_rooms check.
    total_data = total.get("data")
    nums = total_data.get("nums", "0") if isinstance(total_data, dict) else "0"

    # If there are files, fetch and download them
    if int(nums) > 0:  # nums may arrive as a string, hence the conversion
        files = get_file_list(token, mr_id).get("data")
        if isinstance(files, list):
            for entry in files:
                # A fresh challenge token is requested for every file.
                captcha_token = get_captcha_token()
                file_url_data = get_file_url(token, entry['ukey'], captcha_token['data'])
                file_url = unquote(file_url_data['data'])
                local_file = os.path.join(path, _safe_name(entry['fname']))
                download_file(file_url, local_file, start_path)

    # Recursively process subdirectories
    details_data = details.get("data")
    sub_rooms = details_data.get("sub_rooms") if isinstance(details_data, dict) else None
    if isinstance(sub_rooms, list):  # anything that is not a list is ignored
        for sub_room in sub_rooms:
            sub_path = os.path.join(path, _safe_name(sub_room['name']))
            os.makedirs(sub_path, exist_ok=True)
            process_directory(token, sub_room['mr_id'], sub_path, start_path)
def main():
    """Ask for the token, start directory id and target folder, then run the download.

    The target folder is created inside the current working directory if it
    does not exist yet and is used both as the download destination and as
    the root for the relative paths printed during the download.
    """
    access_token = input("Enter your access token: ")
    root_mr_id = input("Enter the mr_id of the directory to start from: ")
    folder_name = input("Enter the base directory name: ")

    download_root = os.path.join(os.getcwd(), folder_name)
    os.makedirs(download_root, exist_ok=True)

    process_directory(access_token, root_mr_id, download_root, download_root)
    print("Download Finished!")
# Start the interactive downloader only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment