Last active
June 25, 2024 13:25
-
-
Save 0x24a/0288dba711ffbd1f31e25d60ad300d2e to your computer and use it in GitHub Desktop.
Segmented Catbox Uploading Script.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import requests | |
import rich | |
import rich.console | |
import rich.progress | |
# Size of one read/upload chunk in bytes: 200 MB counted in decimal units.
twoHundredMBs = 200 * 1000 * 1000
# Single shared Rich console used for every prompt and status line below.
console = rich.console.Console()
def catbox_upload(file_content, filename):
    """POST one file to the catbox.moe upload API and return the reply text.

    Args:
        file_content: Payload sent as the ``fileToUpload`` multipart field.
        filename: Name attached to that multipart field.

    Returns:
        The response body as text. Callers in this file treat it as the URL
        of the uploaded file.
    """
    # Each value is a (filename, content) pair; a ``None`` filename makes
    # the entry an ordinary form field rather than a file part.
    form_fields = {
        "reqtype": (None, "fileupload"),
        "userhash": (None, ""),
        "fileToUpload": (filename, file_content),
    }
    response = requests.post("https://catbox.moe/user/api.php", files=form_fields)
    return response.text
def catbox_upload_file(filename):
    """Read *filename* from disk and upload its entire content to catbox.

    Args:
        filename: Path of the file to upload; also sent as the upload name.

    Returns:
        Whatever ``catbox_upload`` returns (the response text).
    """
    # The previous body called ``upload(content, filename)``, but ``upload``
    # accepts a single path argument, so every call raised TypeError.  The
    # raw uploader is the function with the matching (content, name) shape.
    with open(filename, "rb") as source:
        content = source.read()
    return catbox_upload(content, filename)
def upload_with_retry(content, filename, times=10):
    """Call ``catbox_upload`` until it succeeds or *times* attempts are used.

    Args:
        content: Payload handed to ``catbox_upload``.
        filename: Upload name handed to ``catbox_upload``.
        times: Maximum number of attempts.

    Returns:
        The text returned by the first successful ``catbox_upload`` call, or
        ``None`` when every attempt raised.
    """
    for attempt in range(1, times + 1):
        try:
            return catbox_upload(content, filename)
        except Exception:
            # ``Exception`` rather than a bare ``except`` so that Ctrl-C
            # (KeyboardInterrupt) and SystemExit still abort the loop.
            console.print(f"[red]![/red] Retrying {attempt}/{times}")
    return None
def download_with_retry(url, times=10):
    """GET *url* and return the body, retrying when the request raises.

    Args:
        url: Address to fetch.
        times: Maximum number of attempts.

    Returns:
        The response body as ``bytes``, or ``None`` when every attempt
        raised.
    """
    for attempt in range(1, times + 1):
        try:
            return requests.get(url).content
        except Exception:
            # ``Exception`` rather than a bare ``except`` so that Ctrl-C
            # (KeyboardInterrupt) and SystemExit still abort the loop.
            console.print(f"[red]![/red] Retrying {attempt}/{times}")
    return None
def download_first_1024_bytes(url, retry=10):
    """Fetch at most the first 1024 bytes of the body served at *url*.

    Args:
        url: Address to fetch.
        retry: Maximum number of attempts.

    Returns:
        Up to 1024 leading bytes of the body, or ``None`` when no attempt
        produced any data (an empty body counts as a failed attempt, as it
        did before).
    """
    wanted = 1024
    for attempt in range(1, retry + 1):
        try:
            # The context manager closes the streamed response on every
            # path, including an empty body or an exception mid-read.
            with requests.get(url, stream=True) as response:
                head = b""
                # A single chunk may be shorter than ``chunk_size``, so keep
                # reading until enough bytes are collected or the body ends.
                for chunk in response.iter_content(wanted):
                    head += chunk
                    if len(head) >= wanted:
                        break
            if head:
                return head[:wanted]
        except Exception:
            # ``Exception`` rather than a bare ``except`` so that Ctrl-C
            # (KeyboardInterrupt) and SystemExit still abort the loop.
            console.print(f"[red]![/red] Retrying {attempt}/{retry}")
    return None
def _verify_upload(url, expected_head):
    """Return True when the first bytes served at *url* equal *expected_head*."""
    with rich.progress.Progress(console=console) as progress:
        task = progress.add_task("[green]*[/green] Verifing file...", start=False)
        served_head = download_first_1024_bytes(url)
        progress.update(task, total=1, completed=1)
    return served_head == expected_head


def _upload_segment(data, part_name, number):
    """Upload and verify one segment; return its catbox ID, or None on failure."""
    url = upload_with_retry(data, part_name)
    if not url:
        # Report the segment that actually failed (the loop used to always
        # say "segment 1").
        console.print(f"[red]![/red] Failed to upload segment {number}.")
        return None
    console.print(f"[green]*[/green] Part URL: [yellow]{url}[/yellow]")
    if not _verify_upload(url, data[:1024]):
        console.print("[red]![/red] Invaild Upload!")
        return None
    console.print(f"[green]*[/green] Part {number} Done!")
    # The ID is the last URL component without its extension.
    return url.split("/")[-1].split(".")[0]


def upload(filename):
    """Upload *filename* to catbox, splitting it into segments when needed.

    A file shorter than ``twoHundredMBs`` bytes is uploaded in one request.
    Anything else is uploaded in chunks of ``twoHundredMBs`` bytes, followed
    by a small "bound" index: the original base name, an empty line, then
    one segment ID per line.  Each upload is verified by downloading its
    first 1024 bytes and comparing them with the local data.

    Args:
        filename: Path of the file to upload.

    Returns:
        None.  Progress, resulting URLs and failures are printed to the
        console.
    """
    with open(filename, "rb") as f:
        console.print("[green]*[/green] File loaded.")
        buffer = f.read(twoHundredMBs)

        if len(buffer) < twoHundredMBs:
            console.print("[green]*[/green] File is smaller than 200MB, uploading directly...")
            result = upload_with_retry(buffer, filename)
            if not result:
                console.print("[red]![/red] Upload Failed.")
                return
            if not _verify_upload(result, buffer[:1024]):
                console.print("[red]![/red] Invaild Upload!")
                return
            console.print("[green]*[/green] Upload Done!")
            console.print(f"[green]*[/green] URL: [yellow][bold]{result}[bold][/yellow]")
            return

        console.print("[green]*[/green] Bigger than 200MB, building filebound...")
        filebounds = []
        counter = 1
        while True:
            if counter == 1:
                console.print("[green]*[/green] Uploading First Segment...")
                part_name = "foo.part"
            else:
                console.print("[green]*[/green] Uploading Segment", counter)
                part_name = f"foo{counter}.part"

            segment_id = _upload_segment(buffer, part_name, counter)
            if segment_id is None:
                return
            filebounds.append(segment_id)

            reached_end = len(buffer) < twoHundredMBs
            # Drop the chunk before reading the next one so only one chunk
            # is held in memory at a time.
            del buffer
            if reached_end:
                break
            buffer = f.read(twoHundredMBs)
            if not buffer:
                # The file size is an exact multiple of the chunk size:
                # there is no trailing data, so do not upload an empty part.
                break
            counter += 1
        console.print("[green]*[/green] Uploading Done! Uploading Bound file...")

    console.print("[green]*[/green] Creating bound file...")
    # basename also strips Windows-style directories when run on Windows.
    index_lines = [os.path.basename(filename), ""] + filebounds
    boundfile = "\n".join(index_lines).encode("utf-8")
    console.print("[green]*[/green] Uploading...")
    result = upload_with_retry(boundfile, "bar.boundindex")
    if not result:
        console.print("[red]![/red] Failed to upload boundfile.")
        return
    console.print("[green]*[/green] Upload Done!")
    console.print(f"[green]*[/green] BoundURL: [yellow][bold]{result}[bold][/yellow]")
def _resolve_boundfile(url):
    """Download and parse a bound index.

    Returns:
        ``(filename, links)``: the local file name to write and the list of
        segment URLs in order.

    Raises:
        ValueError: the index could not be downloaded or is malformed.
    """
    raw = download_with_retry(url)
    if raw is None:
        raise ValueError("boundfile could not be downloaded")
    lines = raw.decode("utf-8").splitlines()  # UnicodeDecodeError is a ValueError
    if not lines:
        raise ValueError("boundfile is empty")
    # The index is untrusted network data: keep only the final path
    # component so a crafted name cannot write outside the working directory.
    filename = os.path.basename(lines[0].replace("\\", "/"))
    if filename in ("", ".", ".."):
        raise ValueError("boundfile has no usable file name")
    # Line 0 is the name, line 1 is a separator, the rest are segment IDs.
    segment_ids = lines[2:]
    if not segment_ids:
        raise ValueError("boundfile lists no segments")
    for segment_id in segment_ids:
        # Explicit check instead of ``assert``, which is stripped under -O.
        if len(segment_id) != 6:
            raise ValueError(f"bad segment id: {segment_id!r}")
    links = [f"https://files.catbox.moe/{segment_id}.part" for segment_id in segment_ids]
    return filename, links


def _download_bound(url):
    """Resolve the bound index at *url* and reassemble the file it describes."""
    console.print("[green]*[/green] Resolving boundfile...")
    try:
        filename, links = _resolve_boundfile(url)
    except ValueError:
        console.print("[red]![/red] Invaild boundfile.\n")
        return
    console.print("[green]*[/green] Resolved!")

    answer = console.input(f"[bold][yellow]?[/yellow][bold] Estimated Size: [bold][yellow]{(len(links)-1)*200}MB[/yellow][/bold]-[bold][yellow]{len(links)*200}MB[/yellow][/bold], and saved as [yellow][bold]\"{filename}\"[/yellow][/bold]. sure to download?(Y/n) :")
    if answer.lower() == "n":
        console.print(f"[green]*[/green] Cancelled.\n")
        return

    console.print("[green]*[/green] Opening file")
    complete = True
    with open(filename, "wb+") as f:
        for counter, link in enumerate(links, start=1):
            console.print(f"[green]*[/green] Downloading Segment {counter} ({counter}/{len(links)})")
            buffer = download_with_retry(link)
            if not buffer:
                complete = False
                break
            f.write(buffer)
            # Release the segment before fetching the next one.
            del buffer
    if not complete:
        console.print("[red]![/red] Download failed.\n")
        # Remove the partial file only after it is closed (deleting an open
        # file fails on Windows) and do not report success afterwards.
        os.remove(filename)
        return
    console.print("[green]*[/green] [bold][yellow]Done![/yellow][/bold]\n")


def _handle_upload():
    """Ask for a local path and upload that file."""
    path = console.input("File path: ")
    # isfile (not exists) so a directory is rejected here instead of
    # crashing later in open().
    if not os.path.isfile(path):
        console.print("Error: File doesn't exist.\n", style="red")
        return
    console.print("[green]*[/green] Uploading...")
    upload(path)
    console.print()


def _handle_download():
    """Ask for a catbox URL and download it, as a bound file or a plain file."""
    url = console.input("File URL: ")
    if not url.startswith("https://files.catbox.moe/"):
        console.print("Error: Ooops! the script can only download files from catbox currently.\n", style="red")
        return
    if len(url.split("/")[-1].split(".")[0]) != 6:
        console.print("Error: Ooops! that's not a valid catbox URL.\n", style="red")
        return

    if url.endswith(".boundindex"):
        download_as_bound = console.input("[bold][yellow]?[/yellow][bold] Download this file as a [bold][yellow]boundfile[/yellow][/bold]? ([gray]n[/gray] as a plain file):")
        if download_as_bound.lower() != "n":
            _download_bound(url)
            return

    save_as = console.input("Save as: ")
    console.print("[green]*[/green] Downloading...")
    # Download before opening the target so a failed download neither
    # truncates an existing file nor crashes on ``f.write(None)``.
    data = download_with_retry(url)
    if data is None:
        console.print("[red]![/red] Download failed.\n")
        return
    with open(save_as, "wb+") as f:
        f.write(data)
    console.print("[green]*[/green] [bold][yellow]Done![/yellow][/bold]\n")


if __name__ == "__main__":
    # Interactive menu: loop until the user chooses Exit.  Unrecognised
    # input simply shows the prompt again.
    console.print("\n[cyan][bold]Catbox Segmented Uploader[/bold][/cyan]\n")
    console.print("[green][bold]V1.2[/bold][/green], By [yellow][bold]0x24a[/bold][/yellow]\n")
    while True:
        choose = console.input("([bold][yellow]U[/bold][/yellow])pload/([bold][yellow]D[/bold][/yellow])ownload/([bold][yellow]E[/bold][/yellow])xit: ").upper()
        if choose == "U":
            _handle_upload()
        elif choose == "D":
            _handle_download()
        elif choose == "E":
            console.print("\n[green]*[/green] [bold][yellow]Have a good day![/yellow][/bold]\n")
            # Same effect as the site-provided ``exit(0)`` without depending
            # on the ``site`` module being loaded.
            raise SystemExit(0)
Author
0x24a
commented
Mar 23, 2024
Preview(V1.2)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment