@metatoaster · Created May 8, 2018 15:01
#!/usr/bin/env python
import sys
import zipfile
import json
import os
from io import BytesIO
from os.path import splitext
from math import ceil, floor, log
from time import perf_counter
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup

units = ['b', 'kB', 'MB', 'GB', 'TB']


def stream_response(response, prefix='', status_stream=sys.stderr.write):
    """Read a streamed response into memory, reporting progress.

    Writes a single updating line (percentage, total size, transfer rate)
    to ``status_stream``; falls back to ``response.content`` when the
    server sends no Content-Length header.
    """

    def readable(value, base=1000):
        # scale a byte count to the largest sensible unit from ``units``
        exp = floor(log(value, base)) if value > 1 else 0
        unit = units[exp]
        factor = base ** exp
        return value / factor, unit

    def status():
        status_stream(
            # '{time:.2f}'
            # ' - '
            '\r\x1b[K'
            '{prefix}'
            '{progress:.1f}%'
            # '{read[0]:.2f} {read[1]}'
            '/'
            '{total[0]:.2f} {total[1]}'
            ' '
            '({speed[0]:.2f} {speed[1]}/s)'
            '\r'.format(
                prefix=prefix,
                time=time_current - time_start,
                progress=bytes_percentage,
                read=readable(bytes_read),
                total=readable(bytes_total),
                speed=readable(bytes_rate),
            ))

    content_length = response.headers.get('Content-length')
    if content_length is None:
        return response.content

    data = BytesIO()
    bytes_read = bytes_percentage = bytes_updated = bytes_rate = 0
    bytes_total = int(content_length)
    time_current = time_updated = time_start = perf_counter()
    status()
    for chunk in response.iter_content(chunk_size=1024):
        time_current = perf_counter()
        bytes_read += len(chunk)
        data.write(chunk)
        # refresh the status line at most every 50 ms (and on completion)
        if time_current > time_updated + 0.05 or bytes_read == bytes_total:
            time_delta = time_current - time_updated
            bytes_delta = bytes_read - bytes_updated
            bytes_rate = bytes_delta / time_delta
            bytes_updated = bytes_read
            bytes_percentage = bytes_read / bytes_total * 100
            time_updated = time_current
            status()
    status_stream('\r\x1b[K')
    return data.getvalue()


class Extractor(object):
    """Pull the JSON datum embedded in a page and archive the resources
    it lists into a zip file.

    All site specific knowledge (the JSON keys, URL templates and the
    ``data-type`` marker of the ``<script>`` element) is supplied as
    keyword arguments, which become instance attributes.
    """

    def __init__(self, agent, **kw):
        self.session = requests.Session()
        self.session.headers['User-Agent'] = agent
        self.__dict__.update(kw)

    def archive(self, datum):
        title = datum[self.title]
        nodes = datum[self.nodes]
        # zero-padded index (width derived from the page count) plus the
        # file extension taken from each page URL
        target_fmt = '%%0%dd%%s' % ceil(log(len(nodes), 10))
        name = [
            v['name'] for v in datum[self.names]
            if v['id'] == datum[self.id]
        ][0]
        zname = self.export_fmt.format(name=name, **datum)
        try:
            zf = zipfile.ZipFile(zname, mode='w')
        except Exception:
            sys.stderr.write('[-] %s -> FAIL\n' % zname)
            return
        else:
            sys.stderr.write('[+] %s -> open\n' % zname)
        for idx, page in enumerate(nodes, 1):
            url = urljoin(datum['@id'], self.route.format(page=page, **datum))
            target = target_fmt % (idx, splitext(url)[-1])
            prefix = target + ' -> '
            sys.stderr.write(prefix + '\r')
            try:
                response = self.session.get(url, stream=True)
                content = stream_response(response, prefix=prefix)
                znfo = zipfile.ZipInfo(target)
                znfo.file_size = len(content)
                znfo.compress_type = zipfile.ZIP_DEFLATED
                # store unix permission bits (rwxrwxrwx) in the high word
                znfo.external_attr = 0o0777 << 16
                zf.writestr(znfo, content)
            except Exception:
                sys.stderr.write(prefix + 'FAIL\n')
            else:
                sys.stderr.write(prefix + 'done\n')
        zf.close()
        sys.stderr.write('[+] %s -> done\n' % zname)

    def extract(self, url):
        try:
            response = self.session.get(url)
            if response.status_code >= 400:
                sys.stderr.write(
                    '[-] %s -> HTTP %d\n' % (url, response.status_code))
                return {}
            soup = BeautifulSoup(response.text, 'html.parser')
            datum = json.loads(
                soup.find('script', {'data-type': self.data_type}).text)
        except Exception:
            sys.stderr.write('[-] %s -> FAIL\n' % (url))
            return {}
        datum['@id'] = url
        return datum

    def __call__(self, url):
        datum = self.extract(url)
        if datum:
            self.archive(datum)


def single(*urls):
    # ``extractor`` is the module level instance assigned in __main__
    for url in urls:
        extractor(url)


def batch_follow(url):
    # archive the starting URL, then keep following the ``follow_route``
    # derived from each datum until a URL repeats or extraction fails
    extracted = set()
    while url not in extracted:
        extracted.add(url)
        datum = extractor.extract(url)
        if not datum:
            break
        extractor.archive(datum)
        if datum.get(extractor.follow):
            url = extractor.follow_route.format(**datum)


def example_extractor():
    # feed in values from the recognized environment variables
    return Extractor(
        **{key.lower(): value for key, value in os.environ.items() if key in {
            'AGENT',
            'ID', 'TITLE', 'NODES', 'ROUTE', 'NAMES', 'EXPORT_FMT',
            'DATA_TYPE', 'FOLLOW', 'FOLLOW_ROUTE',
        }}
    )


if __name__ == '__main__':
    # usage: pass one or more URLs to archive each separately, or
    # ``-f URL`` to follow the chain from a single starting URL
    extractor = example_extractor()
    if sys.argv[1] != '-f':
        single(*sys.argv[1:])
    else:
        batch_follow(sys.argv[2])
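
The script keeps every site-specific detail in environment variables, so the gist itself does not prescribe what those values look like. Purely as a hypothetical sketch of how the pieces fit together, the snippet below constructs an Extractor directly in Python; every field name, URL and format string is an invented placeholder, and the import assumes the script above has been saved as extractor_gist.py:

# Hypothetical usage sketch; every configuration value below is invented.
from extractor_gist import Extractor  # assumes the script above is importable

extractor = Extractor(
    agent='Mozilla/5.0 (example scraper)',      # sent as the User-Agent header
    id='chapterId',            # datum key identifying the current item
    title='title',             # datum key holding the title
    nodes='pages',             # datum key holding the list of per-page values
    names='chapters',          # datum key holding [{'id': ..., 'name': ...}, ...]
    route='images/{page}.jpg',                  # per-page path, resolved against the page URL
    export_fmt='{name}.zip',                    # output zip filename template
    data_type='application/json',               # data-type attribute of the <script> to parse
    follow='nextChapterId',                     # datum key that signals another page to fetch
    follow_route='https://example.com/read/{nextChapterId}',  # absolute URL of the next page
)

# fetch the embedded JSON for one page and archive its nodes into a zip file
extractor('https://example.com/read/1')

Equivalently, the same configuration can be exported as the AGENT, ID, TITLE, NODES, ROUTE, NAMES, EXPORT_FMT, DATA_TYPE, FOLLOW and FOLLOW_ROUTE environment variables before running the script with one or more URLs, or with -f and a single starting URL to follow the chain.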