Last active
June 29, 2023 16:18
-
-
Save Atreyagaurav/a3528d06803271fb928ab3f915bfddec to your computer and use it in GitHub Desktop.
Downloads as well as checks anime episodes from gogoanime.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import json | |
import os | |
import sys | |
from string import Template | |
from urllib.parse import urljoin | |
import subprocess | |
import time | |
import pycurl | |
import requests | |
from bs4 import BeautifulSoup | |
# Name the script was invoked with; interpolated into the usage text below.
_prog = sys.argv[0]

__doc__ = f"""Downloader for anime available in gogoanime website.
Usage: {_prog} COMMAND [ANIME] [RANGE]
COMMAND - Command to execute, see details below for available commands.
ANIME - Name of the anime or url.
It must be same as the name on gogoanime address bar.
RANGE - Range of the episodes; defaults to all episodes.
AVAILABLE COMMANDS:
help - Display this message.
url - Download the video from provided URL.
download - Download specified anime(episodes).
check - Check if specified anime(episodes) has missing episodes files.
Do not use this when simple commands like tree or ls can give
you information as this command checks the files online.
list - Just list the available episodes range.
play - Play the episodes in mpv
Example Usage:
{_prog} download "One Piece" 1
{_prog} check "One Piece" 1-10
{_prog} download one-piece 1-2,5
{_prog} list https://gogoanime.so/category/one-piece
{_prog} url https://gogoanime.so/one-piece-episode-1
NOTE: url command will save in current directory instead of anime directory.
This option is for when there are exceptions in naming conventions.
"""

# Base address that every anime/episode page URL is joined onto.
gogoanime_url = 'https://gogoanime.so'

# Base mpv invocation; play_anime appends the stream URL to this list.
mpv_command = [
    'mpv',
    '--geometry=300-0-20',
    '--on-all-workspaces',
    '--no-config',
]

# Watch history file: one "<anime-name> <episode-ranges>" pair per line.
logfile = ".anime_history"

# URL/path/header templates filled in with string.Template.substitute().
ajax_t = Template('https://gogo-stream.com/ajax.php?${q}')
episode_t = Template("${anime}-episode-${ep}")
anime_t = Template("category/${anime}")
resume_t = Template("Range: bytes=${size}-")

# Header values shared by page requests and file downloads.
_user_agent = (
    "Mozilla/5.0 (X11; Linux x86_64; rv:82.0) Gecko/20100101 Firefox/82.0"
)
_accept_language = "en-US,en;q=0.5"

# Headers sent with HTML page requests (see get_soup).
req_headers = {
    "User-Agent": _user_agent,
    "Accept": (
        "text/html,application/xhtml+xml,application/xml;q=0.9,"
        "image/webp,*/*;q=0.8"
    ),
    "Accept-Language": _accept_language,
    "Upgrade-Insecure-Requests": "1",
}

# Headers sent with the video file download (see download_file).
down_headers = {
    "User-Agent": _user_agent,
    "Accept": (
        "video/webm,video/ogg,video/*;q=0.9,application/ogg;q=0.7,"
        "audio/*;q=0.6,*/*;q=0.5"
    ),
    "Accept-Language": _accept_language,
}
def get_anime_url(anime_name):
    """Return the category (overview) page URL for ``anime_name``.

    The name is normalised through ``process_anime_name`` (lower-cased,
    spaces replaced by hyphens) instead of repeating that logic here, so all
    slug handling lives in one place.
    """
    slug = process_anime_name(anime_name)
    return urljoin(gogoanime_url, anime_t.substitute(anime=slug))
def get_episode_url(anime_name, episode):
    """Return the page URL of one episode of ``anime_name``.

    The name is normalised through ``process_anime_name`` (lower-cased,
    spaces replaced by hyphens) instead of repeating that logic here.

    Args:
        anime_name: Anime name or slug.
        episode: Episode identifier substituted into the URL as-is.
    """
    slug = process_anime_name(anime_name)
    return urljoin(gogoanime_url, episode_t.substitute(anime=slug, ep=episode))
def process_anime_name(name):
    """Turn a human-typed anime name into its URL slug.

    The name is lower-cased and every space character becomes a hyphen,
    e.g. ``"One Piece"`` -> ``"one-piece"``.
    """
    lowered = name.lower()
    # Splitting on an explicit ' ' keeps empty pieces, so consecutive spaces
    # still map to consecutive hyphens.
    return '-'.join(lowered.split(' '))
def download_file(url, filepath, replace=False):
    """Download ``url`` to ``filepath`` with pycurl, resuming when possible.

    Data is written to ``<filepath>.part`` and only moved to ``filepath``
    once the transfer has finished, so an interrupted run does not leave a
    truncated file under the final name.

    Args:
        url: Direct URL of the file to fetch.
        filepath: Destination path of the finished download.
        replace: When true, ignore an existing finished file or partial
            download and fetch everything again.

    Raises:
        SystemExit: If the transfer fails or is interrupted with Ctrl-C.
    """
    if os.path.exists(filepath) and not replace:
        print('File already downloaded, skipping.')
        return

    part_file = f'{filepath}.part'
    curl_header = [f'{k}: {v}' for k, v in down_headers.items()]
    if os.path.exists(part_file) and not replace:
        print('Previously Downloaded part found.')
        wmode = 'ab'
        # Ask the server for the bytes after what is already on disk.
        curl_header.append(
            resume_t.substitute(size=os.path.getsize(part_file)))
    else:
        wmode = 'wb'

    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, url)
        c.setopt(pycurl.NOPROGRESS, 0)
        c.setopt(pycurl.HTTPHEADER, curl_header)
        with open(part_file, wmode) as writer:
            c.setopt(pycurl.WRITEDATA, writer)
            c.perform()
    except (KeyboardInterrupt, pycurl.error) as e:
        raise SystemExit(f"Download Failed {e}") from e
    finally:
        # Release the curl handle on every path, including a failed open().
        c.close()

    # os.replace overwrites an existing target on every platform, which
    # matters when replace=True and the finished file is already present.
    os.replace(part_file, filepath)
def get_soup(url, timeout=30):
    """Fetch ``url`` and return its HTML parsed with BeautifulSoup.

    Args:
        url: Page address to request (sent with ``req_headers``).
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on a stalled
            connection.

    Returns:
        The parsed document, or ``None`` when the server answers 404.
    """
    r = requests.get(url, headers=req_headers, timeout=timeout)
    if r.status_code == 404:
        return None
    return BeautifulSoup(r.text, 'html.parser')
def get_direct_video_url(gogo_url):
    """Resolve an episode page to the direct video link and its extension.

    The episode page embeds the player in an ``<iframe>``; the query string
    of that iframe's ``src`` is forwarded to the ajax endpoint, whose JSON
    answer carries the file link under ``source_bk[0].file``.

    Args:
        gogo_url: URL of the episode page.

    Returns:
        ``(link, extension)`` on success, or ``(None, None)`` when the page
        does not exist, has no usable iframe, or the ajax answer lacks the
        expected fields. Callers test the first element for ``None``.
    """
    soup = get_soup(gogo_url)
    if soup is None:
        return None, None

    iframe = soup.find('iframe')
    src = iframe.get('src') if iframe is not None else None
    if not src:
        return None, None

    src_parts = src.split('?')
    if len(src_parts) < 2:
        # No query string to forward to the ajax endpoint.
        return None, None

    r = requests.get(ajax_t.substitute(q=src_parts[1]))
    try:
        link = json.loads(r.text)['source_bk'][0]['file']
    except (ValueError, KeyError, IndexError, TypeError):
        # Not JSON, or JSON without the expected structure.
        return None, None

    ftype = link.split('.')[-1]
    return link, ftype
def read_log(anime_name=None):
    """Read the watch history stored in ``logfile``.

    Each useful line holds ``<anime-name> <episode-ranges>`` separated by
    whitespace. Blank lines and lines with fewer than two fields are skipped
    instead of raising ``IndexError``.

    Args:
        anime_name: When given, only that anime's entry is returned.

    Returns:
        The whole history as a ``dict`` when ``anime_name`` is ``None``;
        otherwise the stored episode string for that anime, or ``None`` if
        it has no entry.
    """
    log = {}
    if os.path.exists(logfile):
        # Iterating the file object reads it line by line; the with-block
        # closes it afterwards, so no explicit close() is needed.
        with open(logfile, 'r') as r:
            for line in r:
                fields = line.split()
                if len(fields) >= 2:
                    log[fields[0]] = fields[1]
    if anime_name is None:
        return log
    return log.get(anime_name)
def write_log(anime_name, episodes):
    """Record ``episodes`` as watched for ``anime_name`` in ``logfile``.

    If the anime already has an entry, the new episodes are merged into the
    stored ranges and re-compressed; otherwise ``episodes`` is stored as
    given. The whole history file is then rewritten.
    """
    history = read_log()
    entry = episodes
    if anime_name in history:
        combined = f'{history[anime_name]},{episodes}'
        entry = compress_range(extract_range(combined))
    history[anime_name] = entry

    with open(logfile, 'w') as out:
        for title, watched in history.items():
            out.write(f'{title} {watched}\n')
def play_anime(name, episodes):
    """Stream each episode in mpv and log the ones actually watched.

    Args:
        name: Anime slug.
        episodes: Iterable of episode numbers to play, in order.

    Raises:
        SystemExit: If no stream URL can be found for an episode.
    """
    for e in episodes:
        # TODO:
        # add the option to open local file if found
        # remove the need to user intervention for m3u8
        url = get_episode_url(name, e)
        print('Getting Streaming Link:', url)
        durl, ext = get_direct_video_url(url)
        if durl is None:
            raise SystemExit('Url for the file not found')
        if ext == 'm3u8':
            # Show the playlist and let the user pick the stream to play.
            print("m3u8 file found")
            print(f'URL:{durl}')
            print(requests.get(durl).text)
            durl = input('Enter the stream url:')
        t1 = time.time()
        # Pass an argument list and no shell: stream URLs routinely contain
        # '&', '?' or ';', which a shell would interpret (or execute).
        subprocess.call(mpv_command + [durl])
        # 5 minutes watchtime at least, otherwise consider it unwatched
        if (time.time() - t1) > (5 * 60):
            write_log(name, e)
def download_anime(name, episodes):
    """Download every episode in ``episodes`` into ``./<name>/``.

    Files are saved as ``ep<NN>.<ext>`` (episode number zero-padded to two
    digits).

    Args:
        name: Anime slug; also used as the target directory name.
        episodes: Iterable of integer episode numbers.

    Raises:
        SystemExit: If an episode has no direct link or is only available
            as an m3u8 playlist (not supported).
    """
    # Do not rely on the caller having created the directory already;
    # download_file would otherwise fail when opening the part file.
    os.makedirs(f'./{name}', exist_ok=True)
    for e in episodes:
        url = get_episode_url(name, e)
        print('Downloading:', url)
        durl, ext = get_direct_video_url(url)
        if durl is None:
            raise SystemExit('Url for the file not found')
        if ext == 'm3u8':
            print("m3u8 file found")
            print(f'URL:{durl}')
            print(requests.get(durl).text)
            raise SystemExit('Format not supported')
        download_file(durl, f'./{name}/ep{e:02d}.{ext}')
def check_anime(name, episodes):
    """Report which of ``episodes`` have no local file under ``./<name>/``.

    Every episode is looked up online first, because the expected file
    extension comes from the direct video link.

    Raises:
        SystemExit: If no direct link is found for an episode.
    """
    missing = []
    for ep in episodes:
        page_url = get_episode_url(name, ep)
        print('Testing:', page_url)
        durl, ext = get_direct_video_url(page_url)
        if durl is None:
            raise SystemExit('Url for the file not found')
        local_path = f'./{name}/ep{ep:02d}.{ext}'
        if os.path.exists(local_path):
            continue
        missing.append(ep)

    if missing:
        print(f'Missing episodes: {compress_range(missing)}')
    else:
        print('All episodes in given range are locally available')
def get_episodes_range(anime_url):
    """Return the episode ranges listed on an anime's category page.

    The page lists ranges as links carrying ``ep_start``/``ep_end``
    attributes with text such as ``0-50``. A leading ``0`` is bumped to
    ``1`` so the result starts at the first episode.

    Args:
        anime_url: URL of the anime's category page.

    Returns:
        Comma-separated ranges such as ``"1-50,51-100"``. An empty string is
        returned when the page does not exist, so the return type is always
        ``str`` (callers split the result on commas).
    """
    soup = get_soup(anime_url)
    if soup is None:
        return ''
    total_rng = []
    for r in soup.find_all('a', ep_end=True, ep_start=True):
        rngs = r.text.split('-')
        if rngs[0] == '0':
            rngs[0] = '1'
        total_rng.append('-'.join(rngs))
    return ','.join(total_rng)
def compress_range(range_list):
    """Collapse episode numbers into a compact range string.

    Duplicates are ignored and order does not matter: ``[3, 1, 2, 2, 7]``
    becomes ``"1-3,7"``. An empty input yields ``""``.

    Args:
        range_list: Iterable of integers.

    Returns:
        Comma-separated single numbers and ``start-end`` runs of
        consecutive numbers.
    """
    numbers = sorted(set(range_list))
    if not numbers:
        return ''

    # Collect (first, last) pairs of consecutive runs.
    runs = []
    first = last = numbers[0]
    for number in numbers[1:]:
        if number == last + 1:
            last = number
            continue
        runs.append((first, last))
        first = last = number
    runs.append((first, last))

    return ','.join(
        f'{lo}' if lo == hi else f'{lo}-{hi}' for lo, hi in runs)
def extract_range(range_str):
    """Yield every episode number described by ``range_str``.

    ``range_str`` is a comma-separated list of single numbers and inclusive
    ``start-end`` ranges, e.g. ``"1-3,5"`` yields 1, 2, 3, 5. This is a
    generator, so malformed input is only reported once iteration reaches
    it.

    Raises:
        SystemExit: If an item has more than one hyphen or a part is not an
            integer (a message is printed first).
    """
    try:
        for item in range_str.split(','):
            bounds = item.split('-')
            if len(bounds) == 1:
                # No hyphen: a single episode number.
                yield int(item)
                continue
            if len(bounds) > 2:
                print(f'Incorrect formatting: {item}')
                raise SystemExit
            start, stop = bounds
            yield from range(int(start), int(stop) + 1)
    except ValueError:
        print('Incorrect formatting: use integers for episodes')
        raise SystemExit
def read_args(args):
    """Turn the command-line arguments after COMMAND into (name, episodes).

    Args:
        args: ``[ANIME]`` or ``[ANIME, RANGE]``. ANIME is either a name or a
            URL whose last path component is the anime slug.

    Returns:
        ``(name, episodes)`` where ``name`` is the anime slug and
        ``episodes`` is an iterable of episode numbers. When no RANGE is
        given, all episodes listed on the anime page are used.

    Raises:
        SystemExit: If ANIME is missing, the anime page does not exist, or
            too many arguments are given.

    Side effects:
        Creates the ``./<name>`` directory if it does not exist.
    """
    if not args:
        print('Anime name not given.\n')
        print(__doc__)
        raise SystemExit

    if '/' in args[0]:
        # Ignore a trailing slash so ".../one-piece/" still yields the slug.
        name = args[0].rstrip('/').split('/')[-1]
    else:
        name = process_anime_name(args[0])

    if get_soup(get_anime_url(name)) is None:
        readable = name.replace('-', ' ')
        print(f"Anime with name doesn't exist: {readable}")
        raise SystemExit

    os.makedirs(f'./{name}', exist_ok=True)

    if len(args) == 1:
        print('Episodes range not given defaulting to all')
        available_rng = get_episodes_range(get_anime_url(name))
        print(f'Available episodes: {available_rng}')
        episodes = extract_range(available_rng)
    elif len(args) == 2:
        episodes = extract_range(args[1])
    else:
        print('Too many arguments.\n')
        print(__doc__)
        raise SystemExit
    return name, episodes
def list_episodes(name, episodes):
    """Print the available and the watched episodes of ``name``.

    The available episodes are only printed when the script was started
    with an explicit range (four command-line arguments); they are then the
    requested episodes that also exist online. The watched episodes come
    from the history file.
    """
    range_given = len(sys.argv) == 4
    if range_given:
        requested = set(episodes)
        online = set(extract_range(get_episodes_range(get_anime_url(name))))
        result = compress_range(requested & online)
        print(f'Available episodes: {result}')

    history = read_log()
    print(f'Watched episodes: {history.get(name)}')
def download_from_url(gogo_url):
    """Download the video of a single episode page into the current directory.

    The file is named after the last path component of ``gogo_url`` plus the
    extension of the direct video link.

    Args:
        gogo_url: URL of the episode page.

    Raises:
        SystemExit: If no direct link is found, or the video is only
            available as an m3u8 playlist.
    """
    durl, ext = get_direct_video_url(gogo_url)
    if durl is None:
        raise SystemExit('Url for the file not found')
    if ext == 'm3u8':
        # turns out youtube-dl can download underlying videos if you give it
        # the m3u8 file.
        print('Currently not supported')
        raise SystemExit
    # Drop a trailing slash first; otherwise the file name would be empty.
    basename = gogo_url.rstrip('/').split('/')[-1]
    download_file(durl, f'./{basename}.{ext}')
# Dispatch table: COMMAND word -> handler.
# Every handler is called as handler(anime_name, episodes_range).
commands_func = {
    "download": download_anime,
    "check": check_anime,
    "list": list_episodes,
    "play": play_anime,
}
def main():
    """Command-line entry point: dispatch ``sys.argv`` to a command handler.

    ``help``/``-h`` (or no arguments) prints the usage text. ``url`` takes
    exactly one URL and is handled separately because it does not follow the
    ``(anime_name, episodes)`` handler signature. Every other command is
    looked up in ``commands_func``.

    Raises:
        SystemExit: After printing help (status 0) or on a usage error
            (status 1).
    """
    if len(sys.argv) == 1 or sys.argv[1] in ('help', '-h'):
        print(__doc__)
        raise SystemExit

    command = sys.argv[1]
    if command == 'url':
        if len(sys.argv) != 3:
            print('Incorrect argument\n')
            print(__doc__)
            raise SystemExit(1)
        download_from_url(sys.argv[2])
        # Finish here: 'url' is not in commands_func, so continuing would
        # report a successful download as an invalid command.
        return

    if command not in commands_func:
        print('Invalid Command')
        print(__doc__)
        raise SystemExit(1)

    name, episodes = read_args(sys.argv[2:])
    commands_func[command](name, episodes)


if __name__ == '__main__':
    main()
This was turned into a full project with a lot more functionality. Please visit: https://github.com/Atreyagaurav/anime-helper-shell
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
sx