Skip to content

Instantly share code, notes, and snippets.

@rizsotto
Last active January 26, 2023 01:52
Show Gist options
  • Save rizsotto/1a536585fdfb23e090e3 to your computer and use it in GitHub Desktop.
Save rizsotto/1a536585fdfb23e090e3 to your computer and use it in GitHub Desktop.
warc files to .tar.gz
#!/usr/bin/env python
import sys
import re
import os
import os.path
import argparse
import logging
import json
import contextlib
import shutil
import tempfile
import urllib.parse
import boto3
from hanzo.warctools import WarcRecord
def main():
    """Repackage the WARC files listed in the inputs file as .tar.gz archives.

    Command line options:
      --inputs   file with one JSON object per line; each object is read
                 for the keys 'results' (S3 URI of the WARC file), 'seed'
                 (text written to the 'seeds' file) and 'ppids' (names, one
                 archive is produced per name).
      --target   S3 bucket the archives are uploaded to.
      --workdir  directory under which per-entry temporary directories
                 are created.
      --verbose  switches logging from INFO to DEBUG.

    Returns None, so ``sys.exit(main())`` exits with status 0 on success.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--verbose', '-v', action='count', default=0)
    parser.add_argument('--inputs', metavar='<file>', required=True)
    parser.add_argument('--target', metavar='<bucket>', required=True)
    parser.add_argument('--workdir', metavar='<directory>', required=True)
    args = parser.parse_args()

    logging_level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(format='seed: %(levelname)s: %(message)s',
                        level=logging_level)
    boto3.set_stream_logger('boto3.resources', logging.CRITICAL)
    aws_s3 = boto3.resource('s3')

    with open(args.inputs, 'r') as inputs:
        for line in inputs:
            line = line.strip()
            if not line:
                # Tolerate blank lines (e.g. a trailing newline at the end
                # of the file) instead of failing in json.loads.
                continue
            entry = json.loads(line)
            logging.debug('going to extract %s', entry['results'])
            with tempfile.TemporaryDirectory(dir=args.workdir) as temp_dir:
                # download
                warc_file = download(aws_s3, entry['results'], temp_dir)
                extract_dir = os.path.join(temp_dir, 'warc-content')
                os.mkdir(extract_dir)
                # extract content
                extract(warc_file, os.path.join(extract_dir, 'content'))
                # A distinct name keeps the inputs file handle that is being
                # iterated from being rebound inside its own loop.
                with open(os.path.join(extract_dir, 'seeds'), 'w') as seeds:
                    seeds.write(entry['seed'] + '\n')
                # create archives
                for ppid in entry['ppids']:
                    # The archive's top-level directory must carry the ppid
                    # name, so the extracted tree is renamed for the duration
                    # of the archiving and renamed back for the next ppid.
                    ppid_dir = os.path.join(temp_dir, ppid)
                    os.rename(extract_dir, ppid_dir)
                    archive = shutil.make_archive(ppid + '-content', 'gztar',
                                                  temp_dir, ppid)
                    os.rename(ppid_dir, extract_dir)
                    # upload
                    try:
                        upload(aws_s3, archive, args.target)
                    finally:
                        # The archive lives outside temp_dir (make_archive
                        # got a relative base name), so remove it explicitly
                        # even when the upload fails.
                        os.remove(archive)
def upload(s3, local, bucket):
    """Upload a local file into an S3 bucket under the file's base name.

    Args:
        s3: object exposing ``Bucket(name).put_object(...)``; ``main``
            passes the result of ``boto3.resource('s3')``.
        local: path of the file to upload.
        bucket: name of the destination bucket.

    The object is stored with content type 'application/x-gzip'.
    """
    remote = os.path.basename(local)
    with open(local, mode='rb') as handle:
        # Pass the open file itself rather than handle.read(), so the
        # whole archive is not loaded into memory first.
        s3.Bucket(bucket).put_object(Key=remote,
                                     ContentType='application/x-gzip',
                                     Body=handle)
def download(s3, uri, directory):
    """Download the S3 object named by ``uri`` into ``directory``.

    Args:
        s3: object exposing ``Object(bucket, key).get()`` whose result maps
            'Body' to a readable, closable stream; ``main`` passes the
            result of ``boto3.resource('s3')``.
        uri: object location in the form ``s3://<bucket>/<key>``; the key
            may itself contain '/' separators.
        directory: existing local directory that receives the file.

    Returns:
        Path of the downloaded file: ``directory`` joined with the last
        path component of the key.

    Raises:
        ValueError: if ``uri`` is not of the expected form, or the key has
            no final path component to use as the local file name.
    """
    # The bucket is everything up to the FIRST slash. A greedy '(.*)/(.*)'
    # would fold key prefixes into the bucket name.
    matched = re.match(r's3://([^/]+)/(.+)', uri)
    if matched is None:
        raise ValueError('not a valid S3 object URI: %r' % (uri,))
    bucket, name = matched.groups()
    local_name = os.path.basename(name)
    if not local_name:
        raise ValueError('S3 key has no file name component: %r' % (uri,))
    logging.debug('bucket: %s, object: %s', bucket, name)

    response = s3.Object(bucket, name).get()
    # The key may contain '/', so only its last component is used locally;
    # the intermediate directories do not exist under `directory`.
    result = os.path.join(directory, local_name)
    with contextlib.closing(response.get('Body')) as ihandle, \
            open(result, 'wb') as ohandle:
        shutil.copyfileobj(ihandle, ohandle, 4096)
    return result
def extract(warc_file, target_dir):
    """Write the payload of every 'response' record of a WARC file to disk.

    Each record's URL is mapped to ``<target_dir>/<netloc>/<path...>/<file>``
    and the record content is written there. Read errors reported by the
    archive reader are logged and do not stop the extraction.

    Args:
        warc_file: path of the WARC file to read.
        target_dir: directory under which the content tree is created
            (it is created on demand, together with missing parents).
    """

    def map_to_local(url):
        """Return the (directory, file name) pair a URL is stored under."""
        atoms = urllib.parse.urlsplit(url)
        # URLs come from archived web content, so '.' and '..' components
        # are dropped; otherwise they could address files outside target_dir.
        host = '' if atoms.netloc in ('.', '..') else atoms.netloc
        prefix = os.path.join(target_dir, host)
        paths = [item for item in atoms.path.split('/')
                 if item and item not in ('.', '..')]
        head = '/'.join(paths[:-1])
        # A URL without a path is stored as the directory index.
        tail = paths[-1] if paths else 'index.html'
        # Query and fragment are kept in the file name so that URLs which
        # differ only there do not overwrite each other.
        if atoms.query:
            tail += '?' + atoms.query
        if atoms.fragment:
            tail += '#' + atoms.fragment
        # quote_plus also escapes '/', so the name is a single component;
        # it is cut to 255 characters to keep the file name length bounded.
        tail = urllib.parse.quote_plus(tail)
        return (os.path.join(prefix, head) if head else prefix,
                tail[:255])

    def makedirs(dirname):
        """Create ``dirname`` and its parents, converting files in the way."""
        if not dirname or os.path.isdir(dirname):
            return
        if os.path.isfile(dirname):
            # An earlier record was saved as a plain file under the name
            # that now has to be a directory: turn the file into the
            # directory's index.html.
            moved_aside = dirname + '.dir'
            os.rename(dirname, moved_aside)
            os.mkdir(dirname)
            os.rename(moved_aside, os.path.join(dirname, 'index.html'))
        else:
            makedirs(os.path.dirname(dirname))
            if not os.path.exists(dirname):
                os.mkdir(dirname)

    def dump(record):
        """Write the content of a single record to its local file."""
        # create directory
        dirname, filename = map_to_local(record.url.decode('utf-8').strip())
        makedirs(dirname)
        if os.path.isdir(os.path.join(dirname, filename)):
            # The name is already taken by a directory (created for a
            # deeper URL): store this record as that directory's index.
            dirname = os.path.join(dirname, filename)
            filename = 'index.html'
        # create file
        logging.debug('dumping into %s file %s', dirname, filename)
        with open(os.path.join(dirname, filename), 'wb') as output:
            _, content = record.content
            output.write(content)

    with contextlib.closing(WarcRecord.open_archive(filename=warc_file)) as fh:
        for _offset, record, error in fh.read_records(limit=None):
            if record and record.type == b'response':
                if not record.url:
                    # Without a URL there is no path to store the record
                    # under; skip it instead of failing on .decode().
                    logging.error('response record without url, skipped')
                    continue
                logging.info('will save url: %s', record.url.decode('utf-8'))
                dump(record)
            elif error:
                logging.error('got error: %s', error)
# Script entry point: run main() only when executed directly (not on
# import) and hand its return value to the interpreter as the exit status.
if __name__ == '__main__':
    exit_status = main()
    sys.exit(exit_status)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment