Created
September 25, 2012 22:28
-
-
Save mikemccabe/3784845 to your computer and use it in GitHub Desktop.
Parallel archive.org metadata fetching using python and gevent
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This demonstrates doing multiple metadata fetches in parallel. | |
# It seems to be fast enough that the json decoding cost becomes | |
# a significant proportion of the execution time. | |
# It requires gevent; see http://www.gevent.org/intro.html#installation | |
# This is callable from the command line; call with --help for a summary. | |
# If you use it as a library, the main entry point is | |
# metadata_record_iterator(); see main() for an example. | |
# gevent's monkey patching must run as early as possible -- before any
# module that uses sockets (urllib here) is imported -- so those modules
# see the cooperative, greenlet-aware socket implementation.
from gevent import monkey
monkey.patch_all()

import gevent
import urllib
import json
import sys
import random
import os
from itertools import islice
from gevent import queue as g_queue
# Globals!
# Flipped to True by queue_input() once every id has been placed on the
# input queue; consumers use it together with queued_count to decide
# when all work has been handed out.
done_queueing_input = False
# Running total of ids queued so far by queue_input().
queued_count = 0
# Pool of archive.org web-node hostname prefixes; each metadata request
# goes to a randomly chosen node to spread load.
hosts = (
    "www16",
    "www17",
    "www18",
    "www19",
    "www20",
    "www21",
    "www22",
    "www23",
    "www24",
    "www25",
    "www26",
    "www27",
    "www28",
    "www29",
    "www30",
    "www31"
)
# Hostname prefixes that have failed with IOError; workers avoid these.
# Shared mutable state -- presumably safe because gevent greenlets only
# switch at I/O points, so appends don't interleave.
skips = []
def md_getter(input_queue, json_queue, recache):
    """Worker greenlet: pull (index, id) pairs off input_queue, fetch that
    item's metadata JSON from a randomly chosen archive.org node, and put
    (index, id, json_string) onto json_queue.

    A host that raises IOError is added to the global skips list and the
    work item is re-queued so another host can retry it.  Runs forever;
    intended to be spawned with gevent.spawn().
    """
    # The query-string suffix never changes for this worker; build it once
    # instead of on every loop iteration.
    recache_str = '?reCache=1' if recache else ''
    while True:
        i, id = input_queue.get()
        # Pick a host we haven't marked as failed.  If *every* host has
        # failed, fall back to the full pool rather than spinning forever
        # (the original retry loop would never terminate in that case).
        candidates = [h for h in hosts if h not in skips] or list(hosts)
        host = random.choice(candidates)
        try:
            j = get_url("http://%s.us.archive.org/metadata/%s%s"
                        % (host, id, recache_str))
            if len(j) < 100:
                # A real metadata record is larger than this; warn but
                # still pass the payload through.
                print >> sys.stderr, "got short string " + str(j) + " for " + id + " - error?"
            json_queue.put((i, id, j))
        except IOError:
            print >> sys.stderr, host + " failed"
            skips.append(host)
            # Re-queue the item so another worker/host can retry it.
            input_queue.put((i, id))
        finally:
            input_queue.task_done()
def queue_input(ids, input_queue):
    """Producer greenlet: push (index, stripped_id) tuples onto input_queue.

    Bumps the global queued_count for each id handed out and flips the
    global done_queueing_input flag once the input is exhausted, so that
    consumers can tell when all work has been distributed.
    """
    global queued_count
    global done_queueing_input
    for index, raw_id in enumerate(ids):
        input_queue.put((index, raw_id.strip()))
        queued_count += 1
    done_queueing_input = True
def get_url(url):
    """Fetch url and return the response body as a string.

    On a non-200 status, logs to stderr and returns '{}' so callers always
    receive parseable JSON.  Propagates IOError when the connection itself
    fails (md_getter catches it and retries on another host).
    """
    f = urllib.urlopen(url)
    try:
        if f.getcode() != 200:
            print >> sys.stderr, "get failed for " + url
            c = '{}'
        else:
            c = f.read()
    finally:
        # Always release the connection, even if read() raises mid-body
        # (the original skipped close() in that case).
        f.close()
    return c
def metadata_record_iterator(ids, workers=20, sorted=False, recache=False):
    """Return an iterator of (index, id, metadata_json_string) tuples.

    ids      -- iterable of archive.org item identifiers
    workers  -- number of fetcher greenlets to spawn
    sorted   -- if True, yield results in input order rather than in
                completion order (NOTE: parameter shadows builtin sorted)
    recache  -- if True, append ?reCache=1 to each metadata request

    Spawns one producer greenlet (queue_input) plus `workers` consumer
    greenlets (md_getter); results flow back through json_queue.
    """
    input_queue = g_queue.JoinableQueue(1000)
    json_queue = g_queue.Queue(1000)
    gevent.spawn(queue_input, ids, input_queue)
    for i in range(workers):
        gevent.spawn(md_getter, input_queue, json_queue, recache)
    def metadata_iterator_helper():
        # Yield results in whatever order the workers complete them,
        # stopping once every queued id has been accounted for.
        got_count = 0
        while True:
            if done_queueing_input and got_count == queued_count:
                break
            yield json_queue.get()
            got_count += 1
    def sorted_iterator(results):
        # Re-order completion-order results back into input order using a
        # priority queue keyed on each tuple's original index.
        current_i = 0
        pq = g_queue.PriorityQueue()
        results_remain = True
        while True:
            if done_queueing_input and current_i == queued_count:
                break
            while True:
                # Drain one more result into the priority queue (if any
                # remain), then see if the smallest index is the one we
                # need next.
                if results_remain:
                    try:
                        tup = results.next()
                        pq.put(tup)
                    except StopIteration:
                        results_remain = False
                tup = pq.get()
                i, _, _ = tup
                if i == current_i:
                    yield tup
                    current_i += 1
                    break
                else:
                    # Not the next expected index; put it back and keep
                    # pulling results until it is.
                    pq.put(tup)
    if sorted:
        return sorted_iterator(metadata_iterator_helper())
    else:
        return metadata_iterator_helper()
def info_callback(i, id, md_json): | |
o = json.loads(md_json) | |
print "%s %s %s %s %s" % (i, id, o.get('server', ''), | |
o.get('dir', ''), o.get('item_size', '')) | |
def printing_callback(i, id, md_json): | |
print md_json | |
def idonly_callback(i, id, md_json): | |
print "%s %s %s" % (i, id, len(md_json)) | |
def printing_with_ids_callback(i, id, md_json): | |
print str(i), id, md_json | |
def main(argv):
    """Command-line entry point.

    argv -- argument list (excluding the program name).  The single
    positional argument is a filename containing ids (one per line),
    '-' to read ids from stdin, or a bare archive.org identifier.
    Exits with status 1 on bad usage.
    """
    import optparse
    parser = optparse.OptionParser(usage='usage: %prog [options] file_or_id_or_-_for_stdin',
                                   version='%prog 0.1',
                                   description='get archive metadata for archive ids. Prints JSON to stdout by default.')
    parser.add_option('--start',
                      action='store',
                      type='int',
                      default=0,
                      help='Index of first id to fetch')
    parser.add_option('--count',
                      action='store',
                      type='int',
                      default=0,
                      help='Count of ids to fetch')
    parser.add_option('--workers',
                      action='store',
                      type='int',
                      default=20,
                      help='How many metadata fetch workers to spawn')
    parser.add_option('--recache',
                      action='store_true',
                      default=False,
                      help='Recache when fetching')
    parser.add_option('--idonly',
                      action='store_true',
                      default=False,
                      help='Print "index id len(json) - for testing"')
    parser.add_option('--withids',
                      action='store_true',
                      default=False,
                      help='Print "index id json"')
    parser.add_option('--altformat',
                      action='store_true',
                      default=False,
                      help='Print "index id server dir item_size": parses json')
    parser.add_option('--ujson',
                      action='store_true',
                      default=False,
                      help='use ujson instead of json')
    parser.add_option('--sorted',
                      action='store_true',
                      default=False,
                      help='Produce results in sorted order')
    opts, args = parser.parse_args(argv)
    if len(args) != 1:
        parser.print_usage()
        sys.exit(1)
    # Interpret the positional argument: an existing file of ids, '-' for
    # stdin, or a literal identifier.
    if os.path.exists(args[0]):
        ids = open(args[0])  # consumed lazily below; stays open for the run
    elif args[0] == '-':
        ids = sys.stdin
    else:
        ids = [args[0]]
    parser.destroy()
    if opts.ujson:
        # Swap in the faster ujson module for every json.loads() call.
        import ujson
        global json
        json = ujson
    # When several output-format flags are given, the later checks win.
    callback = printing_callback
    if opts.altformat:
        callback = info_callback
    if opts.idonly:
        callback = idonly_callback
    if opts.withids:
        callback = printing_with_ids_callback
    # BUG FIX: the original tested "opts.count is not 0", an identity
    # comparison that only works by accident of CPython small-int
    # interning; compare by value instead.
    stop = opts.start + opts.count if opts.count != 0 else None
    ids = islice(ids, opts.start, stop)
    results = metadata_record_iterator(ids, opts.workers,
                                       opts.sorted, opts.recache)
    for i, id, md_json in results:
        callback(i, id, md_json)
if __name__ == '__main__':
    # Run the CLI when executed directly; main()'s return value (None on
    # success -> exit status 0) becomes the process exit code.
    sys.exit(main(sys.argv[1:]))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment