-
-
Save mattgathu/34e574f10810b42c33287f1714ee168d to your computer and use it in GitHub Desktop.
Script to download all academic lecture content from www.higuita.com.br, the site of Prof. Alexandre F. Beletti at UFPA
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# coding=utf-8 | |
# | |
# Python Script | |
# | |
# Copyleft © Manoel Vilela | |
# | |
# | |
# stdlib | |
from os import path | |
from queue import Queue | |
from mimetypes import MimeTypes | |
from argparse import ArgumentParser | |
from urllib.request import urlretrieve | |
from concurrent.futures import ThreadPoolExecutor | |
import os | |
import asyncio | |
import time | |
# third-lib | |
from bs4 import BeautifulSoup, SoupStrainer | |
import requests | |
import progressbar | |
"""
A script to download all the public academic lecture content from
www.higuita.com.br, which belongs to Alexandre F. Beletti, better
known as Prof. Higuita.

You can specify the folders to download in the FOLDERS list in this file.
"""
# to execute this file you need:
#   sudo apt-get install python3 python3-pip
#   sudo pip3 install bs4 progressbar2 requests
#   python3 higuita_dump.py [-t threads] [-v] folder1 folder2...

# constants
# NOTE(review): asyncio.get_event_loop() is deprecated outside a running
# loop on Python 3.10+; consider asyncio.new_event_loop() — confirm the
# target runtime before changing.
EVENT_LOOP = asyncio.get_event_loop()
MIME = MimeTypes()  # used to decide whether an anchor's href looks like a file
URL_BASE = "http://www.higuita.com.br/"
# command line interface
parser = ArgumentParser(
    prog="python3 higuita_dump.py",
    description="A dumper script for higuita files at www.higuita.com.br",
)
parser.add_argument(
    "-v", "--verbose",
    action="store_true",
    default=False,
    dest="verbose",
    help="Active the verbose mode and print each scheduling task",
)
parser.add_argument(
    "-t", "--threads",
    type=int,
    default=24,
    dest="threads",
    help="Specify the number of threads to execute the dump",
)
parser.add_argument(
    "folders",
    nargs="*",
    default=["papers", "arq", "redes1", "redes2", "so"],
)
opts = parser.parse_args()

# every blocking download runs on this pool through the event loop
EVENT_LOOP.set_default_executor(ThreadPoolExecutor(opts.threads))
# DATA MINING
# parse the anchors of a folder page and keep only those
# whose href looks like a downloadable file
def get_links(url):
    """Return (file_url, filename) pairs found at *url*.

    :param url: a specific higuita folder, e.g. www.higuita.com.br/papers
    """
    filenames = []
    source = requests.get(url).text
    soup = BeautifulSoup(source, "lxml", parse_only=SoupStrainer('a'))
    for link in soup:
        # Read the attrs dict directly: hasattr(link, 'href') triggers
        # BeautifulSoup's child-tag lookup (always truthy for Tags), after
        # which attrs['href'] raises KeyError on anchors without an href
        # (e.g. <a name="...">). getattr's default also skips plain strings.
        href = getattr(link, 'attrs', {}).get('href')
        if href and any(MIME.guess_type(href)):
            filenames.append(href)
    # join with '/' explicitly: path.join would emit '\' on Windows,
    # which is invalid in a URL
    return [(url.rstrip('/') + '/' + fname, fname) for fname in filenames]
# schedule a task of download to run in
# the main event loop
async def download_task(url, file, bar):
    """Schedule download_link(url, file, bar) on the default executor.

    :param url: remote file URL to download
    :param file: local filename to save to
    :param bar: progressbar instance that receives progress updates

    Rewritten as ``async def``: the ``@asyncio.coroutine`` decorator and
    ``yield from`` coroutines were removed in Python 3.11; ``async/await``
    is the direct equivalent (available since Python 3.5).
    """
    if opts.verbose:
        print("Scheduling download... {}".format(file))
    await EVENT_LOOP.run_in_executor(None, download_link, url, file, bar)
# download a specific link and send
# the report_hook updates to the ProgressBar
def download_link(url, fname, pbar):
    """Download *url* into local file *fname*, reporting progress via *pbar*.

    :param url: remote file URL
    :param fname: local path to write the file to
    :param pbar: progressbar carrying a ``.queue`` (see create_progressbar);
        the queue receives the file's total size once, then one entry per
        block as data arrives
    """
    def report_hook(count, block_size, total_size):
        # Before the bar has started, the first report announces this
        # file's total size so progressbar_handler can compute max_value.
        if not pbar.start_time:
            pbar.queue.put(total_size)
        # Busy-wait until progressbar_handler has called pbar.start();
        # this runs in a worker thread, so sleeping does not stall the loop.
        while not pbar.start_time:
            time.sleep(1)
        # Every report after start contributes the bytes just received.
        pbar.queue.put(block_size)
    urlretrieve(url, fname, reporthook=report_hook)
# create a progressbar for report the actual progress of
# a folder download
def create_progressbar(folder):
    """Build a ProgressBar labelled with *folder* and attach a feed queue."""
    layout = [
        '{}: '.format(folder),
        progressbar.Percentage(),
        ' ',
        progressbar.Bar("─"),
        ' ',
        progressbar.AdaptiveETA(),
        ' ',
        progressbar.DataSize(),
        ' ',
        progressbar.AdaptiveTransferSpeed(),
    ]
    bar = progressbar.ProgressBar(widgets=layout)
    # downloads push byte counts here; progressbar_handler drains it
    bar.queue = Queue()
    return bar
# the progressbar handler for controlling the:
# start, progress and finish
async def progressbar_handler(pbar, downloads):
    """Drive a folder's progressbar: start it, update per chunk, finish it.

    :param pbar: progressbar built by create_progressbar() (carries .queue)
    :param downloads: number of files being downloaded for this folder

    Rewritten as ``async def``: ``@asyncio.coroutine`` was removed in
    Python 3.11; ``async/await`` is the direct equivalent.
    """
    # Wait until every download has reported its total size once.
    while pbar.queue.qsize() < downloads:
        await asyncio.sleep(0.1)
    # The entries queued so far are the per-file total sizes; their sum
    # is the overall amount of work for this folder.
    pbar.max_value = sum(pbar.queue.get() for _ in range(pbar.queue.qsize()))
    pbar.start()
    while pbar.value < pbar.max_value:
        # Block on queue.get() inside the executor so the event loop
        # stays free for the other folders' handlers.
        chunk = await EVENT_LOOP.run_in_executor(None, pbar.queue.get)
        pbar.update(min(pbar.value + chunk, pbar.max_value))
    pbar.finish()
def download_folder(url_folder, folder, pbar):
    """Dump every downloadable link found inside one site folder.

    :param url_folder: full folder URL, e.g. www.higuita.com.br/papers
    :param folder: bare folder name, e.g. 'papers'
    :param pbar: progressbar built by create_progressbar()
    """
    create_folder(folder)
    file_links = get_links(url_folder)
    create_tasks(create_coroutines(folder, file_links, pbar))
    # one handler task per folder keeps its bar alive until done
    EVENT_LOOP.create_task(progressbar_handler(pbar, len(file_links)))
# create a folder to save the content
def create_folder(folder):
    """Create directory *folder* if it does not exist yet.

    Uses os.makedirs(..., exist_ok=True) so a concurrent creation between
    the exists() check and the mkdir cannot raise FileExistsError
    (the original os.mkdir had that TOCTOU race).
    """
    if not path.exists(folder):
        os.makedirs(folder, exist_ok=True)
        print("created folder -> {}".format(folder))
# create the coroutines to run later
def create_coroutines(base, links, pbar):
    """Build one download coroutine per (url, filename) pair.

    :param base: local destination folder name, e.g. 'papers'
    :param links: pairs produced by get_links()
    :param pbar: progressbar shared by all downloads of this folder
    """
    return [download_task(url, path.join(base, fname), pbar)
            for url, fname in links]
# create the tasks scheduled to run in the main loop event
def create_tasks(coroutines):
    """Schedule every coroutine as a Task on the main event loop.

    :param coroutines: iterable of coroutine objects
    """
    # plain loop: a list comprehension used only for its side effects
    # builds and discards a throwaway list
    for coro in coroutines:
        EVENT_LOOP.create_task(coro)
# run the actual scheduled tasks
def run_tasks():
    """Run all pending tasks on EVENT_LOOP until every one completes."""
    try:
        # asyncio.Task.all_tasks() was removed in Python 3.9; the
        # module-level asyncio.all_tasks(loop) is the replacement.
        tasks = asyncio.all_tasks(EVENT_LOOP)
    except AttributeError:  # Python < 3.7 fallback
        tasks = asyncio.Task.all_tasks()
    EVENT_LOOP.run_until_complete(asyncio.gather(*tasks))
# main algorithm for download all folders
def main():
    """Download every requested folder, with one progressbar per folder."""
    for folder in opts.folders:
        # build the URL with explicit '/' joining: path.join would use
        # '\' as the separator on Windows and produce a broken URL
        url_folder = URL_BASE.rstrip('/') + '/' + folder
        pbar = create_progressbar(folder)
        download_folder(url_folder, folder, pbar)
    run_tasks()
# script entry point: only dump when executed directly, not on import
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment