Created
October 4, 2015 20:27
-
-
Save kuk/d4ca86810ee81ba87f73 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import sys | |
from subprocess import Popen, PIPE | |
from multiprocessing import Pool | |
# Base URL of the AWS Public Data Sets S3 bucket hosting Common Crawl.
PDS_BASE = 'http://aws-publicdatasets.s3.amazonaws.com/'
# Gzipped manifest files listing the WARC paths for each crawl period,
# keyed by the period label accepted on the command line.
WARC_PATHS = {
    '2014-12': 'common-crawl/crawl-data/CC-MAIN-2014-52/warc.paths.gz',
    '2015-07': 'common-crawl/crawl-data/CC-MAIN-2015-32/warc.paths.gz'
}
# Status labels written to each chunk's .status file.
RUNNING = 'running'
FINISHED = 'finished'
# Provider tag emitted for pages that reference the Google Analytics script.
GOOGLE = 'google'
def s3_url(path, base=PDS_BASE):
    """Build the full S3 URL for *path* under the given bucket *base*."""
    return '{0}{1}'.format(base, path)
def download_gz(url):
    """Stream the decompressed lines of a gzipped resource at *url*.

    Pipes ``curl -s url | gunzip`` and yields each decompressed line, so
    the whole file never has to fit in memory.
    """
    curl = Popen(['curl', '-s', url], stdout=PIPE)
    gunzip = Popen(['gunzip'], stdin=curl.stdout, stdout=PIPE)
    curl.stdout.close()  # Allow curl to receive a SIGPIPE if gunzip exits.
    try:
        for line in gunzip.stdout:
            yield line
    finally:
        # Reap both children even if the consumer abandons the generator,
        # so exhausted pipelines do not accumulate as zombie processes.
        gunzip.stdout.close()
        gunzip.wait()
        curl.wait()
def download_warc_paths(path):
    """Yield each WARC file path listed in the gzipped manifest at *path*."""
    manifest = download_gz(s3_url(path))
    return (entry.rstrip('\n') for entry in manifest)
def download_warc(path):
    """Stream the decompressed lines of the WARC file stored at *path*."""
    url = s3_url(path)
    return download_gz(url)
def grep_warc(warc):
    """Yield ``(provider, date, url)`` for each page referencing analytics.js.

    Scans the WARC line stream, remembering the most recent
    ``WARC-Target-URI`` and ``WARC-Date`` headers; when a body line mentions
    the Google Analytics script, emits the remembered record metadata.

    NOTE: the original line had a syntax error (``in line::``) — fixed here.
    """
    url_line = None
    date_line = None
    for line in warc:
        if line.startswith('WARC-Target-URI'):
            url_line = line
        elif line.startswith('WARC-Date'):
            date_line = line
        elif url_line is not None:
            if 'www.google-analytics.com/analytics.js' in line:
                # Strip 'WARC-Target-URI: ' and '\r\n'
                url = url_line[17:-2]
                # Strip 'WARC-Date: ' and '\r\n'; guard against a record
                # whose date header was never seen (was a TypeError on None).
                date = date_line[11:-2] if date_line is not None else ''
                yield GOOGLE, date, url
                url_line = None
                date_line = None
def chunk_status_path(chunk):
    """Return the path of the progress/status file for *chunk*."""
    return '%s.status' % (chunk,)
def update_download_progress(chunk, progress, status):
    """Overwrite the chunk's status file with one tab-separated record."""
    record = '%s\t%s\t%s\n' % (chunk, progress, status)
    with open(chunk_status_path(chunk), 'w') as status_file:
        status_file.write(record)
def log_download_progress(chunk, warc, every=1000000):
    """Pass *warc* lines through, recording progress every *every* lines.

    Writes a RUNNING status line periodically and a FINISHED status once
    the stream is exhausted.
    """
    # Initialize so the final FINISHED write does not raise NameError when
    # the stream is empty (the loop variable would otherwise be unbound).
    progress = 0
    for progress, line in enumerate(warc):
        if progress % every == 0:
            update_download_progress(chunk, progress, RUNNING)
        yield line
    update_download_progress(chunk, progress, FINISHED)
def chunk_results_path(chunk):
    """Return the path of the grep-results file for *chunk*."""
    return '%s.output' % (chunk,)
def write_grep_results(chunk, results):
    """Write one tab-separated line per grep hit to the chunk's output file.

    The file is line-buffered so partial results survive a crash.
    """
    with open(chunk_results_path(chunk), 'w', buffering=1) as out:
        for provider, date, url in results:
            out.write('%s\t%s\t%s\t%s\n' % (chunk, provider, date, url))
def run_grep_warc(chunk, path):
    """Download one WARC chunk, grep it for analytics hits, save results."""
    monitored = log_download_progress(chunk, download_warc(path))
    write_grep_results(chunk, grep_warc(monitored))
def run_parallel_greps(paths, start, stop):
    """Grep chunks ``[start, stop)`` of *paths* in parallel, one worker each.

    *stop* is clamped to ``len(paths)``. Returns once every worker has
    finished.
    """
    stop = min(stop, len(paths))
    if stop <= start:
        # Nothing to do; Pool(processes=0) (or a negative count) would
        # raise ValueError.
        return
    pool = Pool(processes=stop - start)
    for chunk in range(start, stop):
        pool.apply_async(run_grep_warc, (chunk, paths[chunk]))
    pool.close()
    pool.join()
if __name__ == '__main__':
    # Usage: grep <period> <start> <stop>
    argv = sys.argv
    if len(argv) != 4:
        sys.exit('Usage: grep period start stop')
    _, period, start_arg, stop_arg = argv
    # Resolve the crawl period to its manifest, then grep the chunk range.
    manifest_path = WARC_PATHS[period]
    all_paths = list(download_warc_paths(manifest_path))
    run_parallel_greps(all_paths, int(start_arg), int(stop_arg))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment