Created
August 7, 2019 02:55
-
-
Save cameronneylon/43b542f7ddc8746484692875c6c99773 to your computer and use it in GitHub Desktop.
COKI code for pulling Crossref Event Data (CED)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import datetime | |
from google.cloud import firestore | |
from google.cloud import storage | |
import time | |
import logging | |
db = firestore.Client() | |
def _update_with_retry(doc_ref, fields, outcome, max_attempts=3, backoff_step=3):
    """Apply ``fields`` to a Firestore document, retrying on failure.

    Args:
        doc_ref: Document reference whose ``update`` method is called.
        fields: Mapping of field names to the values to write.
        outcome: Short label appended in square brackets to the log messages.
        max_attempts: Total number of update attempts before giving up.
        backoff_step: Seconds added to the wait after every failed attempt
            (3, 6, ... with the default).

    Raises:
        RuntimeError: If every attempt failed; the last error is chained.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            doc_ref.update(fields)
            return
        except Exception as err:
            if attempt == max_attempts:
                logging.error(f"Update to firestore failed {max_attempts} times [{outcome}]")
                raise RuntimeError(
                    f"Update to firestore failed {max_attempts} times [{outcome}]"
                ) from err
            backoff = attempt * backoff_step
            logging.warning(f"Update to firestore failed, trying again in {backoff} seconds [{outcome}]")
            # Linear backoff before the next attempt
            time.sleep(backoff)


def trigger(event, context):
    """Fetch one page of events, store it, and advance the saved cursor.

    The ``configuration/crossref-event`` Firestore document acts as both the
    job state (cursor, iteration number, bucket, email) and a lock. If the
    lock is already held the function returns without doing anything.

    On success the lock is released and the cursor / iteration number are
    advanced in the same update. If the fetch or the upload fails, the lock
    is released *without* advancing the cursor, so that a later invocation
    retries the same page instead of finding the lock held forever.

    Args:
        event: Trigger payload; not used.
        context: Trigger metadata; not used.
            NOTE(review): the (event, context) signature looks like a Google
            Cloud Functions background function - confirm.

    Raises:
        RuntimeError: If the lock-releasing Firestore update fails on every
            attempt.
        Exception: Any error from the fetch or upload is re-raised after the
            lock has been released.
    """
    # Get configuration and acquire the lock
    config_ref = db.collection("configuration").document("crossref-event")
    transaction = db.transaction()
    configuration, locked = atomic_lock(transaction, config_ref)
    print(configuration, locked)

    if locked:
        # Another invocation holds the lock; nothing to do this time.
        return

    try:
        # Pull values
        next_cursor = configuration["next_cursor"]
        next_itteration_number = configuration["next_itteration_number"]
        destination_bucket_name = configuration["destination_bucket_name"]
        email = configuration["email"]

        results, filename, next_cursor, next_itteration_number = run_itteration(
            email, next_cursor, next_itteration_number
        )
        write_results(destination_bucket_name, filename, results)
    except Exception:
        logging.exception("Fetch or upload failed, releasing lock without advancing the cursor")
        # Release the lock only; the stored cursor is left untouched
        _update_with_retry(config_ref, {u"locked": False}, "failed fetch")
        raise

    # Release lock and persist the new position in a single update
    _update_with_retry(
        config_ref,
        {
            u"locked": False,
            u"next_cursor": next_cursor,
            u"next_itteration_number": next_itteration_number,
        },
        "successful fetch",
    )
def fetch_events(email, cursor, num_rows, timeout=300):
    """Request one page of events from the Crossref Event Data query API.

    Args:
        email: Contact address sent as the ``mailto`` query parameter.
        cursor: ``"start"`` for the first page, otherwise the cursor returned
            by the previous call.
        num_rows: Number of events requested (``rows`` query parameter).
        timeout: Seconds to wait for the server before giving up, passed to
            ``requests.get``.

    Returns:
        A ``(body, next_cursor)`` tuple: the raw response text and the value
        of ``message["next-cursor"]`` from the parsed response. The cursor is
        ``None`` when the API reports a null cursor.

    Raises:
        ValueError: If ``cursor`` is empty/``None``, or the body is not JSON.
        KeyError: If the response has no ``message``/``next-cursor`` entry.
        requests.HTTPError: If the API answers with an error status.
    """
    if not cursor:
        # An empty cursor would be dropped from the query string and silently
        # restart from the first page, so reject it up front.
        raise ValueError("cursor must be 'start' or a cursor returned by a previous request")

    # Let requests build and percent-encode the query string
    params = {"mailto": email, "rows": num_rows}
    if cursor != "start":
        params["cursor"] = cursor

    # Make the request
    r = requests.get(
        "https://api.eventdata.crossref.org/v1/events",
        params=params,
        timeout=timeout,
    )
    # Fail loudly on an error response rather than deriving a cursor from it
    r.raise_for_status()

    # Extract the next cursor from the parsed response instead of slicing the
    # raw text at fixed offsets, which mangles a null cursor or an error body
    next_cursor = r.json()["message"]["next-cursor"]
    return r.text, next_cursor
def run_itteration(email, cursor, itteration_number):
    """Fetch a single page of 10000 events and work out where to store it.

    Args:
        email: Contact address forwarded to ``fetch_events``.
        cursor: Cursor for the page to fetch.
        itteration_number: Sequence number of this page.

    Returns:
        ``(results, filename, next_cursor, next_itteration_number)`` where
        ``filename`` encodes the folder, iteration number, UTC timestamp,
        both cursors and the request duration in seconds.
    """
    fetched_at = str(datetime.datetime.now(datetime.timezone.utc))
    following_number = itteration_number + 1

    # Time only the fetch itself
    t0 = time.monotonic()
    body, upcoming_cursor = fetch_events(email, cursor, 10000)
    elapsed = time.monotonic() - t0

    # Determine folder structure - ~100 files per folder, i.e. roughly
    # one million records each
    folder = int(itteration_number / 100)
    name_parts = (itteration_number, fetched_at, cursor, upcoming_cursor, elapsed)
    blob_name = f"{folder}/" + "_".join(str(part) for part in name_parts) + ".json"

    return body, blob_name, upcoming_cursor, following_number
def write_results(destination_bucket_name, destination_blob_name, results):
    """Upload ``results`` to a Cloud Storage bucket as a ``text/plain`` object.

    Args:
        destination_bucket_name: Name of the bucket to write into.
        destination_blob_name: Object name (path) inside the bucket.
        results: String content to upload.
    """
    client = storage.Client()
    target_bucket = client.get_bucket(destination_bucket_name)
    target_blob = target_bucket.blob(destination_blob_name)
    target_blob.upload_from_string(results, content_type='text/plain')
@firestore.transactional
def atomic_lock(transaction, config_ref):
    """Try to take the lock stored in the configuration document.

    The document is read and, if its ``locked`` field is exactly ``False``,
    updated to ``locked = True`` within the same transaction. The update is
    written to ``config_ref`` itself, so the document that is read is always
    the document that gets locked.

    Args:
        transaction: Firestore transaction the read and write run in.
        config_ref: Reference to the configuration document holding the
            ``locked`` field.

    Returns:
        ``(configuration, already_locked)``: the document contents as read
        (before any update) and a flag that is ``False`` when this call took
        the lock and ``True`` when the lock was already held.
    """
    configuration = config_ref.get(transaction=transaction).to_dict()

    if configuration["locked"] is not False:
        # Lock is held elsewhere (or is in an unexpected state); leave it alone
        return configuration, True

    # Acquire lock
    transaction.update(config_ref, {u"locked": True})
    return configuration, False
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment