Vacuum up Crunchbase
""" | |
Get a bunch of Crunchbase data, but respect the API limits. | |
Author JD Maturen | |
Apache 2 License | |
""" | |

import logging
import os
import sys
from Queue import Queue, Empty
from random import random
from threading import Lock, RLock, Thread
from time import sleep, time

from requests import Session
from requests.exceptions import RequestException


class Counter(object):
    """Thread safe math"""

    def __init__(self, start=0):
        self.current = start
        self.rlock = RLock()

    def increment(self, step=1):
        with self.rlock:
            self.current += step
            return self.current

    def get(self):
        with self.rlock:
            return self.current

    def get_and_set(self, value=0):
        with self.rlock:
            prev = self.current
            self.current = value
            return prev


class RateLimiter(object):
    """Crude per-second rate limiter shared by all worker threads."""

    def __init__(self, rate):
        self.rate = rate         # max calls allowed per second
        self.count = 0           # calls seen in the current one-second window
        self.last_time = time()  # start of the current window
        self.lock = Lock()

    def limit(self):
        """Blocking limiter: sleeps out the rest of the second once `rate` calls have been made."""
        with self.lock:
            current = time()
            delta = current - self.last_time
            if delta > 1:
                # More than a second since the window started; begin a new one.
                self.last_time = current
                self.count = 1
                return
            self.count += 1
            if self.count >= self.rate:
                logging.debug("sleeping {0:.3f}s".format(1 - delta))
                sleep(1 - delta)


def file_exists(filename):
    exists = False
    try:
        with open(filename):
            exists = True
    except IOError:
        pass
    return exists


def get_crunchbase_data(q, s, api_key, company, filename, attempts, error_counter):
    url = "http://api.crunchbase.com/v/1/company/{0:s}.js?api_key={1:s}".format(company, api_key)
    logging.debug("Getting {0:s}".format(company))
    r = s.get(url)

    if r.status_code != 200:
        if 'x-mashery-error-code' in r.headers and r.headers['x-mashery-error-code'] == 'ERR_403_DEVELOPER_OVER_QPS':
            # Tripped the per-second quota; back off for as long as the API asks.
            retry_after = 3
            if 'retry-after' in r.headers:
                retry_after = int(r.headers['retry-after'])
            logging.debug("Going too fast, sleeping")
            sleep(retry_after)
        else:
            logging.warn("Error getting {0:s}: {1:s}".format(company, str(r.headers)))
            sleep(random() * 3)
            if 400 <= r.status_code < 500:
                # Only non-rate-limit client errors count against the error budget.
                error_counter.increment()
        # Requeue with one fewer attempt remaining.
        q.put((company, filename, attempts - 1))
        return

    with open(filename, 'w') as f:
        # r.text is unicode under Python 2; encode explicitly so non-ASCII data doesn't raise.
        f.write(r.text.encode('utf-8'))


def process_queue(q, countdown, api_key, error_counter, rate_limiter):
    """Worker loop: pull (company, filename, attempts) tuples off the queue until countdown hits zero."""
    s = Session()
    while countdown.get() != 0:
        try:
            company, filename, attempts = q.get(True, 0.1)
            if attempts > 0:
                rate_limiter.limit()
                get_crunchbase_data(q, s, api_key, company, filename, attempts, error_counter)
            q.task_done()
        except Empty:
            pass
        except RequestException as e:
            # Transport-level failure: log it, requeue the item, and start a fresh session.
            logging.exception(e)
            q.put((company, filename, attempts - 1))
            q.task_done()  # balance the q.get() above so q.join() can still complete
            s = Session()


def main():
    """Read company permalinks from stdin and fetch each one from the Crunchbase v1 API."""
    rate_limit = 8  # API calls per second
    output_dir = "companies"
    api_key = sys.argv[1]

    logging.basicConfig(level=logging.INFO)

    if not os.path.isdir(output_dir):
        os.makedirs(output_dir)  # workers write one JSON file per company in here

    error_counter = Counter()
    countdown = Counter(1)
    get_rate_limiter = RateLimiter(rate_limit)
    q = Queue()  # unbounded queue so we don't deadlock on putting retries back in

    thread_count = int(rate_limit * 0.75)
    threads = []
    for _ in xrange(thread_count):
        t = Thread(target=process_queue, args=(q, countdown, api_key, error_counter, get_rate_limiter))
        t.daemon = True
        t.start()
        threads.append(t)

    for company in sys.stdin:
        company = company.strip()
        if error_counter.get() > 12:
            raise Exception("too many errors")
        filename = "{0:s}/{1:s}.json".format(output_dir, company)
        if not file_exists(filename):
            logging.info("fetching {0:s}".format(company))
            # Keep the queue shallow so the error check above stays responsive.
            while q.qsize() > rate_limit / 2:
                sleep(.1)
            q.put((company, filename, 3))
        else:
            logging.debug("Skipping {0:s}".format(company))

    q.join()
    countdown.increment(-1)  # drop the countdown to zero so the workers exit their loops
    for t in threads:
        t.join()


if __name__ == '__main__':
    main()
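
Each successful response is written verbatim to companies/<permalink>.json, so what ends up on disk is the raw Crunchbase v1 payload. Assuming the endpoint returns plain JSON (no JSONP callback is requested above), a saved file can be loaded back with nothing but the standard library; the "facebook" permalink below is just an example:

    import json

    with open("companies/facebook.json") as f:
        company = json.load(f)
    print company.get("name")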