#!/usr/bin/env python
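# Scrape a JSON blob embedded in a page (located via a <script data-type=...>
# tag), download every page it references with a streaming progress display,
# and pack the results into a zip archive.  Site-specific settings are read
# from environment variables (see example_extractor below).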
import sys
import zipfile
import json
import os
from io import BytesIO
from os.path import splitext
from math import ceil, floor, log
from time import perf_counter
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup

units = ['b', 'kB', 'MB', 'GB', 'TB']

def stream_response(response, prefix='', status_stream=sys.stderr.write):
    def readable(value, base=1000):
        # Convert a byte count into a human-readable (value, unit) pair.
        exp = floor(log(value, base)) if value > 1 else 0
        unit = units[exp]
        factor = base ** exp
        return value / factor, unit

    def status():
        status_stream(
            # '{time:.2f}'
            # ' - '
            '\r\x1b[K'
            '{prefix}'
            '{progress:.1f}%'
            # '{read[0]:.2f} {read[1]}'
            '/'
            '{total[0]:.2f} {total[1]}'
            ' '
            '({speed[0]:.2f} {speed[1]}/s)'
            '\r'.format(
                prefix=prefix,
                time=time_current - time_start,
                progress=bytes_percentage,
                read=readable(bytes_read),
                total=readable(bytes_total),
                speed=readable(bytes_rate),
            ))

    content_length = response.headers.get('Content-length')
    if content_length is None:
        # Without a Content-Length header there is no way to report progress,
        # so just read the whole body in one go.
        return response.content

    data = BytesIO()
    bytes_read = bytes_percentage = bytes_updated = bytes_rate = 0
    bytes_total = int(content_length)
    time_current = time_updated = time_start = perf_counter()
    status()
    for chunk in response.iter_content(chunk_size=1024):
        time_current = perf_counter()
        bytes_read += len(chunk)
        data.write(chunk)
        # Refresh the progress line at most every 50 ms, and once at the end.
        if time_current > time_updated + 0.05 or bytes_read == bytes_total:
            time_delta = time_current - time_updated
            bytes_delta = bytes_read - bytes_updated
            bytes_rate = bytes_delta / time_delta
            bytes_updated = bytes_read
            bytes_percentage = bytes_read / bytes_total * 100
            time_updated = time_current
            status()
    status_stream('\r\x1b[K')
    return data.getvalue()

class Extractor(object):

    def __init__(self, agent, **kw):
        self.session = requests.Session()
        self.session.headers['User-Agent'] = agent
        # Remaining keyword arguments (id, title, nodes, route, names,
        # export_fmt, data_type, follow, follow_route) become attributes.
        self.__dict__.update(kw)

    def archive(self, datum):
        # Pack every page listed under datum[self.nodes] into a zip archive.
        title = datum[self.title]
        nodes = datum[self.nodes]
        # Zero-pad the entry index to as many digits as len(nodes) needs.
        target_fmt = '%%0%dd%%s' % ceil(log(len(nodes), 10))
        name = [
            v['name'] for v in datum[self.names]
            if v['id'] == datum[self.id]
        ][0]
        zname = self.export_fmt.format(name=name, **datum)
        try:
            zf = zipfile.ZipFile(zname, mode='w')
        except Exception:
            sys.stderr.write('[-] %s -> FAIL\n' % zname)
            return
        else:
            sys.stderr.write('[+] %s -> open\n' % zname)
        for idx, page in enumerate(nodes, 1):
            url = urljoin(datum['@id'], self.route.format(page=page, **datum))
            target = target_fmt % (idx, splitext(url)[-1])
            prefix = target + ' -> '
            sys.stderr.write(prefix + '\r')
            try:
                response = self.session.get(url, stream=True)
                content = stream_response(response, prefix=prefix)
                znfo = zipfile.ZipInfo(target)
                znfo.file_size = len(content)
                znfo.compress_type = zipfile.ZIP_DEFLATED
                znfo.external_attr = 0o0777 << 16
                zf.writestr(znfo, content)
            except Exception:
                sys.stderr.write(prefix + 'FAIL\n')
            else:
                sys.stderr.write(prefix + 'done\n')
        zf.close()
        sys.stderr.write('[+] %s -> done\n' % zname)

    def extract(self, url):
        # Fetch the page and decode the JSON embedded in the matching
        # <script data-type=...> tag; return {} on any failure.
        try:
            response = self.session.get(url)
            if response.status_code >= 400:
                sys.stderr.write(
                    '[-] %s -> HTTP %d\n' % (url, response.status_code))
                return {}
            soup = BeautifulSoup(response.text, 'html.parser')
            datum = json.loads(
                soup.find('script', {'data-type': self.data_type}).text)
        except Exception:
            sys.stderr.write('[-] %s -> FAIL\n' % url)
            return {}
        datum['@id'] = url
        return datum

    def __call__(self, url):
        datum = self.extract(url)
        if datum:
            self.archive(datum)

def single(*urls):
    for url in urls:
        extractor(url)


def batch_follow(url):
    # Archive the starting URL, then keep following follow_route until a URL
    # repeats or extraction fails.
    extracted = set()
    while url not in extracted:
        extracted.add(url)
        datum = extractor.extract(url)
        if not datum:
            break
        extractor.archive(datum)
        if datum.get(extractor.follow):
            url = extractor.follow_route.format(**datum)

def example_extractor():
    # Feed in configuration values from environment variables.
    return Extractor(
        **{key.lower(): value for key, value in os.environ.items() if key in {
            'AGENT',
            'ID', 'TITLE', 'NODES', 'ROUTE', 'NAMES', 'EXPORT_FMT',
            'DATA_TYPE', 'FOLLOW', 'FOLLOW_ROUTE',
        }}
    )

if __name__ == '__main__':
    extractor = example_extractor()
    if sys.argv[1] != '-f':
        single(*sys.argv[1:])
    else:
        batch_follow(sys.argv[2])
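
# Example invocation (a sketch only; the script name, key names, and templates
# below are illustrative placeholders, not part of the original gist, since the
# right values depend entirely on the JSON emitted by the target site):
#
#   AGENT='Mozilla/5.0 (compatible; example)' \
#   ID='id' TITLE='title' NODES='pages' NAMES='authors' \
#   ROUTE='pages/{page}' EXPORT_FMT='{name} - {title}.zip' \
#   DATA_TYPE='application/json' FOLLOW='next' FOLLOW_ROUTE='{next}' \
#   python extractor.py https://example.com/gallery/1
#
#   # follow mode: keeps archiving until a follow_route URL repeats
#   python extractor.py -f https://example.com/gallery/1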