Skip to content

Instantly share code, notes, and snippets.

@Atreyagaurav
Last active June 29, 2023 16:18
Show Gist options
  • Save Atreyagaurav/a3528d06803271fb928ab3f915bfddec to your computer and use it in GitHub Desktop.
Save Atreyagaurav/a3528d06803271fb928ab3f915bfddec to your computer and use it in GitHub Desktop.
Downloads as well as checks anime episodes from gogoanime.
#!/usr/bin/env python
import json
import os
import sys
from string import Template
from urllib.parse import urljoin
import subprocess
import time
import pycurl
import requests
from bs4 import BeautifulSoup
# Usage/help text printed by the `help` command and whenever the arguments
# are rejected. It is assigned to __doc__ as an f-string (a plain docstring
# cannot interpolate) so the program name is taken from sys.argv[0].
__doc__ = f"""Downloader for anime available in gogoanime website.
Usage: {sys.argv[0]} COMMAND [ANIME] [RANGE]
COMMAND - Command to execute, see details below for available commands.
ANIME - Name of the anime or url.
It must be same as the name on gogoanime address bar.
RANGE - Range of the episodes; defaults to all episodes.
AVAILABLE COMMANDS:
help - Display this message.
url - Download the video from provided URL.
download - Download specified anime(episodes).
check - Check if specified anime(episodes) has missing episodes files.
Do not use this when simple commands like tree or ls can give
you information as this command checks the files online.
list - Just list the available episodes range.
play - Play the episodes in mpv
Example Usage:
{sys.argv[0]} download "One Piece" 1
{sys.argv[0]} check "One Piece" 1-10
{sys.argv[0]} download one-piece 1-2,5
{sys.argv[0]} list https://gogoanime.so/category/one-piece
{sys.argv[0]} url https://gogoanime.so/one-piece-episode-1
NOTE: url command will save in current directory instead of anime directory.
This option is for when there are exceptions in naming conventions.
"""
# Base address of the site; category and episode paths are joined onto it.
gogoanime_url = 'https://gogoanime.so'

# Player command line; the stream URL is appended as the last argument.
mpv_command = [
    'mpv',
    '--geometry=300-0-20',
    '--on-all-workspaces',
    '--no-config',
]

# File (relative to the working directory) holding "<anime> <episodes>" lines.
logfile = ".anime_history"

# String templates for the URLs, page paths and headers built by this script.
ajax_t = Template('https://gogo-stream.com/ajax.php?${q}')
episode_t = Template("${anime}-episode-${ep}")
anime_t = Template("category/${anime}")
resume_t = Template("Range: bytes=${size}-")

# Headers sent when fetching HTML pages.
req_headers = {
    "User-Agent": (
        "Mozilla/5.0 (X11; Linux x86_64; rv:82.0) "
        "Gecko/20100101 Firefox/82.0"
    ),
    "Accept": (
        "text/html,application/xhtml+xml,application/xml;q=0.9,"
        "image/webp,*/*;q=0.8"
    ),
    "Accept-Language": "en-US,en;q=0.5",
    "Upgrade-Insecure-Requests": "1",
}

# Headers sent when downloading the video file itself.
down_headers = {
    "User-Agent": (
        "Mozilla/5.0 (X11; Linux x86_64; rv:82.0) "
        "Gecko/20100101 Firefox/82.0"
    ),
    "Accept": (
        "video/webm,video/ogg,video/*;q=0.9,application/ogg;q=0.7,"
        "audio/*;q=0.6,*/*;q=0.5"
    ),
    "Accept-Language": "en-US,en;q=0.5",
}
def get_anime_url(anime_name):
    """Return the category-page URL of an anime on the gogoanime site.

    ``anime_name`` may be a title ("One Piece") or a slug ("one-piece").
    It is normalised through ``process_anime_name`` so the slug rule is
    defined in a single place instead of being repeated here.
    """
    slug = process_anime_name(anime_name)
    return urljoin(gogoanime_url, anime_t.substitute(anime=slug))
def get_episode_url(anime_name, episode):
    """Return the page URL of one episode of an anime.

    ``anime_name`` may be a title or a slug; it is normalised through
    ``process_anime_name`` so the slug rule is defined in a single place.
    ``episode`` is substituted into the episode path template as given.
    """
    slug = process_anime_name(anime_name)
    return urljoin(gogoanime_url, episode_t.substitute(anime=slug, ep=episode))
def process_anime_name(name):
    """Turn a human-typed anime title into its lowercase hyphenated slug.

    Words are separated on any run of whitespace, so leading, trailing or
    repeated spaces no longer produce stray or doubled hyphens
    ("  One   Piece " -> "one-piece"). Hyphens already present are kept.
    """
    return '-'.join(name.lower().split())
def download_file(url, filepath, replace=False):
    """Download ``url`` to ``filepath`` with pycurl, resuming when possible.

    Data is written to ``<filepath>.part`` and moved onto ``filepath`` only
    after the transfer finished, so an interrupted download never leaves a
    truncated file under the final name.

    Args:
        url: Direct address of the file to fetch.
        filepath: Destination path of the finished file.
        replace: When true, download again even if ``filepath`` exists and
            discard any existing ``.part`` file instead of resuming it.

    Raises:
        SystemExit: If the transfer fails or is interrupted with Ctrl-C.
    """
    if os.path.exists(filepath) and not replace:
        print('File already downloaded, skipping.')
        return
    part_file = f'{filepath}.part'
    curl_header = [f'{k}: {v}' for k, v in down_headers.items()]
    if os.path.exists(part_file) and not replace:
        print('Previously Downloaded part found.')
        wmode = 'ab'
        # Ask the server only for the bytes that are still missing.
        curl_header.append(
            resume_t.substitute(size=os.path.getsize(part_file)))
    else:
        wmode = 'wb'
    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, url)
        c.setopt(pycurl.NOPROGRESS, 0)
        # Treat HTTP errors (status >= 400) as failures rather than saving
        # the error page as if it were video data.
        c.setopt(pycurl.FAILONERROR, True)
        c.setopt(pycurl.HTTPHEADER, curl_header)
        with open(part_file, wmode) as writer:
            c.setopt(pycurl.WRITEDATA, writer)
            c.perform()
    except (KeyboardInterrupt, pycurl.error) as e:
        raise SystemExit(f"Download Failed {e}")
    finally:
        # Release the curl handle on every path, including unexpected errors.
        c.close()
    # os.replace overwrites an existing target on every platform, which
    # matters when replace=True and the old file is still present.
    os.replace(part_file, filepath)
def get_soup(url):
    """Fetch ``url`` and return its body parsed with BeautifulSoup.

    Returns None when the server answers with status 404; a response with
    any other status is parsed and returned as-is.
    """
    response = requests.get(url, headers=req_headers)
    page_missing = response.status_code == 404
    return None if page_missing else BeautifulSoup(response.text, 'html.parser')
def get_direct_video_url(gogo_url):
    """Resolve an episode page to the direct address of its video file.

    The episode page embeds the player in an iframe; the query string of
    that iframe address is replayed against the ajax endpoint, whose JSON
    answer lists the video sources.

    Returns:
        A ``(link, extension)`` tuple, or ``(None, None)`` when the page,
        its iframe or the source entry cannot be found. Callers test
        ``link`` against None to report the failure.
    """
    soup = get_soup(gogo_url)
    if soup is None:
        return None, None
    iframe = soup.find('iframe')
    if iframe is None or '?' not in iframe.get('src', ''):
        return None, None
    ajx_l = ajax_t.substitute(q=iframe['src'].split('?')[1])
    r = requests.get(ajx_l)
    try:
        link = json.loads(r.text)['source_bk'][0]['file']
    except (ValueError, KeyError, IndexError, TypeError):
        # Not JSON, or JSON without the expected source entry.
        return None, None
    # Ignore any query string so "video.mp4?token=x" still yields "mp4".
    ftype = link.split('?', 1)[0].split('.')[-1]
    return link, ftype
def read_log(anime_name=None):
    """Read the watch history stored in the log file.

    Each line of the log holds an anime name and its watched-episodes
    string separated by whitespace. Blank lines and lines with fewer than
    two fields are skipped instead of aborting the whole read.

    Args:
        anime_name: When given, only that anime's entry is returned.

    Returns:
        The full ``{anime: episodes}`` dict when ``anime_name`` is None,
        otherwise the episodes string of that anime (None if absent).
    """
    log = {}
    if os.path.exists(logfile):
        # The with-block closes the file once iteration is done.
        with open(logfile, 'r') as r:
            for line in r:
                fields = line.split()
                if len(fields) >= 2:
                    log[fields[0]] = fields[1]
    if anime_name is None:
        return log
    return log.get(anime_name)
def write_log(anime_name, episodes):
    """Record ``episodes`` as watched for ``anime_name`` in the log file.

    An existing entry is merged with the new episodes and stored in
    compressed range form; a new entry stores ``episodes`` as given. The
    whole log file is rewritten on every call.
    """
    history = read_log()
    if anime_name not in history:
        history[anime_name] = episodes
    else:
        merged = extract_range(f'{history[anime_name]},{episodes}')
        history[anime_name] = compress_range(merged)
    with open(logfile, 'w') as out:
        out.writelines(
            f'{title} {watched}\n' for title, watched in history.items())
def play_anime(name, episodes):
    """Stream each episode in mpv and log the ones that were watched.

    An episode is written to the watch log only when the player ran for
    more than five minutes.

    Raises:
        SystemExit: If no video address can be found for an episode.
    """
    for e in episodes:
        # TODO:
        # add the option to open local file if found
        # remove the need for user intervention for m3u8
        url = get_episode_url(name, e)
        print('Getting Streaming Link:', url)
        durl, ext = get_direct_video_url(url)
        if durl is None:
            raise SystemExit('Url for the file not found')
        if ext == 'm3u8':
            # Show the playlist and let the user pick the stream to play.
            print("m3u8 file found")
            print(f'URL:{durl}')
            print(requests.get(durl).text)
            durl = input('Enter the stream url:').strip()
        t1 = time.time()
        # Pass the arguments as a list: the URL comes from the network or
        # the user and may contain '&', ';' or '?', which a shell would
        # interpret instead of handing to mpv.
        subprocess.call(mpv_command + [durl])
        # 5 minutes watchtime at least, otherwise consider it unwatched
        if (time.time() - t1) > (5 * 60):
            write_log(name, e)
def download_anime(name, episodes):
    """Download every episode in ``episodes`` into the ``./<name>/`` folder.

    Files are saved as ``epNN.<ext>``. Stops with SystemExit when an
    episode has no video address or is only offered as an m3u8 playlist.
    """
    for episode in episodes:
        page_url = get_episode_url(name, episode)
        print('Downloading:', page_url)
        video_url, extension = get_direct_video_url(page_url)
        if video_url is None:
            raise SystemExit('Url for the file not found')
        if extension == 'm3u8':
            # Print the playlist for the user before giving up.
            print("m3u8 file found")
            print(f'URL:{video_url}')
            print(requests.get(video_url).text)
            raise SystemExit('Format not supported')
        target = f'./{name}/ep{episode:02d}.{extension}'
        download_file(video_url, target)
def check_anime(name, episodes):
    """Report which of ``episodes`` have no local file under ``./<name>/``.

    The expected extension of each file is looked up online, so every
    episode costs a request. Stops with SystemExit when an episode has no
    video address.
    """
    missing = []
    for episode in episodes:
        page_url = get_episode_url(name, episode)
        print('Testing:', page_url)
        video_url, extension = get_direct_video_url(page_url)
        if video_url is None:
            raise SystemExit('Url for the file not found')
        local_path = f'./{name}/ep{episode:02d}.{extension}'
        if os.path.exists(local_path):
            continue
        missing.append(episode)
    if missing:
        print(f'Missing episodes: {compress_range(missing)}')
    else:
        print('All episodes in given range are locally available')
def get_episodes_range(anime_url):
    """Return the episode ranges listed on an anime's category page.

    The page lists its episodes as links carrying ``ep_start`` and
    ``ep_end`` attributes whose text is a range such as "0-50". A range
    starting at 0 is rewritten to start at 1.

    Returns:
        A comma-separated range string (e.g. "1-50,51-100"). An empty
        string is returned when the page does not exist, so the result is
        always a string that callers can split.
    """
    soup = get_soup(anime_url)
    if soup is None:
        return ''
    total_rng = []
    for link in soup.find_all('a', ep_end=True, ep_start=True):
        # Strip surrounding whitespace so the '0' check below is reliable.
        bounds = link.text.strip().split('-')
        if bounds[0] == '0':
            bounds[0] = '1'
        total_rng.append('-'.join(bounds))
    return ','.join(total_rng)
def compress_range(range_list):
    """Collapse episode numbers into a compact range string.

    Consecutive numbers are merged into "start-end" runs and the runs are
    joined with commas, e.g. ``[1, 2, 3, 5]`` -> ``"1-3,5"``. The input
    may be any iterable of integers, in any order and with duplicates.

    Returns:
        The range string, or an empty string for empty input.
    """
    ordered = sorted(set(range_list))
    if not ordered:
        return ''
    # Collect (first, last) pairs of each run of consecutive numbers.
    runs = []
    start = prev = ordered[0]
    for ep in ordered[1:]:
        if ep != prev + 1:
            runs.append((start, prev))
            start = ep
        prev = ep
    runs.append((start, prev))
    return ','.join(
        f'{first}' if first == last else f'{first}-{last}'
        for first, last in runs)
def extract_range(range_str):
    """Yield the episode numbers described by a range string.

    ``range_str`` is a comma-separated list of integers and inclusive
    "start-end" ranges, e.g. ``"1-3,5"`` yields 1, 2, 3, 5. Empty items
    (an empty string, doubled or trailing commas) are ignored, so an
    empty range string simply yields nothing.

    This is a generator: malformed input is reported only once iteration
    reaches it.

    Raises:
        SystemExit: If an item is not an integer or has more than one '-'.
    """
    try:
        for part in range_str.split(','):
            part = part.strip()
            if not part:
                continue
            if '-' in part:
                bounds = part.split('-')
                if len(bounds) > 2:
                    print(f'Incorrect formatting: {part}')
                    raise SystemExit
                yield from range(int(bounds[0]), int(bounds[1]) + 1)
            else:
                yield int(part)
    except ValueError:
        print('Incorrect formatting: use integers for episodes')
        raise SystemExit
def read_args(args):
    """Turn the command-line arguments after COMMAND into a name and episodes.

    Args:
        args: ``[ANIME]`` or ``[ANIME, RANGE]`` where ANIME is a title,
            a slug or a URL ending in the slug.

    Returns:
        ``(name, episodes)``: the anime slug and an iterable of episode
        numbers (all available episodes when RANGE is omitted).

    Raises:
        SystemExit: If the argument count is wrong or the anime page does
            not exist.

    Side effect: creates the ``./<name>`` directory if it is missing.
    """
    # Validate the argument count before any network or filesystem work.
    if not args:
        print('Anime name or url not given.\n')
        print(__doc__)
        raise SystemExit
    if len(args) > 2:
        print('Too many arguments.\n')
        print(__doc__)
        raise SystemExit
    if '/' in args[0]:
        # A URL: the slug is its last path segment; tolerate a trailing '/'.
        name = args[0].rstrip('/').split('/')[-1]
    else:
        name = process_anime_name(args[0])
    if get_soup(get_anime_url(name)) is None:
        print(f'Anime with name doesn\'t exist: {" ".join(name.split("-"))}')
        raise SystemExit
    os.makedirs(f'./{name}', exist_ok=True)
    if len(args) == 1:
        print('Episodes range not given defaulting to all')
        available_rng = get_episodes_range(get_anime_url(name))
        print(f'Available episodes: {available_rng}')
        episodes = extract_range(available_rng)
    else:
        episodes = extract_range(args[1])
    return name, episodes
def list_episodes(name, episodes):
    """Print the available and the watched episodes of an anime.

    When a RANGE was given on the command line (four entries in
    ``sys.argv``), the requested episodes are intersected with those
    listed online and printed. The watched episodes recorded in the log
    are printed in every case.
    """
    range_was_given = len(sys.argv) == 4
    if range_was_given:
        requested = set(episodes)
        online = set(extract_range(get_episodes_range(get_anime_url(name))))
        result = compress_range(requested & online)
        print(f'Available episodes: {result}')
    watched = read_log().get(name)
    print(f'Watched episodes: {watched}')
def download_from_url(gogo_url):
    """Download the video of one episode page into the current directory.

    The file is named after the last path segment of ``gogo_url`` plus the
    video's extension. Stops with SystemExit when no video address is
    found or the video is only offered as an m3u8 playlist.
    """
    video_url, extension = get_direct_video_url(gogo_url)
    if video_url is None:
        raise SystemExit('Url for the file not found')
    if extension == 'm3u8':
        # NOTE: youtube-dl can download the underlying videos when given
        # the m3u8 file; that is not wired in here.
        print('Currently not supported')
        raise SystemExit
    page_slug = gogo_url.split("/")[-1]
    download_file(video_url, f'./{page_slug}.{extension}')
# Dispatch table from COMMAND to its handler. Every handler is called with
# two arguments: the anime name and the episodes range.
commands_func = dict(
    download=download_anime,
    check=check_anime,
    list=list_episodes,
    play=play_anime,
)
if __name__ == '__main__':
    # No command, or an explicit request for help: show usage and stop.
    if len(sys.argv) == 1 or sys.argv[1] in ('help', '-h'):
        print(__doc__)
        raise SystemExit

    command = sys.argv[1]

    # `url` takes exactly one page address and bypasses the name/range
    # parsing used by the other commands; it always ends the program.
    if command == 'url':
        if len(sys.argv) != 3:
            print('Incorrect argument\n')
            print(__doc__)
        else:
            download_from_url(sys.argv[2])
        raise SystemExit

    if command not in commands_func:
        print('Invalid Command')
        print(__doc__)
        raise SystemExit

    name, episodes = read_args(sys.argv[2:])
    commands_func[command](name, episodes)
@Kwsuga
Copy link

Kwsuga commented Jun 27, 2023

sx

@Atreyagaurav
Copy link
Author

This was turned into a full project with a lot more functionality. Please visit: https://github.com/Atreyagaurav/anime-helper-shell

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment