pip install requests cdx_toolkit tqdm pyOpenSSL lxml
mkdir html
python fetch.py 'https://open.spotify.com/*'
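Before kicking off a full fetch, it can help to see roughly how many captures match the prefix. A minimal sketch using the same cdx_toolkit calls that the fetch.py script below relies on (the Spotify prefix is just the example from the command above):

import cdx_toolkit

cdx = cdx_toolkit.CDXFetcher(source="cc")  # "cc" = Common Crawl's CDX index
# rough count of captures matching the prefix; this is what drives the progress bar below
print(cdx.get_size_estimate("https://open.spotify.com/*"))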
common crawl python cdx_toolkit
try:
    import requests
    import cdx_toolkit
    import tqdm
    import OpenSSL
    from lxml import html  # not used below, but handy for parsing the saved pages
except ImportError:
    print("you need to do some installs first:\npip install requests cdx_toolkit tqdm pyOpenSSL lxml")
    exit(1)

import gzip
import logging
import sys
import time
import warnings
from io import BytesIO

warnings.filterwarnings("ignore")
logging.basicConfig(level=logging.WARNING, format='%(message)s')
log = logging.warning


def retried_get(*args, retries=5, **kwargs):
    """requests.get with a simple retry/backoff for flaky SSL and connection errors."""
    try:
        return requests.get(*args, **kwargs, verify=False)
    except (
        OpenSSL.SSL.SysCallError,
        requests.exceptions.ConnectionError,
        requests.exceptions.SSLError,
        requests.exceptions.ChunkedEncodingError,
    ):
        if not retries:
            raise
    sleep_s = 6 - retries
    log(f"got ssl error, sleeping: {sleep_s}s (retries left: {retries})")
    time.sleep(sleep_s)
    return retried_get(*args, retries=retries - 1, **kwargs)


def extract(output_filename, filename, offset, length, url, **trash):
    """Fetch one capture from its WARC file via an HTTP Range request and save the HTML body."""
    offset, length = int(offset), int(length)
    offset_end = offset + length - 1
    prefix = "https://commoncrawl.s3.amazonaws.com/"  # the same files are also served at https://data.commoncrawl.org/
    resp = retried_get(
        prefix + filename,
        headers={"Range": "bytes={}-{}".format(offset, offset_end)},
    )
    # The ranged response is a single gzipped WARC record:
    # WARC headers, HTTP headers and HTTP body, separated by blank lines.
    raw_data = BytesIO(resp.content)
    f = gzip.GzipFile(fileobj=raw_data)
    data = f.read().decode()
    data = data.strip().split("\r\n\r\n", 2)
    if len(data) != 3:
        return None
    warc, header, response = data
    with open(output_filename, "w") as f:
        f.write(f"<!-- {url} -->\n{response}")
    return True


if __name__ == "__main__":
    try:
        url_prefix = sys.argv[1]
    except IndexError:
        log("usage:\npython fetch.py 'https://www.google.com/*'\nremember the asterisk!!!")
        exit(1)
    log("initializing")
    cdx = cdx_toolkit.CDXFetcher(source="cc")
    TOTAL_RESULTS = cdx.get_size_estimate(url_prefix)
    with tqdm.tqdm(total=TOTAL_RESULTS, desc="initializing cdx iterator") as pbar:
        for index, obj in enumerate(cdx.iter(url_prefix, filter=["status:200"])):
            result = extract(output_filename="html/{:010d}.html".format(index), **obj.data)
            msg_post_fix = " "
            if result is None:
                msg_post_fix = "no result"
            # show a short slice of the URL past the prefix in the progress bar
            url_for_message = obj.data['url'][len(url_prefix):len(url_prefix) + 20]
            pbar.desc = f"{url_for_message}...: {msg_post_fix}"
            pbar.update(1)
            pbar.refresh()
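To sanity-check a single capture without the full progress-bar loop, the extract helper above can be driven directly from one CDX record. A sketch under the assumption that the script is saved as fetch.py in the current directory and that the html/ directory already exists:

import cdx_toolkit
from fetch import extract  # the helper defined above

cdx = cdx_toolkit.CDXFetcher(source="cc")
for obj in cdx.iter("https://open.spotify.com/*", filter=["status:200"], limit=1):
    # obj.data carries the filename/offset/length/url fields that extract() expects
    print(extract(output_filename="html/test.html", **obj.data))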
cdx_toolkit has a function that fetches the html as bytes: obj.content. I have improved the documentation to make this feature more obvious. Thank you for sharing this example!
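Given that note, the byte-range and gzip handling in extract() can be skipped entirely. A minimal sketch of the obj.content approach, keeping the same prefix, filter, and output naming as fetch.py (the limit is just for illustration):

import cdx_toolkit

cdx = cdx_toolkit.CDXFetcher(source="cc")
for index, obj in enumerate(cdx.iter("https://open.spotify.com/*", filter=["status:200"], limit=10)):
    # obj.content fetches the raw capture body as bytes, as described in the comment above
    with open("html/{:010d}.html".format(index), "wb") as f:
        f.write(obj.content)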