Skip to content

Instantly share code, notes, and snippets.

@1271
Last active April 27, 2018 10:33
Show Gist options
  • Save 1271/3b351a55833b52fa201af29a3e4c242f to your computer and use it in GitHub Desktop.
Save 1271/3b351a55833b52fa201af29a3e4c242f to your computer and use it in GitHub Desktop.
abookru.com tracks downloader
import logging
import re
from argparse import ArgumentParser
from base64 import b64decode
from datetime import datetime
from multiprocessing import cpu_count
from os import makedirs
from os.path import basename, isdir, isfile, getsize, join
from sys import stderr
from threading import Thread
from time import sleep
try:
from urllib import unquote_plus
except BaseException:
from urllib.parse import unquote_plus
import requests
from lxml.html import document_fromstring
from progressbar import ProgressBar
class Book:
    """
    Downloader for audiobook tracks published on abookru.com.

    For every book page the embedded player playlist is extracted, and each
    track is downloaded in its own thread into ``books/<book name>/``.
    Progress is shown with a console progress bar and details are written
    to a timestamped log file created by :meth:`main`.
    """

    # Maximum number of download threads allowed to run at the same time.
    _max_threads = cpu_count()
    # Timeout in seconds handed to every ``requests.get`` call, so a stalled
    # connection cannot block the program forever.
    _timeout = 60
    # Logger created by main(); while it is None logging is a no-op.
    _log = None
    __progress = None
    __progress_n = None
    __start_time = None
    # Immutable class-level defaults: the working list is rebound per
    # instance in _download(), so instances never share mutable state.
    __to_run = ()
    __names = ()
    __book = ''
    __url = ''

    def __download_file(self, url, target):
        """
        Download one file, retrying up to nine times.

        The response is fetched completely before the target file is opened,
        so a failed attempt never leaves an empty or truncated file behind.

        :param url: track url (str or bytes)
        :param target: path of the file to write
        :return: None
        """
        # 2 << 15 == 65536 bytes: an existing file larger than that is
        # treated as already downloaded and is left untouched.
        if isfile(target) and getsize(target) > (2 << 15):
            return
        attempts = 9
        while attempts:
            attempts -= 1
            try:
                self.log_info('Downloading {}'.format(target))
                result = requests.get(url, timeout=self._timeout)
                if result.status_code == 200:
                    with open(target, 'wb') as w:
                        w.write(result.content)
                    self.log_info('Downloaded {}'.format(target))
                    return
            except Exception as error:
                # Best-effort retry: record the reason instead of silently
                # swallowing it, then try again after the pause below.
                self.log_error('{}: {!r}'.format(target, error))
            sleep(1)
        msg = 'ConnectionAbortedError'
        self.log_error(msg)
        print(msg, file=stderr)

    @staticmethod
    def _track_path(target_dir, name, ext):
        """
        Build the path of one track file inside the book directory.

        The track name comes from the remote page, so any directory part is
        stripped to keep the file inside ``target_dir``.

        :param target_dir: directory of the current book
        :param name: track name taken from the playlist
        :param ext: file extension taken from the playlist
        :return: string
        """
        safe_name = basename(name.replace('\\', '/'))
        return '{}/{:0>3}.{}'.format(target_dir, safe_name, ext)

    def _download(self, files):
        """
        Download all tracks of the current book, in batches of threads.

        :param files: list of ``(name, extension, url)`` tuples
        :return: None
        """
        self.__start_time = datetime.now()
        msg = 'Downloading {} start'.format(self.__book)
        self.log_info(msg)
        print(msg)

        self.__progress = ProgressBar()
        self.__progress.init()
        self.__progress.start(len(files))
        self.__progress_n = 0

        target_dir = join('books', '{}'.format(self.__book))
        if not isdir(target_dir):
            self.log_info('Make directory {}'.format(target_dir))
            makedirs(target_dir)

        # Fresh per-instance list of running threads for this book.
        self.__to_run = []
        for f in files:
            name = self._track_path(target_dir, f[0], f[1])
            t = Thread(
                target=self.__download_file,
                args=(f[2], name)
            )
            t.start()
            self.__to_run.append(t)
            # Wait for the whole batch once the thread limit is reached.
            if len(self.__to_run) >= self._max_threads:
                self._run_thread()
        # Wait for the last (possibly partial) batch as well, so the
        # progress bar and the elapsed time below describe finished work.
        self._run_thread()

        self.__progress.finish()
        print('')
        self.log_info('Downloading {} stop.\nTime: {}\n\n'.format(
            self.__book,
            (datetime.now() - self.__start_time)
        ))

    def _progress_plus(self):
        """
        Count one more finished track and redraw the progress bar.

        The counter never exceeds the bar's ``max_value``.

        :return: None
        """
        if self.__progress.max_value > self.__progress_n:
            self.__progress_n += 1
        self.__progress.update(self.__progress_n)

    def _run_thread(self):
        """
        Wait for every started thread, advancing the progress bar per thread.

        :return: None
        """
        for f in self.__to_run:
            f.join()
            self._progress_plus()
        self.__to_run = []

    @staticmethod
    def _parse_playlist(data):
        """
        Extract track descriptions from the html of a book page.

        :param data: page source (string)
        :return: list of ``(name, extension, url)`` tuples, where url is the
            base64-decoded value (bytes); empty list if no playlist is found
        """
        playlist = re.search(
            r'PLAYLISTS\.MI.+?=\s(\[[^<]+\]);',
            data
        )
        if not playlist:
            return []
        tracks = re.findall(
            r'name:\s*"(.+?)",.*formats:.*?,\s*(\w+):\s*"(.+?)"',
            playlist.group(1),
            re.M
        )
        return [(i[0], i[1], b64decode(i[2])) for i in tracks]

    def _urls(self):
        """
        Get tracks urls of the current book page.

        :return: list of ``(name, extension, url)`` tuples
        """
        data = requests.get(self.__url, timeout=self._timeout).text
        return self._parse_playlist(data)

    def _get_tag_urls(self, url):
        """
        Collect the book links listed on a tag page.

        :param url: url of a ``/tag/<name>`` page
        :return: list of unquoted link targets; empty list when the url
            carries no tag name
        """
        tag = re.search('/tag/([^/]+)', url)
        if tag is None:
            # e.g. ".../tag/" without a name: nothing to parse.
            self.log_error('Tag name not found in {}'.format(url))
            return []
        self.log_info('Parse tag {}'.format(
            tag.group(1)
        ))
        content = requests.get(url, timeout=self._timeout).text
        content = re.sub(r'<!--.+\n--!>', '', content)  # patch
        html = document_fromstring(content)
        items = html.cssselect('#content .hentry li a')
        # Anchors without an href attribute yield None and are skipped.
        return [unquote_plus(i.get('href')) for i in items if i.get('href')]

    def _parse_input_urls(self, urls):
        """
        Filter the input urls and expand tag pages into book urls.

        Urls that do not point to abookru.com are dropped.

        :param urls: iterable of strings
        :return: list of book urls
        """
        _re = re.compile(r'https?://abookru\.com/.')
        result = []
        for url in urls:
            if not _re.search(url):
                continue
            if '/tag/' in url:
                result.extend(self._get_tag_urls(url))
            else:
                result.append(url)
        return result

    def log_info(self, msg):
        """
        Write a message to the log with level "info".

        :param msg: string
        :return: None
        """
        self.__log(msg, 'info')

    def log_error(self, msg):
        """
        Write a message to the log with level "error".

        :param msg: string
        :return: None
        """
        self.__log(msg, 'error')

    def __log(self, msg, _type):
        """
        Write a timestamped message through the logger method ``_type``.

        Does nothing while no logger has been configured.

        :param msg: string
        :param _type: name of the logger method ('info', 'error', ...)
        :return: None
        """
        if self._log is None:
            return
        date = datetime.now()
        date = date.strftime(r'%Y-%m-%d %H:%M:%S')
        getattr(self._log, _type)('{}\n{}\n\n'.format(
            date,
            msg
        ))

    def main(self, urls):
        """
        Download every book referenced by the given urls.

        A log file named after the current date and time is created in the
        working directory and closed again when the run ends.

        :param urls: list
        :return: None
        """
        self._log = logging.Logger('log')
        date = datetime.now()
        handler = logging.FileHandler('{}.log'.format(
            date.strftime('%Y-%m-%d_%H-%M-%S')
        ))
        self._log.addHandler(handler)
        try:
            for url in self._parse_input_urls(urls):
                url = url.rstrip('/')
                self.__url = url
                self.__book = basename(url)
                playlist = self._urls()
                if not playlist:
                    # Nothing to download: do not create an empty directory.
                    self.log_error('No tracks found at {}'.format(url))
                    continue
                self._download(playlist)
                self.log_info('')
        finally:
            # Release the log file even when a download run fails.
            self._log.removeHandler(handler)
            handler.close()
if __name__ == '__main__':
    # Command-line entry point: takes one or more book or tag page urls
    # and hands them to the downloader.
    cli = ArgumentParser()
    cli.add_argument(
        'url',
        metavar='url',
        type=str,
        nargs='+',
        default='',
        help='Downloaded urls',
    )
    parsed = cli.parse_args()
    downloader = Book()
    downloader.main(parsed.url)
requests
progressbar2
lxml
urllib
@1271
Copy link
Author

1271 commented Apr 25, 2018

Usage:

pip install -r requirements.txt

python abookru_com.py http://abookru.com/лабиринт-отражений http://abookru.com/фальшивые-зеркала
python abookru_com.py http://abookru.com/tag/юрий-нагибин http://abookru.com/лабиринт-отражений

Requirements:
requests
progressbar2
lxml (plus the cssselect package, which lxml needs for cssselect() queries)
python>=3.4

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment