# Computer Science Center video downloader.
# Example course page: https://compscicenter.ru/courses/comp-networks/2012-autumn/
#
# Copyright (c) 2016, Nikolay Polyarnyi
# All rights reserved.
#
# Requirements:
#
#   beautifulsoup4>=4.4.1
#   requests>=2.9.1
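#
# Usage (the script file name below is just an example):
#
#   python csc_video_downloader.py --dir ./videos \
#       https://compscicenter.ru/courses/comp-networks/2012-autumn/
#
# Each course is downloaded into <dir>/<course name>/, one numbered .mp4 per lesson.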
import json
import argparse
import urllib.request
import concurrent.futures
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor

import requests
from bs4 import BeautifulSoup
from bs4.element import NavigableString
def expect_one(xs):
    # There must be exactly one matching element; fail loudly otherwise.
    assert len(xs) == 1
    return xs[0]
def expect_200(page, message):
    # Report and signal failure for any non-200 HTTP response.
    if page.status_code != 200:
        print(message)
        return False
    return True
def has_any(iterable, predicate):
    return any(predicate(x) for x in iterable)
def gather_text(tag):
    # Concatenate only the direct text children of a tag, skipping nested tags.
    res = ""
    for child in tag.children:
        if isinstance(child, NavigableString):
            res += child.strip()
    return res
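# For example, for a heading parsed from '<h2>Networks <small>autumn 2012</small></h2>'
# (a made-up snippet in the page's style), gather_text returns 'Networks' -
# text inside child tags such as <small> is deliberately skipped, which strips
# the semester label from the course name:
#
#   gather_text(BeautifulSoup('<h2>Networks <small>autumn 2012</small></h2>',
#                             'html.parser').h2)  # -> 'Networks'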
def recursive_extract_page_from_iframes(url):
    # Follow nested <iframe>s down to the innermost page (the actual video player).
    print(' going to iframe {}'.format(url))
    if url.startswith('//'):
        url = 'https:' + url  # protocol-relative URL: pick https explicitly
    page = requests.get(url)
    if not expect_200(page, ' Page status code is {}! {}'.format(page.status_code, url)):
        return None
    page = BeautifulSoup(page.content, 'html.parser')
    iframes = page.find_all('iframe')
    if len(iframes) == 0:
        return page
    iframe = expect_one(iframes)
    return recursive_extract_page_from_iframes(iframe['src'])
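# For example (hypothetical URLs), an embed page whose only <iframe> points at
# '//player.example.com/video/123' is re-fetched as
# 'https://player.example.com/video/123'; the recursion stops at the first page
# that contains no <iframe> at all, and that innermost page is returned.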
def download_video(name, video_url, extension, output_dir):
    # Download into a '.tmp' file first and rename on success, so an interrupted
    # download is never mistaken for a finished one.
    video_path = Path(output_dir) / (name + '.{}.tmp'.format(extension))
    print(' downloading {} from {} ... to {}'.format(extension, video_url, video_path))
    try:
        video_path.unlink()  # remove a stale .tmp left over from a previous run
    except FileNotFoundError:
        pass
    urllib.request.urlretrieve(video_url, str(video_path))
    final_path = Path(output_dir) / (name + '.{}'.format(extension))
    video_path.rename(final_path)
    print(' downloaded {} from {} to {}'.format(extension, video_url, final_path))
def download_lesson(name, video_page_url, output_dir):
    page = requests.get(video_page_url)
    if not expect_200(page, ' Page status code is {}! {}'.format(page.status_code, video_page_url)):
        return
    page = BeautifulSoup(page.content, 'html.parser')
    # Take the single iframe that embeds the video; the other iframe holds the slides.
    video_page_url = expect_one(page.find_all(lambda tag: tag.name == 'iframe' and 'slide' not in tag.prettify()))['src']
    page = recursive_extract_page_from_iframes(video_page_url)
    if page is None:
        return
    # The player stores download URLs as JSON in the 'data-params' attribute of its embed <div>.
    data_params = expect_one(page.find_all("div", {"class": "embed"})).attrs['data-params']
    data_params = json.loads(data_params)['html5']
    downloaded_extensions = set()
    for extension in ['mp4']:  # add 'webm' here to also fetch WebM versions
        if extension in downloaded_extensions:
            print(' extension {} skipped - file already downloaded with such URL!'.format(extension))
            continue
        download_video(name, data_params[extension]['videoUrl'], extension, output_dir)
        downloaded_extensions.add(extension)
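# Assumed shape of the player's data-params JSON, reconstructed from how it is
# read above (only the keys this script touches are shown):
#
#   {
#       "html5": {
#           "mp4":  {"videoUrl": "https://.../lecture.mp4"},
#           "webm": {"videoUrl": "https://.../lecture.webm"}
#       }
#   }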
def download_course(url, output_dir):
    print('Downloading course from URL: {}'.format(url))
    page = requests.get(url)
    if not expect_200(page, 'Page status code is {}! {}'.format(page.status_code, url)):
        return
    page = BeautifulSoup(page.content, 'html.parser')
    # The course name is in the element that has a <small> child (the semester label).
    course_name = expect_one(page.find_all(lambda tag: has_any(list(tag.children), lambda child: child.name == 'small')))
    course_name = gather_text(course_name)
    print(' Course name: {}'.format(course_name))
    # Extract the table with lessons:
    lessons_table = expect_one(page.find_all('table'))
    columns_names = list(expect_one(lessons_table.find_all('thead')).children)
    lessons_table = lessons_table.find_all('tr')
    # Header children include whitespace text nodes, hence the non-obvious indices.
    # 'название' means "title", 'материалы' means "materials".
    name_index, video_index = 5, 11
    assert columns_names[name_index].get_text(strip=True).lower() == 'название'
    assert columns_names[video_index].get_text(strip=True).lower() == 'материалы'
    print(' Lessons: {}'.format(len(lessons_table)))
    output_dir = Path(output_dir) / course_name
    output_dir.mkdir(parents=True, exist_ok=True)
    with ThreadPoolExecutor(2) as pool:
        futures = []
        for i, lesson in enumerate(lessons_table):
            name = '{0:02d}. '.format(i + 1) + list(lesson.children)[name_index].get_text(strip=True)
            video = list(lesson.children)[video_index]
            # The materials cell links the video with the text 'видео' ("video").
            video = video.find_all(lambda tag: tag.get_text(strip=True).lower() == 'видео')
            assert len(video) <= 1
            if len(video) == 0:
                print(' {}/{} no video found for lesson "{}"'.format(i + 1, len(lessons_table), name))
                continue
            video_page_url = video[0]['href']
            if not video_page_url.startswith('http'):
                video_page_url = 'https://compscicenter.ru' + video_page_url
            print(' {}/{} downloading lesson "{}" from {}'.format(i + 1, len(lessons_table), name, video_page_url))
            futures.append(pool.submit(download_lesson, name, video_page_url, output_dir))
        concurrent.futures.wait(futures)
def download_courses(urls, output_dir):
    print('Courses will be downloaded from URLs: {}...'.format(urls))
    for i, url in enumerate(urls):
        try:
            download_course(url, output_dir)
            print('{}/{} courses finished!'.format(i + 1, len(urls)))
        except Exception:
            print('Course downloading failed! {}'.format(url))
            raise  # re-raise with the original traceback
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='CSC videos downloader')
    parser.add_argument('urls', help='Multiple URLs of CSC courses', nargs='+')
    parser.add_argument('--dir', help='Directory for downloaded videos', required=True)
    args = parser.parse_args()
    download_courses(args.urls, args.dir)