Created
December 17, 2013 22:36
-
-
Save MattFaus/8013901 to your computer and use it in GitHub Desktop.
A high level overview of the pipeline job Khan Academy uses to download analytics data about each of its videos to perform more complex cross-video analysis.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class YouTubeQueryMasterPipeline(pipeline.Pipeline):
    """Fans out worker pipelines that pull per-video data from YouTube."""

    def run(self, user_ids):
        """Launches worker pipeline jobs to query the YouTube API.

        Arguments:
            user_ids: The user_ids of stored OAuth2 credentials.
        """
        try:
            for uid in user_ids:
                # Build a YouTube API client from this user's stored
                # OAuth2 credentials.
                cred = YouTubeCredential.get_credential(uid)
                # Ask YouTube for the full list of this user's video IDs.
                ids = cred.query_videos()
                # Fan out child pipelines, 25 videos apiece.
                start = 0
                while start < len(ids):
                    yield YouTubeQuerySomeVideosPipeline(
                        uid, ids[start:start + 25])
                    start += 25
        except Exception:
            # Don't let the pipeline library swallow your exceptions!
            logging.error(traceback.format_exc())
class YouTubeQuerySomeVideosPipeline(pipeline_util.SoftRetryPipeline):
    """Queries the YouTube API to download metrics for a batch of videos, and
    writes that data into a YouTubeAnalytics entity for each one.
    """

    def run(self, user_id, video_ids):
        try:
            # Build the YouTube API client from the stored credentials.
            client = YouTubeCredential.get_credential(user_id)
            for vid in video_ids:
                # Fetch this video's metrics from YouTube and persist the
                # raw API result as a datastore entity.
                YouTubeAnalytics(client.query_video_data(vid)).put()
        except Exception:
            # Delegate to SoftRetryPipeline so transient failures retry
            # without aborting the whole root pipeline.
            self.handle_exception()
class SoftRetryPipeline(pipeline.Pipeline):
    """Re-raises exceptions that would not cause a full abort, ignores others.

    The pipeline library aborts the entire job rooted at the root pipeline
    once a task reaches max_attempts, so the final permitted attempt swallows
    the exception instead of re-raising it.
    """

    # Maximum number of times the library may attempt this pipeline task.
    PIPELINE_RETRY_LIMIT = 10

    def __init__(self, *args, **kwargs):
        super(SoftRetryPipeline, self).__init__(*args, **kwargs)
        # We have to specify backoff and retry parameters directly on the
        # pipeline object so that the library will know how to reschedule it.
        # The (nearly identical) mechanism available on the task queue itself
        # doesn't work because the library creates a *new* task when it
        # retries a failed one.
        self.max_attempts = self.PIPELINE_RETRY_LIMIT
        # Add some randomness in the backoffs for some staggering.
        # TODO: Parameterize for users to customize
        self.backoff_seconds = random.randint(5, 20)
        # BUGFIX: use float division. Under Python 2 (this file uses
        # xrange), 1 / random.randint(2, 4) truncated to 0, making
        # backoff_factor a constant 1.5 with no jitter at all.
        self.backoff_factor = 1.5 + (1.0 / random.randint(2, 4))

    def handle_exception(self):
        """Call this within an except block.

        Logs the current exception, re-raises it if that will not cause a
        full abort, and otherwise returns None so the task completes
        "successfully" and the root pipeline survives.
        """
        msg = self.__class__.__name__
        msg += traceback.format_exc()
        logging.error(msg)
        # Compare against max_attempts (set in __init__ from
        # PIPELINE_RETRY_LIMIT) so a subclass that adjusts its retry budget
        # keeps the "never abort the root pipeline" behavior.
        if self.current_attempt < self.max_attempts:
            # Raise the first few exceptions, to force the pipeline
            # library to retry this task.
            logging.warning("Raising exception on attempt %s/%s for pipeline "
                "%s with parent %s",
                self.current_attempt, self.max_attempts, self.pipeline_id,
                self.root_pipeline_id)
            raise
        else:
            # But don't let it hit max_attempts, or else the entire job
            # rooted at the root pipeline would be aborted.
            return None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment