Last active
January 26, 2023 01:52
-
-
Save rizsotto/1a536585fdfb23e090e3 to your computer and use it in GitHub Desktop.
Convert WARC files stored in S3 into .tar.gz archives
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import sys | |
import re | |
import os | |
import os.path | |
import argparse | |
import logging | |
import json | |
import contextlib | |
import shutil | |
import tempfile | |
import urllib.parse | |
import boto3 | |
from hanzo.warctools import WarcRecord | |
def main():
    """Turn the WARC files listed in an input file into uploaded archives.

    Command-line arguments:
      --inputs   file holding one JSON object per line; each object is read
                 for the keys 'results' (s3:// URI of a WARC file), 'seed'
                 and 'ppids'.
      --target   name of the S3 bucket the archives are uploaded to.
      --workdir  directory under which a temporary directory is created for
                 every entry.
      --verbose  switch logging from INFO to DEBUG.

    For every entry the WARC file is downloaded and extracted, the seed is
    written next to the extracted content, and one '<ppid>-content.tar.gz'
    archive per ppid is built, uploaded and removed again.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--verbose', '-v', action='count', default=0)
    parser.add_argument('--inputs', metavar='<file>', required=True)
    parser.add_argument('--target', metavar='<bucket>', required=True)
    parser.add_argument('--workdir', metavar='<directory>', required=True)
    args = parser.parse_args()

    logging_level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(format='seed: %(levelname)s: %(message)s',
                        level=logging_level)
    boto3.set_stream_logger('boto3.resources', logging.CRITICAL)

    aws_s3 = boto3.resource('s3')
    with open(args.inputs, 'r') as inputs:
        for line in inputs:
            line = line.strip()
            if not line:
                # tolerate blank lines (e.g. a trailing newline at the end)
                continue
            entry = json.loads(line)
            logging.debug('going to extract %s', entry['results'])
            with tempfile.TemporaryDirectory(dir=args.workdir) as temp_dir:
                # download
                warc_file = download(aws_s3, entry['results'], temp_dir)
                extract_dir = os.path.join(temp_dir, 'warc-content')
                os.mkdir(extract_dir)
                # extract content
                extract(warc_file, os.path.join(extract_dir, 'content'))
                # a distinct name keeps the input file handle untouched
                with open(os.path.join(extract_dir, 'seeds'), 'w') as seeds:
                    seeds.write(entry['seed'] + '\n')
                # create archives
                for ppid in entry['ppids']:
                    # the directory is renamed so that the top level entry
                    # of the archive carries the ppid
                    ppid_dir = os.path.join(temp_dir, ppid)
                    os.rename(extract_dir, ppid_dir)
                    # build the archive inside the temporary directory, so
                    # it is cleaned up even when the upload fails
                    archive = shutil.make_archive(
                        os.path.join(temp_dir, ppid + '-content'),
                        'gztar', temp_dir, ppid)
                    os.rename(ppid_dir, extract_dir)
                    # upload
                    upload(aws_s3, archive, args.target)
                    os.remove(archive)
def upload(s3, local, bucket):
    """Store a local archive in an S3 bucket.

    The object key is the base name of *local* and the content type is set
    to 'application/x-gzip'.

    :param s3: object offering ``Bucket(name).put_object(...)``; main()
        passes ``boto3.resource('s3')``.
    :param local: path of the file to upload.
    :param bucket: name of the destination bucket.
    """
    remote = os.path.basename(local)
    with open(local, mode='rb') as handle:
        # pass the open file itself, so the archive does not have to be
        # read into memory in one piece before it is sent
        s3.Bucket(bucket).put_object(Key=remote,
                                     ContentType='application/x-gzip',
                                     Body=handle)
def download(s3, uri, directory):
    """Fetch an S3 object into a local directory.

    :param s3: object offering ``Object(bucket, key).get()``; main() passes
        ``boto3.resource('s3')``.
    :param uri: location of the object as 's3://<bucket>/<key>'; the key
        may itself contain '/' characters.
    :param directory: existing directory the object is written to.
    :return: path of the local copy, named after the last segment of the
        key.
    :raises ValueError: if *uri* is not of the expected form.
    """
    scheme = 's3://'
    if not uri.startswith(scheme):
        raise ValueError('not an s3 uri: %s' % uri)
    # the bucket ends at the first '/', everything after it is the key
    bucket, _, name = uri[len(scheme):].partition('/')
    filename = os.path.basename(name)
    if not bucket or not filename:
        raise ValueError('s3 uri without bucket or object name: %s' % uri)
    logging.debug('bucket: %s, object: %s', bucket, name)

    response = s3.Object(bucket, name).get()
    result = os.path.join(directory, filename)
    # close the response body once the content is copied
    with contextlib.closing(response['Body']) as ihandle:
        with open(result, 'wb') as ohandle:
            shutil.copyfileobj(ihandle, ohandle, 4096)
    return result
def extract(warc_file, target_dir):
    """Write the content of every 'response' record of a WARC file to disk.

    Each record is stored below *target_dir* at a location derived from
    its url: '<target_dir>/<netloc>/<path segments>/<file name>'. Records
    of other types are skipped and reader errors are logged.

    :param warc_file: path of the WARC file to read.
    :param target_dir: directory the content tree is created in; missing
        directories are created on demand.
    """

    def map_to_local(url):
        """Translate *url* into a (directory, file name) pair."""
        atoms = urllib.parse.urlsplit(url)
        # urls come from crawled data: '.' and '..' are dropped so that no
        # record can be written outside of target_dir
        netloc = '' if atoms.netloc in ('.', '..') else atoms.netloc
        prefix = os.path.join(target_dir, netloc)
        paths = [item for item in atoms.path.split('/')
                 if item and item not in ('.', '..')]
        head = '/'.join(paths[:-1])
        tail = paths[-1] if paths else 'index.html'
        if atoms.query:
            tail += '?' + atoms.query
        if atoms.fragment:
            tail += '#' + atoms.fragment
        tail = urllib.parse.quote_plus(tail)
        # the file name is cut to 255 characters
        return (os.path.join(prefix, head) if head else prefix,
                tail[:255])

    def makedirs(dirname):
        """Create *dirname* and its parents, moving files out of the way.

        A regular file found where a directory is needed becomes the
        'index.html' of that directory.
        """
        if not dirname or os.path.isdir(dirname):
            pass
        elif os.path.isfile(dirname):
            moved = dirname + '.dir'
            os.rename(dirname, moved)
            os.mkdir(dirname)
            os.rename(moved, os.path.join(dirname, 'index.html'))
        else:
            makedirs(os.path.dirname(dirname))
            if not os.path.exists(dirname):
                os.mkdir(dirname)

    def dump(record):
        """Store the content of *record* at the location its url maps to."""
        # create directory
        dirname, filename = map_to_local(record.url.decode('utf-8').strip())
        makedirs(dirname)
        if os.path.isdir(os.path.join(dirname, filename)):
            # the name is already taken by a directory: store inside it
            dirname = os.path.join(dirname, filename)
            filename = 'index.html'
        # create file
        logging.debug('dumping into %s file %s', dirname, filename)
        with open(os.path.join(dirname, filename), 'wb') as output:
            # NOTE(review): the second item of record.content is written
            # as is; confirm whether it still includes the HTTP headers
            _, content = record.content
            output.write(content)

    with contextlib.closing(WarcRecord.open_archive(filename=warc_file)) as fh:
        for offset, record, error in fh.read_records(limit=None):
            if record and record.type and record.type == b'response':
                logging.info('will save url: %s', record.url.decode('utf-8'))
                dump(record)
            elif error:
                logging.error('got error: %s', error)
# Run the conversion only when executed as a script; the value returned by
# main() becomes the exit status of the process.
if __name__ == '__main__':
    exit_status = main()
    sys.exit(exit_status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment