@sloev
Last active November 19, 2021 05:50
common crawl python cdx_toolkit
import gzip
import logging
import sys
import time
import warnings
from io import BytesIO

try:
    import requests
    import cdx_toolkit
    import tqdm
    import OpenSSL
    from lxml import html  # only needed if you post-process the saved pages
except ImportError:
    print("you need to do some installs first:\npip install requests cdx_toolkit tqdm pyOpenSSL lxml")
    exit(1)

# silence the InsecureRequestWarning triggered by verify=False below
warnings.filterwarnings("ignore")
logging.basicConfig(level=logging.WARNING, format="%(message)s")
log = logging.warning


def retried_get(*args, retries=5, **kwargs):
    """requests.get with a simple linear-backoff retry for transient network errors."""
    try:
        return requests.get(*args, **kwargs, verify=False)
    except (
        OpenSSL.SSL.SysCallError,
        requests.exceptions.ConnectionError,
        requests.exceptions.SSLError,
        requests.exceptions.ChunkedEncodingError,
    ):
        if not retries:
            raise
    # back off a little longer on every attempt: 1s, 2s, ..., 5s
    sleep_s = 6 - retries
    log(f"got a network error, sleeping: {sleep_s}s (retries left: {retries})")
    time.sleep(sleep_s)
    return retried_get(*args, retries=retries - 1, **kwargs)


def extract(output_filename, filename, offset, length, url, **trash):
    """Fetch one capture from a Common Crawl WARC file and save its HTML body.

    Each capture is a separate gzip member inside a large WARC file, so a
    byte-range request for offset..offset+length-1 returns exactly one
    decompressable record. Extra cdx fields are swallowed by **trash.
    """
    offset, length = int(offset), int(length)
    offset_end = offset + length - 1
    prefix = "https://commoncrawl.s3.amazonaws.com/"
    resp = retried_get(
        prefix + filename,
        headers={"Range": "bytes={}-{}".format(offset, offset_end)},
    )
    raw_data = BytesIO(resp.content)
    data = gzip.GzipFile(fileobj=raw_data).read().decode()
    # a record is WARC headers, HTTP headers and the HTTP body,
    # separated by blank lines
    data = data.strip().split("\r\n\r\n")
    if len(data) != 3:
        return None
    warc, header, response = data
    with open(output_filename, "w") as f:
        f.write(f"<!-- {url} -->\n{response}")
    return True


if __name__ == "__main__":
    try:
        url_prefix = sys.argv[1]
    except IndexError:
        log("usage:\npython fetch.py 'https://www.google.com/*'\nremember the asterisk!!!")
        exit(1)
    log("initializing")
    cdx = cdx_toolkit.CDXFetcher(source="cc")
    TOTAL_RESULTS = cdx.get_size_estimate(url_prefix)
    with tqdm.tqdm(total=TOTAL_RESULTS, desc="initializing cdx iterator") as pbar:
        for index, obj in enumerate(cdx.iter(url_prefix, filter=["status:200"])):
            result = extract(output_filename="html/{:010d}.html".format(index), **obj.data)
            msg_post_fix = " " if result else "no result"
            # show a short slice of the url after the common prefix
            url_for_message = obj.data["url"][len(url_prefix) : len(url_prefix) + 20]
            pbar.desc = f"{url_for_message}...: {msg_post_fix}"
            pbar.update(1)
            pbar.refresh()

Use the Common Crawl index from Python in a sleek way!

Usage

pip install requests cdx_toolkit tqdm pyOpenSSL lxml

mkdir html

python fetch.py 'https://open.spotify.com/*'
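
The script writes one file per capture to html/0000000000.html, html/0000000001.html, and so on, each starting with an HTML comment holding the source URL. As a minimal sketch of post-processing the saved pages with lxml (which the script already imports; the title extraction here is just an illustrative example, not part of fetch.py):

import glob
from lxml import html

for path in sorted(glob.glob("html/*.html")):
    tree = html.parse(path)  # lxml's html parser tolerates the leading url comment
    title = tree.findtext(".//title") or "(no title)"
    print(path, title)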
@wumpus

wumpus commented Aug 31, 2020

cdx_toolkit has a function that fetches the HTML as bytes: obj.content. I have improved the documentation to make this feature more obvious.
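
A minimal sketch of that simpler approach, assuming the same CDXFetcher setup as in the gist (the limit=10 is only for illustration):

import cdx_toolkit

cdx = cdx_toolkit.CDXFetcher(source="cc")
for index, obj in enumerate(cdx.iter("https://open.spotify.com/*", filter=["status:200"], limit=10)):
    # obj.content returns the fetched record body as bytes, so the manual
    # byte-range request and gzip decoding in the gist are not needed
    with open("html/{:010d}.html".format(index), "wb") as f:
        f.write(obj.content)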

Thank you for sharing this example!

@sloev

sloev commented Sep 1, 2020

@wumpus
thx for feedback and a great library!!!
