BigBingo (as of early July 2014)
"""Alternative deciders that can be used in BigBingo experiments. | |
An alternative decider is a function that determines which user gets which | |
alternative in a BigBingo experiment. It takes as arguments the experiment | |
itself and the bingo ID and returns the chosen alternative. See the Experiment | |
class in bigbingo/config.py for more info. | |
The alternative decider should take the following arguments: | |
experiment: A config.Experiment object containing all information about | |
the experiment (most notably, an OrderedDict mapping alternative | |
name to weight). | |
bingo_id: A string with the ID for the given user. | |
It should return one of the experiment alternative names, or None if the user | |
should not participate in the experiment. It should be a deterministic | |
function of experiment and bingo_id. | |
""" | |
import hashlib | |
import intl.request | |
def legacy(experiment, bingo_id): | |
"""An alternative decider function that emulates GAE/Bingo. | |
This function should match modulo_choose in gae_bingo/gae_bingo.py. This | |
decider should not be used for new experiments; it only exists for | |
experiments that were ported from GAE/Bingo to BigBingo and are still | |
running. | |
""" | |
alternatives_weight = sum(experiment.alternative_weights.itervalues()) | |
sig = hashlib.md5( | |
experiment.legacy_hashable_name + bingo_id).hexdigest() | |
sig_num = int(sig, base=16) | |
index_weight = sig_num % alternatives_weight | |
current_weight = alternatives_weight | |
alternative_data = [ | |
(index, alt, weight) | |
for (index, (alt, weight)) | |
in enumerate(experiment.alternative_weights.iteritems())] | |
for _, alt, weight in sorted( | |
alternative_data, | |
key=lambda (alt_index, _, alt_weight): (alt_weight, alt_index), | |
reverse=True): | |
current_weight -= weight | |
if index_weight >= current_weight: | |
return alt | |
assert False, "Didn't find a suitable alternative." | |
def english_only(experiment, bingo_id): | |
"""Test the experiment on a (deterministic) pseudorandom set of | |
English-speaking users. | |
""" | |
if intl.request.locale_for_mo() == 'en': | |
return by_weights(experiment, bingo_id) | |
else: | |
return None | |
def by_weights(experiment, bingo_id): | |
"""Test the experiment on a (deterministic) pseudorandom set of users.""" | |
total_weight = sum(experiment.alternative_weights.itervalues()) | |
weight_index = hash_on_interval(experiment.id + bingo_id) * total_weight | |
current_index = 0 | |
for alternative, weight in experiment.alternative_weights.iteritems(): | |
if current_index <= weight_index < current_index + weight: | |
return alternative | |
current_index += weight | |
assert False, 'Hash was out of bounds when computing BigBingo alternative.' | |
def limited_to_range(decider, range_start, range_end): | |
"""Return a decider that only allows a predictable set of users. | |
This is useful for making experiments that are mutually exclusive with each | |
other and re-using the participants of one experiment in a later | |
experiment, since the assignment of bingo_id to location in the [0, 1) | |
range is experiment-independent. | |
Example: limited_to_range(english_only, 0.2, 0.4) returns an alternative | |
decider that only allows a specific 20% of users to be in the experiment, | |
and of those, only allows English users. If you add another experiment with | |
decider limited_to_range(by_weights, 0.4, 0.5), English and non-English | |
users will be included and the experiment will be disjoint with the first | |
experiment. | |
Arguments: | |
decider: The alternative decider to use if the user is in the | |
experiment. | |
range_start: The inclusive start of the range, between 0 and 1. | |
range_end: The exclusive end of the range, between 0 and 1. | |
""" | |
assert 0 <= range_start < range_end <= 1 | |
def wrapper_decider(experiment, bingo_id): | |
if range_start <= hash_on_interval(bingo_id) < range_end: | |
return decider(experiment, bingo_id) | |
else: | |
return None | |
return wrapper_decider | |
def hash_on_interval(value): | |
"""Given a string, return a hash of the string in the interval [0, 1). | |
For example, you can multiply the result by n to get a deterministic hash | |
that's a float value between 0 (inclusive) and n (exclusive). | |
The common way to use this is to call hash_on_interval(base + bingo_id), in | |
which case the right way to think about it is that each bingo_id for a base | |
gets a random (but deterministic) point on the "hash space", and the full set | |
of hash space mappings is re-randomized for each base. | |
""" | |
# Using a cryptographic hash function is overkill here, but it looks like | |
# Python doesn't have a good non-cryptographic hash function that's stable | |
# across versions, and MD5 is still plenty fast for our use case. | |
hash_str = hashlib.md5(value).hexdigest() | |
hash_uint32 = int(hash_str, 16) & 0xFFFFFFFF | |
return float(hash_uint32) / (1 << 32) |
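# Illustrative sketch (not part of the original snapshot): composing the
# helpers above into a decider for a hypothetical experiment. The range
# values are made up.
#
#     # Only a deterministic 10% slice of users is eligible; within that
#     # slice, English-locale users are split across alternatives by weight.
#     decider = limited_to_range(english_only, 0.0, 0.1)
#     alternative = decider(experiment, bingo_id)  # an alternative name or None
#
# Because hash_on_interval(bingo_id) ignores the experiment, a second
# experiment using limited_to_range(by_weights, 0.1, 0.2) draws its
# participants from a disjoint slice of users.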
Snapshot of Khan Academy's BigBingo A/B testing framework and related code. | |
Here's a basic overview: | |
-summarize.py is the most interesting file. It contains all stages of the | |
summarize task, as well as the publish and archive steps that happen at the | |
end. | |
-bq_pipelines.py contains lots of useful pipelines for interacting with | |
BigQuery. QueryToTableBatchPipeline can run many simultaneous queries, and will | |
properly handle all batching and retry logic (a short usage sketch follows this | |
overview). | |
-config.py is where all experiment configuration lives. For this Gist, I | |
replaced the big list of active and archived experiments with simple examples. | |
The summarize task depends heavily on the App Engine Pipeline API | |
( https://code.google.com/p/appengine-pipeline/ ); more specifically, on KA's | |
fork of it at https://github.com/Khan/appengine-mapreduce .
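For example, a stage of the summarize task can fan out several queries and
block until they all finish with something roughly like the sketch below. This
is only illustrative: the pipeline class, SQL, and dataset/table names are made
up, and it assumes the same imports that bq_pipelines.py itself uses (pipeline
and bq_pipelines).

    class ExampleSummarizeStage(pipeline.Pipeline):
        def run(self, experiment_table):
            # Each spec uses the same keyword names as QueryToTablePipeline's
            # arguments; the batch pipeline handles batching, throttling, and
            # retrying of the underlying BigQuery jobs.
            results = yield bq_pipelines.QueryToTableBatchPipeline([
                {'query': 'SELECT ... FROM [%s]' % experiment_table,
                 'output_dataset': 'bigbingo_tmp',
                 'output_table': 'participation_counts'},
                {'query': 'SELECT ... FROM [%s]' % experiment_table,
                 'output_dataset': 'bigbingo_tmp',
                 'output_table': 'conversion_counts'},
            ])
            # 'results' and 'results.cost' are futures for the job metadata
            # list and the combined query cost in cents.
            yield bq_pipelines.GetCostPipeline(results)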
"""API for interacting with BigBingo.""" | |
from __future__ import absolute_import | |
import logging | |
from bigbingo import config | |
from bigbingo import log | |
import cookie_util | |
def ab_test(experiment_id, bingo_id): | |
"""Participates the given user in the given experiment. | |
Logs a participation event and determines which alternative to use for this | |
user. If this is the first participation event for this experiment and | |
user, the time is used as the user's participation time for the experiment | |
and conversions start being tracked for the user; otherwise, the BigBingo | |
summarizer will ignore the participation event. | |
Arguments: | |
experiment_id: A string with the ID of the experiment. | |
bingo_id: A string identity for the user that should participate in the | |
experiment. | |
Returns: | |
A string name of the alternative for this user in the experiment. | |
""" | |
experiment = config.get_active_experiment_by_id(experiment_id) | |
if not isinstance(bingo_id, basestring): | |
logging.error( | |
'When calling bigbingo.ab_test on the experiment %s, the bingo_id ' | |
'was %s. Non-strings were allowed in GAE/Bingo, but are no longer ' | |
'allowed in BigBingo, so the event will be ignored.' % | |
(experiment_id, bingo_id)) | |
return experiment.control | |
# If the user set the alternative in a cookie, don't log it as a real | |
# participation event, just return the alternative value. | |
cookie_alternative = _get_cookie_alternative(experiment) | |
if cookie_alternative is not None: | |
logging.info( | |
'bigbingo.ab_test was forced to return alternative "%s" for ' | |
'experiment "%s" (bingo_id "%s") due to a cookie override. No ' | |
'participation event will be logged.' % | |
(cookie_alternative, experiment_id, bingo_id)) | |
return cookie_alternative | |
alternative = experiment.choose_alternative(bingo_id) | |
if alternative is None: | |
# The user should not participate in this experiment, so return the | |
# control and don't log anything. | |
return experiment.control | |
else: | |
log.log_participation_event(bingo_id, experiment.logged_name, | |
alternative) | |
return alternative | |
def get_alternative_for_user(experiment_id, bingo_id): | |
"""Determine which alternative a user would be in. | |
This is similar to ab_test, but doesn't actually enter the user into the | |
experiment. This is mostly useful for "passive" systems that want to get | |
the information without disturbing anything, so this function is also | |
designed to swallow and log exceptions rather than propagating them in most | |
cases. | |
Arguments: | |
experiment_id: An ID of a valid BigBingo experiment. | |
bingo_id: A string with the identity of the current user. | |
Returns: | |
A string with the alternative to return, or None if there was a | |
problem getting it. | |
""" | |
try: | |
experiment = config.get_active_experiment_by_id(experiment_id) | |
except KeyError: | |
logging.error('Tried to get the alternative for an experiment that is ' | |
'not an active experiment: "%s".', experiment_id) | |
return None | |
# Any well-written choose_alternative function should never crash, but | |
# we want to be resistant to crashes in it anyway, which could especially | |
# come up if the bingo_id is accidentally None. | |
try: | |
alternative = experiment.choose_alternative(bingo_id) | |
except Exception: | |
logging.exception( | |
'Error when trying to determine alternative for bingo_id "%s" in ' | |
'BigBingo experiment "%s".' % (bingo_id, experiment_id)) | |
return None | |
return alternative or experiment.control | |
def _get_cookie_alternative(experiment): | |
"""Return any alternative override that the user may have set. | |
Returns either a valid alternative for the experiment or None if there | |
wasn't a valid alternative set. | |
""" | |
alt_index = cookie_util.get_cookie_value('bingo_force_' + experiment.id) | |
if not alt_index: | |
return None | |
try: | |
return experiment.alternatives[int(alt_index)] | |
except (IndexError, ValueError): | |
logging.warn('Cookie overriding experiment %s specified index %s, ' | |
'which is not valid. Ignoring.' % | |
(experiment.id, alt_index)) | |
return None | |
def mark_conversion(conversion_id, bingo_id): | |
"""Indicates that the given user has triggered the given conversion. | |
The BigBingo summarizer will count the conversion event toward all | |
experiments that the given user is currently a part of. | |
""" | |
try: | |
conv = config.get_source_conversion_by_id(conversion_id) | |
except KeyError: | |
conv = (try_conversion_id(conversion_id + '_count') or | |
try_conversion_id(conversion_id + '_binary')) | |
if conv: | |
logging.error( | |
'The BigBingo conversion "%s" is invalid, but we found one to ' | |
'use instead: "%s".' % (conversion_id, conv.id)) | |
else: | |
logging.error("The conversion \"%s\" was used, but is not a valid " | |
"BigBingo conversion. We'll treat the conversion_id " | |
"as if it's a valid logged_name.", conversion_id) | |
log.log_conversion_event(bingo_id, conversion_id) | |
return | |
log.log_conversion_event(bingo_id, conv.id) | |
def try_conversion_id(conversion_id): | |
try: | |
return config.get_source_conversion_by_id(conversion_id) | |
except KeyError: | |
return None |
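# Illustrative usage sketch (not part of the original snapshot): a request
# handler participating a user and later recording a conversion. The
# experiment and conversion IDs are hypothetical and would have to exist in
# bigbingo/config.py.
#
#     alternative = ab_test('signup_button_color', bingo_id)
#     if alternative == 'green':
#         ...  # render the experimental treatment
#     else:
#         ...  # render the control
#     # Later, when the user completes the tracked action:
#     mark_conversion('signup_completed_binary', bingo_id)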
"""This module makes it easy for other modules to connect to BigQuery.""" | |
import json | |
import logging | |
import os | |
import re | |
import time | |
from third_party.google_api_python_client import httplib2 | |
from third_party.google_api_python_client.apiclient import ( | |
discovery as apiclient_discovery, | |
errors as apiclient_errors, | |
) | |
from third_party.google_api_python_client.oauth2client import ( | |
appengine as oauth2client_appengine, | |
client as oauth2client_client, | |
) | |
import ka_globals | |
import ka_root | |
# Hard-coded project ID. | |
BQ_PROJECT_ID = '' | |
# Keywords that BigQuery will refuse to parse as column names. | |
BQ_RESERVED_KEYWORDS = frozenset(["profile"]) | |
class BigQueryError(Exception): | |
pass | |
class BigQueryService(object): | |
"""A wrapper for the BigQuery Python client service. | |
This page enumerates the calls available to the API: | |
https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/python/latest/index.html | |
It doesn't describe the interface for making calls. Do so like this: | |
service = BigQueryService.get_service() | |
request = service.tables().list(<ARGS>) | |
response = request.execute() | |
# response is a Python object representation of the JSON response. | |
""" | |
_SERVICE = None | |
@classmethod | |
def get_service(cls): | |
if not cls._SERVICE: | |
# Service creation does an HTTP fetch, so we only do it once | |
# per instance and cache the service object. | |
credentials = get_credentials() | |
# Use a timeout higher than the default 5 seconds since this | |
# service is currently only used for background jobs, not | |
# user-facing requests, and batching and BigQuery throttling can | |
# make us exceed that default. | |
http = httplib2.Http(timeout=30) | |
http = credentials.authorize(http) | |
cls._SERVICE = apiclient_discovery.build('bigquery', 'v2', | |
http=http) | |
return cls._SERVICE | |
def get_credentials(): | |
if ka_globals.is_dev_server: | |
return oauth2client_client.AccessTokenCredentials( | |
get_dev_access_token(), 'log2bq/0.1') | |
else: | |
return oauth2client_appengine.AppAssertionCredentials( | |
scope='https://www.googleapis.com/auth/bigquery') | |
def get_dev_access_token(): | |
"""Gets the developer's BigQuery access token for use in a dev environment. | |
You can get a token by first installing the BigQuery command line tool "bq" | |
in a new virtual environment (to avoid conflicts with App Engine): | |
virtualenv ~/.virtualenv/bigquery_env | |
source ~/.virtualenv/bigquery_env/bin/activate | |
pip install bigquery | |
bq ls | |
(If you don't yet have access to BigQuery, you'll need to ask someone for | |
access). The first time you run "bq ls", it will provide instructions for | |
authenticating, then create a JSON file called .bigquery.v2.token in your | |
home directory, which contains the access token as one of its fields. | |
Running "bq ls" again (in the right virtualenv) will refresh that token. | |
The most robust approach is to have a file called bigquery_token in the | |
webapp directory that contains the contents of the .bigquery.v2.token file | |
is created/updated in your home directory when running "bq ls". You can | |
accomplish this using a symlink or by making a shell script to both run "bq | |
ls" and copy the token file to the right place. (See | |
https://gist.github.com/alangpierce/8e01f71b664761cec0a3) (We can't read | |
this file directly since the app engine sandbox disallows reading files | |
outside of webapp and also disallows reading files starting with a dot.) | |
You can also provide the access token more directly by setting the | |
BQ_ACCESS_TOKEN environment variable in app.yaml or by temporarily changing | |
this code to return the access token as a string literal. | |
""" | |
env_token = os.getenv('BQ_ACCESS_TOKEN') | |
if env_token: | |
return env_token | |
token_file_path = ka_root.join('bigquery_token') | |
if not os.path.isfile(token_file_path): | |
raise BigQueryError('You do not have a BigQuery token configured. ' | |
'See get_dev_access_token in bq_connection.py for ' | |
'instructions on how to configure one.') | |
with open(token_file_path, 'r') as token_file: | |
token_config_obj = json.load(token_file) | |
return token_config_obj['access_token'] | |
def ensure_bigquery_dataset_exists(name, description=""): | |
"""Create a new bigquery dataset if it does not already exist. | |
Datasets contain tables. When loading a new table, the | |
dataset must be specified. If the specified dataset does | |
not already exist then the call will fail. | |
Args: | |
name: the unique name of the dataset (alphanumeric or _) | |
description: (optional) string describing the dataset | |
""" | |
# Setup a dataset creation request | |
request = BigQueryService.get_service().datasets().insert( | |
projectId=BQ_PROJECT_ID, body={ | |
"datasetReference": { | |
"datasetId": name, | |
"projectId": BQ_PROJECT_ID, | |
}, | |
"friendlyName": name, | |
"description": description | |
}) | |
# Try to create the dataset | |
# Catch the error that is returned if the dataset already exists | |
# pass forward all other errors | |
try: | |
request.execute() | |
except apiclient_errors.HttpError as e: | |
code = json.loads(e.content)['error']['code'] | |
dataset_already_exists_code = 409 | |
if code == dataset_already_exists_code: | |
pass | |
else: | |
raise | |
def ensure_bigquery_table_exists( | |
dataset, table, new_schema, create_if_necessary=False, | |
patch_schema_if_necessary=False, ttl_days=None): | |
"""Makes sure that the given table exists and has the given columns. | |
The schema check does not check for an exact schema match; instead it | |
ensures that all fields in the given schema exist within the given table. | |
By default, an exception is thrown if the given table does not exist or | |
does not match. | |
Arguments: | |
dataset: The name of the dataset of the table to check/create. | |
table: The name (not including the dataset) of the table to | |
check/create. | |
new_schema: A dict containing the schema to use when creating the | |
table, or the schema we expect the table to have if the table | |
already exists. The schema format is specified in the bigquery docs | |
(a link to them is in the BigQueryService docstring) for the | |
bq_service.tables().insert(...) call. For example, here's a schema | |
with a single nullable string, 'foo': | |
{'fields': [{'name': 'foo', 'type': 'STRING', 'mode': 'NULLABLE'}]} | |
Note that all three components of each field should be specified, | |
or else the schema check will fail if the table already exists. | |
create_if_necessary: If the table does not exist, we create it with the | |
given schema rather than throwing an exception. | |
patch_schema_if_necessary: If the table exists but does not contain | |
the given schema, it is patched to include the given schema. Note | |
that BigQuery does not allow columns to be removed with a patch | |
operation, so any previously-existing columns not in new_schema are | |
moved to the end of the table. Also, a failure can still occur here | |
if the schema update is invalid (for example, changing an integer | |
column to a string column). Not all valid patch options are | |
currently supported: in particular, the operation will fail if the | |
mode of a column is relaxed or if a column is added to a nested | |
record, even though both types of operations are supported by | |
BigQuery. | |
ttl_days: If present and this operation creates a new table, the table | |
is created with an expiration time that makes it expire after this | |
many days. | |
""" | |
old_schema = get_table_schema(dataset, table) | |
if not old_schema: | |
# Table does not exist. | |
if create_if_necessary: | |
create_empty_table(dataset, table, new_schema, ttl_days) | |
return | |
else: | |
raise BigQueryError('Expected table %s.%s to exist.' % | |
(dataset, table)) | |
# Table exists, so validate the schema. | |
if schema_is_subset(new_schema, old_schema): | |
return | |
# Schema mismatch, so error or fix. | |
if patch_schema_if_necessary: | |
patch_schema_to_table(dataset, table, old_schema, new_schema) | |
else: | |
raise BigQueryError( | |
'Schema mismatch for table %s.%s. Expected schema %s to ' | |
'contain %s.' % (dataset, table, old_schema, new_schema)) | |
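# Illustrative example (not part of the original snapshot) of calling
# ensure_bigquery_table_exists; the dataset and table names are hypothetical.
#
#     ensure_bigquery_table_exists(
#         'bigbingo', 'participation_events',
#         {'fields': [
#             {'name': 'bingo_id', 'type': 'STRING', 'mode': 'NULLABLE'},
#             {'name': 'logged_time', 'type': 'FLOAT', 'mode': 'NULLABLE'},
#         ]},
#         create_if_necessary=True, ttl_days=30)
#
# If the table already exists, this only checks that both fields are present
# (with exactly these types and modes); with patch_schema_if_necessary=True it
# would instead patch any missing fields into the schema.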
def schema_is_subset(subset_schema, full_schema): | |
"""Returns true if the given schema is a subset of the full schema.""" | |
full_schema_fields_by_name = { | |
field['name']: field for field in full_schema['fields']} | |
for subset_field in subset_schema['fields']: | |
full_field = full_schema_fields_by_name.get(subset_field['name']) | |
if full_field is None or subset_field != full_field: | |
return False | |
return True | |
def patch_schema_to_table(dataset, table, old_schema, patch_schema): | |
"""Patch the given table to include the given schema. | |
If the patch_schema removes any fields from old_schema, the old fields are | |
kept and put at the end of the schema. | |
Arguments: | |
dataset: The dataset of the table to patch. | |
table: The table to patch. | |
old_schema: The current schema of the table to patch. | |
patch_schema: A schema containing the new fields. | |
""" | |
BigQueryService.get_service().tables().patch( | |
projectId=BQ_PROJECT_ID, | |
datasetId=dataset, | |
tableId=table, | |
body={'schema': get_schema_for_patch(old_schema, patch_schema)} | |
).execute() | |
def get_schema_for_patch(old_schema, patch_schema): | |
"""Returns the schema to send to patch patch_schema onto old_schema.""" | |
patch_schema_names = {field['name'] for field in patch_schema['fields']} | |
new_schema_fields = list(patch_schema['fields']) | |
for old_field in old_schema['fields']: | |
if old_field['name'] not in patch_schema_names: | |
new_schema_fields.append(old_field) | |
return { | |
'fields': new_schema_fields | |
} | |
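# Worked example: if old_schema has fields [a, b, c] and patch_schema has
# fields [b, d], the patched schema lists the patch fields first and then the
# old-only fields, i.e. [b, d, a, c]; nothing is ever dropped.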
def get_table_schema(dataset, table): | |
"""If the table exists, returns its schema. Otherwise, returns None.""" | |
table_service = BigQueryService.get_service().tables() | |
try: | |
get_result = table_service.get( | |
projectId=BQ_PROJECT_ID, | |
datasetId=dataset, | |
tableId=table | |
).execute() | |
return get_result['schema'] | |
except apiclient_errors.HttpError as e: | |
# Return None if the table doesn't exist. | |
if json.loads(e.content)['error']['code'] == 404: | |
return None | |
else: | |
raise | |
def get_leaf_column_selectors(dataset, table): | |
"""Parses the table's schema to generate a list of column selectors. | |
BigQuery tables may have record fields, which have a hierarchical schema. | |
Each subfield is delimited with a dot, but BigQuery views cannot have dots | |
in their names. So, instead of defining the view like: | |
SELECT | |
__key__.namespace, | |
__key__.path | |
FROM | |
[MyTable] | |
You have to define it like: | |
SELECT | |
__key__.namespace as __key___namespace, | |
__key__.path as __key___path | |
FROM | |
[MyTable] | |
For more information, see http://stackoverflow.com/questions/23840038 | |
""" | |
schema = get_table_schema(dataset, table) | |
if not schema: | |
raise BigQueryError('Expected table %s.%s to exist.' % ( | |
dataset, table)) | |
return ",\n".join([ | |
_get_leaf_selectors("", top_field) | |
for top_field in schema["fields"] | |
]) | |
def _get_leaf_selectors(prefix, field, depth=0): | |
"""Recursive helper for get_leaf_column_selectors()""" | |
field_name = field["name"] | |
if prefix: | |
field_name = prefix + "." + field_name | |
if 'fields' not in field: | |
if "." in field_name: | |
# If we translate user.email to user_email, the user_email field | |
# may already exist as a top-level field, so prepend an underscore | |
# to signify this is a record-turned-regular field. There shouldn't | |
# be any top-level actual fields that start with an underscore. | |
safe_name = field_name.replace(".", "_") | |
return "%s as _%s" % (field_name, safe_name) | |
elif depth == 0 and field_name.lower() in BQ_RESERVED_KEYWORDS: | |
# Reserved keywords mess up BigQuery's SQL parsing, so make sure we | |
# don't use the bare keyword as the column name | |
return "[%s]" % field_name | |
else: | |
return field_name | |
else: | |
# Recursive case | |
return ",\n".join([ | |
_get_leaf_selectors(field_name, sub_field, depth + 1) | |
for sub_field in field["fields"] | |
]) | |
def delete_table_if_exists(dataset, table): | |
"""If the table exists, delete it. Otherwise, do nothing.""" | |
table_service = BigQueryService.get_service().tables() | |
try: | |
table_service.delete( | |
projectId=BQ_PROJECT_ID, | |
datasetId=dataset, | |
tableId=table | |
).execute() | |
except apiclient_errors.HttpError as e: | |
# Return (do nothing) if the table doesn't exist. | |
if json.loads(e.content)['error']['code'] == 404: | |
return | |
else: | |
raise | |
def create_empty_table(dataset, table, schema, ttl_days=None): | |
"""Create the given table | |
An exception will be thrown if the table already exists. | |
Arguments: | |
dataset: The name of the dataset of the table to check/create. | |
table: The name (not including the dataset) of the table to | |
check/create. | |
schema: A dict describing the schema of the new table. See the | |
docstring on ensure_bigquery_table_exists for a description of the | |
dict format. | |
ttl_days: If present, the table is set to expire after this many days. | |
""" | |
table_service = BigQueryService.get_service().tables() | |
table_service.insert( | |
projectId=BQ_PROJECT_ID, | |
datasetId=dataset, | |
body={ | |
'tableReference': { | |
'projectId': BQ_PROJECT_ID, | |
'datasetId': dataset, | |
'tableId': table | |
}, | |
'schema': schema, | |
} | |
).execute() | |
# We could technically set the expiration time in the regular request that | |
# creates the table, but setting it after-the-fact makes the code simpler. | |
if ttl_days is not None: | |
set_table_ttl_in_days(dataset, table, ttl_days) | |
def create_or_update_view(dataset, view_name, query): | |
"""Create a BigQuery view from from the given query. | |
If the table already exists, it is changed to a view (if necessary) and the | |
query for the view is updated to the given query. | |
Arguments: | |
dataset: The dataset where the view will go. | |
view_name: The name of the view to create or update. | |
query: A string with the SQL for the view. | |
""" | |
table_service = BigQueryService.get_service().tables() | |
table_body = { | |
'tableReference': { | |
'projectId': BQ_PROJECT_ID, | |
'datasetId': dataset, | |
'tableId': view_name | |
}, | |
'view': { | |
'query': query | |
} | |
} | |
try: | |
table_service.insert( | |
projectId=BQ_PROJECT_ID, | |
datasetId=dataset, | |
body=table_body).execute() | |
except apiclient_errors.HttpError as e: | |
if json.loads(e.content)['error']['code'] == 409: # Already exists | |
table_service.update( | |
projectId=BQ_PROJECT_ID, | |
datasetId=dataset, | |
tableId=view_name, | |
body=table_body).execute() | |
else: | |
logging.error("Failed with query: %s", query) | |
raise | |
def set_table_ttl_in_days(dataset, table, num_days): | |
"""Set the table to expire after the given number of days. | |
Tables last forever by default, but this sets a time-to-live so that | |
BigQuery will later delete the table. This function relies on the system | |
clock to compute an expiration time. | |
Arguments: | |
dataset: The dataset containing the table to modify. | |
table: The name of the table to modify. | |
num_days: A number (not necessarily an integer) with the number of days | |
to keep the table around. | |
""" | |
assert num_days > 0 | |
current_time_ms = time.time() * 1000 | |
time_to_live_ms = num_days * 24 * 60 * 60 * 1000 | |
expiration_time_ms = int(current_time_ms + time_to_live_ms) | |
BigQueryService.get_service().tables().patch( | |
projectId=BQ_PROJECT_ID, | |
datasetId=dataset, | |
tableId=table, | |
body={'expirationTime': expiration_time_ms} | |
).execute() | |
def _get_all_pages(query_function, parameters, key_to_merge): | |
"""Pages through results returned by the BigQuery API | |
Arguments: | |
query_function - the function to call, such as datasets().list | |
parameters - the initial parameters to send to the function. This will | |
be augmented with the page tokens if there are multiple pages. | |
key_to_merge - the key within the dictionary of results that contains | |
an array of objects to merge together to form the final list. | |
Returns: | |
A dict in the format of the original API response, with the key_to_merge | |
list merged together across all pages. | |
""" | |
def get_list(the_result): | |
# Sometimes the query is valid, but returns nothing. In this case the | |
# key is not present, so we should just return an empty list. | |
if key_to_merge not in the_result: | |
return [] | |
else: | |
return the_result[key_to_merge] | |
result = query_function(**parameters).execute() | |
final_result = { | |
key_to_merge: get_list(result) | |
} | |
while "nextPageToken" in result: | |
parameters["pageToken"] = result["nextPageToken"] | |
result = query_function(**parameters).execute() | |
final_result[key_to_merge].extend(get_list(result)) | |
return final_result | |
def get_all_backup_dataset_names(): | |
"""Return the names of all datastore backup datasets. | |
https://developers.google.com/bigquery/docs/reference/v2/datasets/list | |
""" | |
backup_dataset_names = [] | |
parameters = { | |
"projectId": BQ_PROJECT_ID | |
} | |
datasets_list = _get_all_pages( | |
BigQueryService.get_service().datasets().list, parameters, "datasets") | |
for d in datasets_list['datasets']: | |
dataset_name = d['datasetReference']['datasetId'] | |
# Must match YYYY_MM_DD format | |
if re.search('(^[0-9]{4}_[0-9]{2}_[0-9]{2}$)', dataset_name): | |
backup_dataset_names.append(dataset_name) | |
return backup_dataset_names | |
def get_all_tables_in_dataset(dataset): | |
"""Return the names of all a dataset's tables, or None for a bad dataset. | |
Note that there is currently no way to disambiguate between a view and a | |
table, so both are returned. | |
https://developers.google.com/bigquery/docs/reference/v2/tables/list | |
""" | |
parameters = { | |
"projectId": BQ_PROJECT_ID, | |
"datasetId": dataset, | |
"maxResults": 500, | |
} | |
try: | |
tables_list = _get_all_pages( | |
BigQueryService.get_service().tables().list, parameters, "tables") | |
return [table['tableReference']['tableId'] | |
for table in tables_list['tables']] | |
except apiclient_errors.HttpError as e: | |
# Return None if the dataset doesn't exist. | |
if json.loads(e.content)['error']['code'] == 404: | |
return None | |
else: | |
raise | |
def get_most_recent_backup_dataset(): | |
"""Return the name of the most recent datastore backup dataset.""" | |
backup_names = get_all_backup_dataset_names() | |
if backup_names: | |
return sorted(backup_names)[-1] | |
else: | |
return None |
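# Illustrative sketch (not part of the original snapshot): building a
# flattened view over a table in the most recent datastore backup dataset.
# The table and view names are hypothetical.
#
#     backup_dataset = get_most_recent_backup_dataset()
#     selectors = get_leaf_column_selectors(backup_dataset, 'UserData')
#     ensure_bigquery_dataset_exists('latest_views')
#     create_or_update_view(
#         'latest_views', 'UserData_flat',
#         'SELECT\n%s\nFROM [%s.UserData]' % (selectors, backup_dataset))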
"""Useful pipeline classes for interacting with BigQuery.""" | |
import json | |
import logging | |
import os | |
import time | |
from third_party.google_api_python_client.apiclient import ( | |
errors as apiclient_errors, | |
http as apiclient_http | |
) | |
from third_party.mapreduce.lib.pipeline import pipeline | |
from third_party.mapreduce.lib.pipeline import common | |
from bigquery import bq_connection | |
# In some situations, we want to submit lots of API requests at once. Batching | |
# helps with this, but BigQuery will give an error if we send way too many API | |
# requests in the same batch, so we limit the number of requests we're willing | |
# to send at once. | |
DEFAULT_NUM_REQUESTS_PER_BATCH = 10 | |
# When hitting retriable errors (rate limit exceeded or internal BigQuery | |
# errors) when submitting HTTP requests, automatically retry up to this many | |
# times. | |
# | |
# Note that automatic pipeline retry accomplishes some of this, but this HTTP | |
# retry works better because only failed requests in a batch are retried rather | |
# than retrying the whole batch. Also, since we limit retrying to definitely- | |
# retriable errors, it's ok to be more aggressive about retrying. | |
NUM_HTTP_RETRY_ATTEMPTS = 10 | |
# Sets a limit on the amount of time we're willing to wait before giving up on | |
# a query. This setting is necessary because sometimes, BigQuery will spend a | |
# very long time on a small number of queries. For example, there have been | |
# times where 50 queries are submitted at the same time and 48 of those queries | |
# finish within a few minutes, but the remaining two take multiple hours to | |
# finish. In these cases, simply submitting another copy of the two stalled | |
# queries would work, so we treat the time limit as a retriable error. | |
# TODO(alan): This time limit used to be 15 minutes, but I disabled the feature | |
# by setting it to None to work around a correctness issue in BigQuery that | |
# happens when you submit the same query twice: | |
# http://stackoverflow.com/questions/23424289/what-atomicity-guarantees-does-bigquery-provide-for-query-jobs | |
# If we keep running into issues, it may make sense to set this time limit | |
# again (potentially to a number higher than 15 minutes), and if we stop | |
# running into the issue, we may want to remove this feature completely. | |
QUERY_TIME_LIMIT_SECONDS = None | |
# Initial amount of time, in seconds, to sleep when waiting for a job to finish | |
# (assuming it's not done immediately). | |
JOB_WAIT_INITIAL_TIME = 5 | |
# Increase the sleeping time by the following factor when waiting for a job to | |
# finish. Back off exponentially so the task hierarchy doesn't get ridiculously | |
# deep in the pipeline viewer when we wait on a long-running job like a batch | |
# query. | |
JOB_WAIT_BACKOFF_MULTIPLIER = 1.5 | |
DEFAULT_QUERY_PRIORITY = os.environ.get('DEFAULT_BIGQUERY_PRIORITY', 'BATCH') | |
class ReturnOutputs(pipeline.Pipeline): | |
"""General-purpose mechanism for returning named outputs. | |
This makes it more convenient to return pipeline results as outputs, since | |
the normal self.fill mechanism requires returning already-resolved values, | |
not pipeline futures. To use this, this pipeline should be yielded last. | |
Also, any outputs specified will be ignored unless they are specified in | |
the output_names list of the calling pipeline. | |
For example, this code snippet lets you combine the results from two child | |
pipelines into three outputs (one default and two named): | |
default_and_foo = yield ComputeDefaultAndFoo() | |
bar = yield ComputeBar() | |
yield ReturnOutputs(default_and_foo, foo=default_and_foo.foo, bar=bar) | |
Note that this pipeline is unnecessary if the last pipeline yielded already | |
returns outputs with the desired names and values. | |
""" | |
def run(self, default_output, **kwargs): | |
for key in kwargs: | |
self.fill(key, kwargs[key]) | |
return default_output | |
class ListIndex(pipeline.Pipeline): | |
"""Returns the element at the given index from the given list.""" | |
def run(self, input_list, index): | |
return input_list[index] | |
class SmallResultQueryPipeline(pipeline.Pipeline): | |
"""Pipeline for running a query and getting its results. | |
This pipeline is convenient if the results are small (e.g. if you're | |
querying an aggregate) and you want to view the results directly in Python | |
rather than sending the results to a table. | |
Arguments: | |
query: A string with the text of the query to run. | |
Returns: | |
The query results as returned in the getQueryResults method in the jobs | |
section of the BigQuery API. For example, | |
results['rows'][0]['f'][0]['v'] gets the first field of the first row. | |
""" | |
def run(self, query): | |
job_metadata_list = yield RunJobBatchPipeline([{ | |
'query': { | |
'query': query | |
} | |
}]) | |
job_metadata = yield ListIndex(job_metadata_list, 0) | |
yield GetQueryResultsPipeline(job_metadata) | |
class GetQueryResultsPipeline(pipeline.Pipeline): | |
"""Fetches and returns the actual query results for a job. | |
Arguments: | |
job_metadata: A dict containing information about the job to get query | |
results for, as returned by insert or get in the jobs section of | |
the BigQuery API. | |
Returns: | |
The query results as returned in the getQueryResults method in the jobs | |
section of the BigQuery API. For example, | |
results['rows'][0]['f'][0]['v'] gets the first field of the first row. | |
""" | |
def run(self, job_metadata): | |
job_service = bq_connection.BigQueryService.get_service().jobs() | |
return job_service.getQueryResults( | |
projectId=job_metadata['jobReference']['projectId'], | |
jobId=job_metadata['jobReference']['jobId']).execute() | |
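# Illustrative sketch (not part of the original snapshot): using
# SmallResultQueryPipeline from a parent pipeline to read back one aggregate
# value (the table name is made up):
#
#     results = yield SmallResultQueryPipeline(
#         'SELECT COUNT(*) FROM [bigbingo.participation_events]')
#
# Once the future resolves, the count is available (as a string) at
# results['rows'][0]['f'][0]['v'], per the getQueryResults response format.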
class QueryToTablePipeline(pipeline.Pipeline): | |
"""Pipeline for running a query and sending the output to a table. | |
This pipeline will submit a query and will not mark itself as done until | |
the query has either finished or failed. | |
Arguments: | |
query: A string with the text of the query to run. | |
output_dataset: The dataset of the output table. | |
output_table: The name of the output table. | |
create_if_needed: If true (the default), we create the table (rather | |
than failing) if it doesn't already exist. | |
append_to_table: If false (the default), we overwrite the table | |
contents rather than appending to the end of the table. | |
priority: A string, either 'INTERACTIVE' or 'BATCH'. In both cases, the | |
query will be submitted asynchronously and the pipeline will wait | |
until the query completes, but interactive queries generally run | |
faster and are subject to rate limits, while batch queries are | |
marked as lower priority in BigQuery's scheduler and have no | |
restriction on the number of concurrent queries. The default | |
priority is BATCH since pipelines are most commonly run in | |
background jobs where stability and isolation (i.e. not affecting | |
humans using the BigQuery webapp) are more important than low | |
latency. | |
allow_large_results: Setting to True (the default) may slow the query | |
down, but allows the query to return more than 128MB of results. | |
result_table_ttl_days: If specified, the resulting table is | |
asynchronously set to expire after the given number of days. | |
table_desc: A string with a human-readable description of this BigQuery | |
table. If this is None, we do not set the table description. | |
schema_desc: A list of dictionaries, where each dictionary is of the | |
form as described in [1] in the patch/body/schema/fields section. | |
If this is None, we do not set the schema descriptions. | |
Returns: | |
default: Metadata describing the status of the query job (the return | |
value of the "get" function in the "jobs" section of the BigQuery | |
API). | |
cost: The number of cents spent on the query. | |
[1] https://developers.google.com/resources/api-libraries/documentation/ | |
bigquery/v2/python/latest/bigquery_v2.tables.html | |
""" | |
output_names = ['cost'] | |
def run(self, query, output_dataset, output_table, create_if_needed=True, | |
append_to_table=False, priority=DEFAULT_QUERY_PRIORITY, | |
allow_large_results=True, result_table_ttl_days=None, | |
table_desc=None, schema_desc=None): | |
job_results = yield RunJobBatchPipeline( | |
[query_to_table_job_config(query, output_dataset, output_table, | |
create_if_needed, append_to_table, | |
priority, allow_large_results)], | |
time_limit=QUERY_TIME_LIMIT_SECONDS) | |
if result_table_ttl_days is not None: | |
with pipeline.After(job_results): | |
yield SetTableTTLInDaysPipeline( | |
output_dataset, output_table, result_table_ttl_days) | |
if table_desc is not None or schema_desc is not None: | |
with pipeline.After(job_results): | |
yield UpdateTableInfoPipeline( | |
output_dataset, output_table, table_desc, schema_desc) | |
cost = yield GetCostPipeline(job_results) | |
job_result = yield ListIndex(job_results, 0) | |
yield ReturnOutputs(job_result, cost=cost) | |
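# Illustrative sketch (not part of the original snapshot): running a single
# query from a parent pipeline and consuming the named 'cost' output. The
# query, table names, and the LogCostPipeline consumer are all made up.
#
#     job = yield QueryToTablePipeline(
#         'SELECT bingo_id FROM [bigbingo.participation_events]',
#         'bigbingo_tmp', 'participants', result_table_ttl_days=7)
#     yield LogCostPipeline(job.cost)  # job.cost resolves to cents spent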
class QueryToTableBatchPipeline(pipeline.Pipeline): | |
"""Batched version of QueryToTablePipeline. | |
This pipeline lets you submit many queries at the same time and block until | |
all of them have finished. The API requests are batched, so this version is | |
more performant, and the requests are automatically throttled as necessary | |
to avoid hitting BigQuery's request limit. | |
Arguments: | |
query_specs: A list of dictionaries, each of which describes a query | |
to make and the table that it should output to. The dictionary keys | |
are the same as the arguments to QueryToTablePipeline, although | |
result_table_ttl_days is not yet supported for this pipeline. | |
Returns: | |
default: A list of job results, each of which has the same format as | |
the return value of bq_service.jobs().get(...). | |
cost: The combined cost of all queries. | |
""" | |
output_names = ['cost'] | |
def run(self, query_specs): | |
job_configs = [query_to_table_job_config(**spec) | |
for spec in query_specs] | |
results = yield RunJobBatchPipeline( | |
job_configs, time_limit=QUERY_TIME_LIMIT_SECONDS) | |
cost = yield GetCostPipeline(results) | |
yield ReturnOutputs(results, cost=cost) | |
def query_to_table_job_config( | |
query, output_dataset, output_table, create_if_needed=True, | |
append_to_table=False, priority=DEFAULT_QUERY_PRIORITY, | |
allow_large_results=True): | |
"""Gets the 'configuration' dict to use when starting a BigQuery query. | |
See the documentation for bq_service.jobs().insert(...) for more | |
information. | |
""" | |
logging.info('Submitting query %s' % query) | |
create_mode = 'CREATE_IF_NEEDED' if create_if_needed else 'CREATE_NEVER' | |
# Note that the API also allows for a WRITE_EMPTY option where we | |
# expect the table to be empty, and there's no reason we can't add | |
# support for that here if we need to. | |
write_mode = 'WRITE_APPEND' if append_to_table else 'WRITE_TRUNCATE' | |
return { | |
'query': { | |
'query': query, | |
'priority': priority, | |
'destinationTable': { | |
'projectId': bq_connection.BQ_PROJECT_ID, | |
'datasetId': output_dataset, | |
'tableId': output_table | |
}, | |
'createDisposition': create_mode, | |
'writeDisposition': write_mode, | |
'allowLargeResults': allow_large_results, | |
} | |
} | |
class GetCostPipeline(pipeline.Pipeline): | |
"""Given a list of query metadata, compute the total cost of all queries. | |
Arguments: | |
job_infos: A list of dicts, each containing information about a query | |
job. The format is the return format for bq_service.jobs().get(...), as | |
documented in the BigQuery API (see the documentation on | |
BigQueryService for a link). | |
Returns: The combined cost of all queries, in cents. | |
""" | |
def run(self, job_infos): | |
result = 0 | |
for job_info in job_infos: | |
num_bytes = int(job_info['statistics']['query'] | |
['totalBytesProcessed']) | |
# Queries cost 0.5 cents per GB of data touched. | |
result += 0.5 * num_bytes / (2 ** 30) | |
return result | |
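# Worked example (assuming the 0.5 cents/GB rate above): a job whose
# totalBytesProcessed is 200 GB costs 0.5 * 200 = 100 cents, and a full 1 TB
# scan comes to 0.5 * 1024 = 512 cents.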
class CopyTablePipeline(pipeline.Pipeline): | |
"""Copies the contents of the source table to the destination table. | |
Both tables are specified by their dataset name and table name. If | |
append_to_table is False and the destination table already exists, it is | |
overwritten with the contents of the source table. | |
This pipeline is not marked as finished until the copy job is complete. | |
""" | |
def run(self, src_dataset, src_table, dest_dataset, dest_table, | |
append_to_table=False, result_table_ttl_days=None): | |
result_list = yield RunJobBatchPipeline([copy_table_job_config( | |
src_dataset, src_table, dest_dataset, dest_table, append_to_table) | |
]) | |
if result_table_ttl_days is not None: | |
with pipeline.After(result_list): | |
yield SetTableTTLInDaysPipeline( | |
dest_dataset, dest_table, result_table_ttl_days) | |
yield ListIndex(result_list, 0) | |
class CopyTableBatchPipeline(pipeline.Pipeline): | |
"""Performs multiple copy operations in a batch. | |
Arguments: | |
copy_specs: A list of dictionaries, each of which describes a copy | |
operation. The dict keys should be the same as the arguments to | |
CopyTablePipeline. | |
""" | |
def run(self, copy_specs): | |
yield RunJobBatchPipeline( | |
[copy_table_job_config(**copy_spec) for copy_spec in copy_specs]) | |
def copy_table_job_config(src_dataset, src_table, dest_dataset, dest_table, | |
append_to_table=False): | |
write_mode = 'WRITE_APPEND' if append_to_table else 'WRITE_TRUNCATE' | |
return { | |
'copy': { | |
'sourceTable': { | |
'projectId': bq_connection.BQ_PROJECT_ID, | |
'datasetId': src_dataset, | |
'tableId': src_table | |
}, | |
'destinationTable': { | |
'projectId': bq_connection.BQ_PROJECT_ID, | |
'datasetId': dest_dataset, | |
'tableId': dest_table | |
}, | |
'createDisposition': 'CREATE_IF_NEEDED', | |
'writeDisposition': write_mode | |
} | |
} | |
class UpdateTableInfoPipeline(pipeline.Pipeline): | |
"""Updates the descriptions of an existing BigQuery table. | |
This pipeline assumes that the table already exists and the schema is as | |
specified. The entire purpose of this class is to add a description | |
for the table itself and/or for the schema in the table. | |
For an example of how to include descriptions when creating a table, please | |
refer to bigquery_reports/user_sessions.py. | |
TODO(ilan): It would be ideal to automatically generate the schema for our | |
existing tables based on the docstrings we've already written. | |
""" | |
def run(self, dataset_name, table_name, table_desc=None, schema_desc=None): | |
"""Updates the descriptions for the BigQuery table and its schema. | |
Args: | |
dataset_name: A string with the ID of the dataset where this table | |
resides. | |
table_name: A string with the ID of this table. | |
table_desc: A string with a human-readable description of this | |
BigQuery table. If this is None, we do not set the | |
table description. | |
schema_desc: A list of dictionaries, where each dictionary is of | |
the form as described in [1] in the patch/body/ | |
schema/fields section. If this is None, we do not set | |
the schema descriptions. | |
[1] https://developers.google.com/resources/api-libraries/ | |
documentation/bigquery/v2/python/latest/bigquery_v2.tables.html | |
""" | |
table_service = bq_connection.BigQueryService.get_service().tables() | |
# I'm really not sure what to put here, so I'm just copying from | |
# EnsureTablesDeletedPipeline for now | |
def handle_result(request_id, response, exception): | |
# Ignore missing tables | |
if exception and not ( | |
isinstance(exception, apiclient_errors.HttpError) and | |
json.loads(exception.content)['error']['code'] == 404): | |
raise exception | |
run_http_requests( | |
[table_service.patch( | |
projectId=bq_connection.BQ_PROJECT_ID, | |
datasetId=dataset_name, | |
tableId=table_name, | |
body=patch_table_job_config(dataset_name, table_name, | |
table_desc, schema_desc))], | |
handle_result) | |
def patch_table_job_config(dataset_name, table_name, table_desc=None, | |
schema_desc=None): | |
"""Returns the job config dictionary for this table patch method call. | |
Args: | |
dataset_name: A string with the ID of the dataset where this table | |
resides. | |
table_name: A string with the ID of this table. | |
table_desc: A string with a human-readable description of this | |
BigQuery table. If this is None, we do not set the | |
table description. | |
schema_desc: A list of dictionaries, where each dictionary is of | |
the form as described in [1] in the patch/body/ | |
schema/fields section. If this is None, we do not set | |
the schema descriptions. | |
[1] https://developers.google.com/resources/api-libraries/ | |
documentation/bigquery/v2/python/latest/bigquery_v2.tables.html | |
Return: | |
A dictionary of the form required to run the patch job. | |
""" | |
result = { | |
'tableReference': { | |
'projectId': bq_connection.BQ_PROJECT_ID, | |
'tableId': table_name, | |
'datasetId': dataset_name, | |
}, | |
} | |
if table_desc is not None: | |
result['description'] = table_desc | |
if schema_desc is not None: | |
result['schema'] = { | |
'fields': schema_desc, | |
} | |
return result | |
class LoadTablePipeline(pipeline.Pipeline): | |
"""Loads a table from Cloud Storage. | |
If the specified table does not already exist, it is created. This pipeline | |
is not marked as finished until the load job is complete, and the pipeline | |
is aborted if there is a job failure. | |
Arguments: | |
cloud_storage_paths: A list of strings specifying the cloud storage | |
files to load, e.g. "gs://foo/bar". | |
dataset: The dataset of the table to write to. | |
table: The name of the table to write to. | |
schema: The schema of the table. If the table already exists, this must | |
be a subset of the existing schema. See the documentation for | |
bq_service.jobs().insert(...) (the BigQueryService documentation | |
has a link) for a description of the schema format. | |
source_format: A string, either NEWLINE_DELIMITED_JSON or CSV. | |
write_mode: A string, either WRITE_APPEND or WRITE_TRUNCATE. | |
result_table_ttl_days: If specified, the resulting table is | |
asynchronously set to expire after the given number of days. | |
Returns: | |
Metadata about the job result, in the format of the return value of | |
bq_service.jobs().get(...). | |
""" | |
def run(self, cloud_storage_paths, dataset, table, schema, source_format, | |
write_mode, result_table_ttl_days=None): | |
result_list = yield RunJobBatchPipeline([{ | |
'load': { | |
'sourceFormat': source_format, | |
'sourceUris': cloud_storage_paths, | |
'schema': schema, | |
'destinationTable': { | |
'projectId': bq_connection.BQ_PROJECT_ID, | |
'datasetId': dataset, | |
'tableId': table, | |
}, | |
'createDisposition': 'CREATE_IF_NEEDED', | |
'writeDisposition': write_mode, | |
'encoding': 'UTF-8', | |
} | |
}]) | |
if result_table_ttl_days is not None: | |
with pipeline.After(result_list): | |
yield SetTableTTLInDaysPipeline( | |
dataset, table, result_table_ttl_days) | |
yield ListIndex(result_list, 0) | |
class EnsureTablesDeletedPipeline(pipeline.Pipeline): | |
"""Deletes the tables with the given names in the given dataset. | |
Any tables that already don't exist are ignored. The delete operation | |
happens synchronously, so the tables will be deleted when the pipeline is | |
marked as finished. | |
""" | |
def run(self, dataset_name, table_names): | |
table_service = bq_connection.BigQueryService.get_service().tables() | |
def handle_result(request_id, response, exception): | |
# Ignore missing tables | |
if exception and not ( | |
isinstance(exception, apiclient_errors.HttpError) and | |
json.loads(exception.content)['error']['code'] == 404): | |
raise exception | |
run_http_requests( | |
[table_service.delete( | |
projectId=bq_connection.BQ_PROJECT_ID, | |
datasetId=dataset_name, | |
tableId=table_name) | |
for table_name in table_names], | |
handle_result) | |
class SetTableTTLInDaysPipeline(pipeline.Pipeline): | |
"""Set the table to expire after the given number of days. | |
See bq_connection.set_table_ttl_in_days for more info. | |
""" | |
def run(self, dataset, table, num_days): | |
bq_connection.set_table_ttl_in_days(dataset, table, num_days) | |
def should_retry_job(job_info): | |
"""Return True if the given job failed and is eligible for retry. | |
Jobs should be retried if they failed due to BigQuery internal errors. | |
BigQuery job statuses provide a 'reason' error code that helps distinguish | |
between transient BigQuery errors and bugs in our code or other errors that | |
are not worth retrying. | |
Arguments: | |
job_info: The result type of bq_service.jobs().get(...), which provides | |
information about the status and other details of a job. | |
""" | |
return ('errorResult' in job_info['status'] and | |
job_info['status']['errorResult']['reason'] in | |
('internalError', 'backendError', 'quotaExceeded', | |
'ka:timeLimitExceeded')) | |
class RunJobBatchPipeline(pipeline.Pipeline): | |
"""Pipeline to run a list of BigQuery job and block on their completion. | |
Arguments: | |
configs: A list of job configurations. The format is defined in the | |
"configuration" section of the documentation for | |
bq_service.jobs().insert(...). | |
time_limit: The number of seconds we're willing to wait before | |
considering a job stalled and trying again. Or None if we should | |
wait indefinitely. If specified, the jobs passed in should all be | |
idempotent. | |
retries_remaining: The number of additional times we're willing to retry | |
failed jobs. This is mostly used internally. | |
Returns: | |
A list of metadata dicts, each of which has the format of the return | |
value of bq_service.jobs().get(...). | |
""" | |
def run(self, configs, time_limit=None, retries_remaining=2): | |
job_service = bq_connection.BigQueryService.get_service().jobs() | |
job_ids = [None] * len(configs) | |
def handler(index, response): | |
job_ids[index] = response['jobReference']['jobId'] | |
run_http_job_requests( | |
[job_service.insert( | |
projectId=bq_connection.BQ_PROJECT_ID, | |
body={ | |
'projectId': bq_connection.BQ_PROJECT_ID, | |
'configuration': config | |
}) | |
for config in configs], | |
handler, | |
# Setting this to be higher than 1 causes things to break. Looks | |
# like a BigQuery bug. | |
num_requests_per_batch=1) | |
job_results = yield WaitForJobBatchPipeline( | |
job_ids, wait_time_seconds=JOB_WAIT_INITIAL_TIME, | |
time_remaining_seconds=time_limit) | |
yield ConsiderJobRetry(job_results, time_limit, retries_remaining) | |
class ConsiderJobRetry(pipeline.Pipeline): | |
"""If any jobs failed and should be retried, retry them. | |
If any jobs actually are retried successfully, we patch the given | |
job_results list so that the new successful results replace the old failed | |
results. | |
TODO(alan): In the quotaExceeded case, we can be smarter about our response | |
rather than just blindly retrying with a cap on the number of times. I | |
think these semantics make the most sense: | |
-If we're not making progress (every job had quotaExceeded), add in a delay | |
of 30 seconds or so before retrying to let the system calm down. | |
-If we're making progress (some jobs were successful, while others had | |
quotaExceeded), don't decrement the retry count (or maybe reset it) and | |
retry immediately. | |
This should make it possible to throw hundreds of queries at | |
RunJobBatchPipeline and have BigQuery process them as fast as it can | |
without ever failing, while still protecting against the case where an | |
individual query fails due to too many queries from other jobs. | |
Arguments: | |
job_results: A list of job result dicts (the return value of | |
bq_service.jobs().get(...)). | |
time_limit: The job time limit to send to RunJobBatchPipeline. | |
retries_remaining: The number of times we're willing to retry. | |
Returns: | |
The job_results list, which may be updated with newly-retried job info. | |
""" | |
def run(self, job_results, time_limit, retries_remaining): | |
# List of (index, config) pairs. We need to track the original indexes | |
# so we can replace the right ones when we finish retrying. | |
retry_configs = [] | |
for index, job_result in enumerate(job_results): | |
if should_retry_job(job_result): | |
logging.info('The following failure occurred, and will be ' | |
'considered for retry: %s' % job_result) | |
retry_configs.append((index, job_result['configuration'])) | |
if retry_configs: | |
if retries_remaining == 0: | |
raise bq_connection.BigQueryError( | |
'The following jobs failed and were retried too many ' | |
'times: %s' % [config for _, config in retry_configs]) | |
new_job_results = yield RunJobBatchPipeline( | |
[config for _, config in retry_configs], time_limit, | |
retries_remaining - 1) | |
yield UpdateJobResultsList(job_results, new_job_results, | |
[index for index, _ in retry_configs]) | |
else: | |
yield common.Return(job_results) | |
class UpdateJobResultsList(pipeline.Pipeline): | |
"""Update some entries in a list of job results with new results. | |
This is used after retrying some failed jobs to combine their new | |
successful result values with the values of the jobs that succeeded | |
originally. | |
Arguments: | |
original_job_results: A list of job results for the entire batch, some | |
of which need to be replaced. | |
new_job_results: A list of job results to replace the original results. | |
replacement_indices: A list of indices. The nth index is the place in | |
original_job_results to put new_job_results[n]. | |
Returns: | |
A list based on original_job_results, but with some replacements made, | |
as described by new_job_results and replacement_indices. | |
""" | |
def run(self, original_job_results, new_job_results, replacement_indices): | |
for new_job_result, index in zip(new_job_results, replacement_indices): | |
original_job_results[index] = new_job_result | |
return original_job_results | |
class WaitForJobBatchPipeline(pipeline.Pipeline): | |
"""Pipeline which waits for a list of BigQuery jobs to finish. | |
Arguments: | |
job_ids: A list of BigQuery job IDs to wait on. | |
wait_time_seconds: The initial amount of time to wait for the job to | |
finish. This is used to implement exponential backoff (when | |
time_remaining_seconds is None, the pipeline waits indefinitely for | |
the jobs to finish). | |
time_remaining_seconds: Either an integer number of seconds or None. If | |
not None, this pipeline will stop waiting after that many seconds | |
have passed and any jobs that are still not done will have their | |
statuses modified so that they're marked as DONE with a | |
timeLimitExceeded error (we use the same error format that BigQuery | |
uses since error-handling code expects that format). | |
Returns: | |
A list of metadata dicts describing the jobs' results, as returned by | |
bq_service.jobs().get(...). See the docstring on BigQueryService for a | |
link to the docs. Since we keep polling until the jobs are finished, | |
this metadata will always show the jobs as finished. | |
""" | |
def run(self, job_ids, wait_time_seconds, time_remaining_seconds): | |
job_service = bq_connection.BigQueryService.get_service().jobs() | |
# This list starts out with all Nones and gets filled in. | |
completed_results = [None] * len(job_ids) | |
remaining_job_results = [] | |
remaining_job_indices = [] | |
def handle_result(index, response): | |
if response['status']['state'] in ('PENDING', 'RUNNING'): | |
remaining_job_results.append(response) | |
remaining_job_indices.append(index) | |
else: | |
completed_results[index] = response | |
run_http_job_requests( | |
[job_service.get(projectId=bq_connection.BQ_PROJECT_ID, | |
jobId=job_id) | |
for job_id in job_ids], | |
handle_result) | |
# Compute other_results, an ordered list of job results for jobs that | |
# we wait for recursively. | |
if remaining_job_results: | |
if (time_remaining_seconds is not None and | |
time_remaining_seconds <= 0): | |
# If we're out of time, pretend the job finished with an error | |
# so we retry it. In practice, these long-running jobs can go | |
# for hours before finishing, so it's generally best to give up | |
# and try again. | |
for job in remaining_job_results: | |
job['status']['state'] = 'DONE' | |
job['status']['errorResult'] = { | |
'reason': 'ka:timeLimitExceeded' | |
} | |
other_results = remaining_job_results | |
else: | |
# Otherwise, keep waiting. | |
remaining_job_ids = [job['jobReference']['jobId'] | |
for job in remaining_job_results] | |
with pipeline.InOrder(): | |
yield common.Delay(seconds=wait_time_seconds) | |
if time_remaining_seconds is not None: | |
time_remaining_seconds -= wait_time_seconds | |
other_results = yield WaitForJobBatchPipeline( | |
remaining_job_ids, | |
wait_time_seconds * JOB_WAIT_BACKOFF_MULTIPLIER, | |
time_remaining_seconds) | |
else: | |
other_results = [] | |
yield UpdateJobResultsList(completed_results, other_results, | |
remaining_job_indices) | |
def run_http_job_requests( | |
requests, callback, | |
num_requests_per_batch=DEFAULT_NUM_REQUESTS_PER_BATCH): | |
"""Runs the given HTTP requests relating to BigQuery jobs. | |
If any HTTP request fails or any request returns a job failure, an | |
exception is thrown. | |
Arguments: | |
requests: A list of HttpRequest objects. | |
callback: A function taking an index and response parameter. It is | |
called for each request that is returned. The "index" parameter is | |
the index of the http request in the requests list, and the | |
response is the response to the HTTP request. | |
num_requests_per_batch: The number of requests we're willing to | |
send in the same request batch. | |
Unlike the more general batch call provided by BatchHttpRequest, the given | |
callback does not take an exception parameter; if a request fails, an | |
exception is raised instead of being passed to the callback. | |
""" | |
def new_callback(request_id, response, exception): | |
if exception: | |
raise exception | |
# If the job failed and is eligible for retry, return the failure | |
# normally instead of failing now so that the parent pipeline has a | |
# chance to handle the failure. | |
if ('errorResult' in response['status'] and | |
not should_retry_job(response)): | |
raise bq_connection.BigQueryError( | |
'Job failed: %s' % response['status']['errorResult']) | |
index = int(request_id) | |
callback(index, response) | |
run_http_requests(requests, new_callback, num_requests_per_batch) | |
def run_http_requests( | |
requests, callback, | |
num_requests_per_batch=DEFAULT_NUM_REQUESTS_PER_BATCH): | |
"""Runs all HttpRequests as a batch with the same callback. | |
If BigQuery sends back errors because the request rate limit is exceeded, | |
this function attempts to back off and retry those failures later, although | |
it eventually will give up. | |
Arguments: | |
requests: A list of HttpRequest objects (such as those returned by | |
function calls on bq_service). | |
callback: A callback to run on each result. See the documentation on | |
BatchHttpRequest for more details. The request_ids provided to the | |
callback will be the string value of the index into the requests | |
list, but the order in which the callbacks are called is not | |
guaranteed to be the same as the order of the requests. | |
num_requests_per_batch: The number of requests we're willing to send in | |
the same request batch. This throttling exists to avoid BigQuery | |
errors from sending too many requests at once, and to work around | |
a bug where BigQuery will only let you insert one job at a time. | |
""" | |
request_ids = [str(num) for num in xrange(len(requests))] | |
for id_and_request_batch in partition(zip(request_ids, requests), | |
num_requests_per_batch): | |
requests_by_id = dict(id_and_request_batch) | |
run_http_requests_with_retry( | |
requests_by_id, callback, | |
retries_remaining=NUM_HTTP_RETRY_ATTEMPTS - 1) | |
def run_http_requests_with_retry(requests_by_id, callback, retries_remaining): | |
"""Run the given requests as a batch, retrying as necessary. | |
If we hit any rate limiting errors or internal BigQuery errors, we sleep | |
for half a second and try again with the failed requests. If the retry | |
limit is exceeded, it is reported to the callback in the same way that | |
other exceptions are reported. | |
Arguments: | |
requests_by_id: A dictionary mapping string request ID to the request | |
itself. | |
callback: A callback to handle the requests, as described in the docs | |
for BatchHttpRequest. | |
retries_remaining: The number of times we're willing to retry again | |
before giving up. | |
""" | |
ids_needing_retry = [] | |
def retry_collecting_callback(request_id, response, exception): | |
if should_retry_http_exception(exception) and retries_remaining > 0: | |
ids_needing_retry.append(request_id) | |
else: | |
callback(request_id, response, exception) | |
http_batch = apiclient_http.BatchHttpRequest() | |
for request_id, request in requests_by_id.iteritems(): | |
http_batch.add(request, callback=retry_collecting_callback, | |
request_id=request_id) | |
http_batch.execute() | |
# If we get a retriable exception (which happens in the case of rate | |
# limiting and internal BigQuery errors), back off a bit to let the system | |
# settle down, then retry. | |
if ids_needing_retry: | |
logging.warning('After submitting %s requests at once, %s needed to ' | |
'be retried.' % | |
(len(requests_by_id), len(ids_needing_retry))) | |
time.sleep(0.5) | |
requests_by_id_to_retry = {request_id: requests_by_id[request_id] | |
for request_id in ids_needing_retry} | |
run_http_requests_with_retry(requests_by_id_to_retry, callback, | |
retries_remaining - 1) | |
def should_retry_http_exception(exception): | |
"""Returns True if the given HTTP exception should be retried. | |
The two main cases that are worth retrying are when we've hit BigQuery API | |
request rate limits or when BigQuery has some other error and gives a | |
message saying "Unexpected. Please try again.". | |
""" | |
if not exception or not isinstance(exception, apiclient_errors.HttpError): | |
return False | |
content = json.loads(exception.content) | |
code, message = content['error']['code'], content['error']['message'] | |
return ((code == 403 and message.startswith('Exceeded rate limits')) or | |
(code == 500 and 'Please try again' in message)) | |
def partition(source_list, partition_size): | |
"""Break the given list up into a list of lists, each of the given size. | |
Every resulting list will have size partition_size except possibly the last | |
one, and no list will be empty. | |
""" | |
return [source_list[n:n + partition_size] | |
for n in xrange(0, len(source_list), partition_size)] |
"""Module responsible for providing the active experiments and conversions.""" | |
from __future__ import absolute_import | |
import collections | |
from third_party import markdown | |
from bigbingo import alternative_deciders | |
class Experiment(object): | |
"""Configuration for an experiment. | |
Fields: | |
id: A string identifying the experiment, which can consist only of | |
alphanumeric characters and underscores. This string is used in | |
table names, so it is recommended that the ID be relatively short. | |
Every experiment must have a unique ID. | |
logged_name: The name of the experiment that shows up in logs and in | |
strings in BigQuery. For legacy experiments, this is display_name, | |
and for new-style experiments, this is experiment_id. This field | |
only exists for transition reasons, and should be removed when the | |
last legacy experiment is removed. | |
display_name: A human-readable name for the experiment, used for | |
display purposes, which is allowed to have arbitrary characters. | |
legacy_hashable_name: For legacy experiments, the name for hashing | |
purposes, which is either the family name or the canonical name of | |
the experiment. None if the experiment is not a legacy experiment. | |
alternative_weights: An OrderedDict from alternative name (a string) to | |
weight (an integer). | |
control: The name of the experiment's control alternative (one of the | |
keys in alternative_weights), which is used for display purposes. | |
alternative_decider: A function from (Experiment, bingo_id) to | |
alternative which determines the alternative assigned to the given | |
user. This function should be stable in the sense that it always | |
gives the same alternative to any particular user. If the function | |
returns None, the user will be excluded from the test entirely. | |
owner: A string name of the person responsible for the experiment. | |
description: A string explaining the experiment, which will show up | |
when viewing the experiment in the BigBingo dashboard. | |
conclusion: Specify None for active experiments. For finished | |
experiments, provide a string explaining any results and | |
conclusions from the experiment. | |
""" | |
# We can't serialize alternative_decider because it's a function. We also | |
# shouldn't serialize alternative_weights because it's scary to send | |
# alternative strings as dictionary keys since they could get inadvertently | |
# camel-cased. | |
_serialize_blacklist = ['alternative_decider', 'alternative_weights'] | |
def __init__(self, exp_id, logged_name, display_name, legacy_hashable_name, | |
alternative_weights, control, alternative_decider, owner, | |
description, conclusion): | |
assert control in alternative_weights, ( | |
'The control must be one of the alternatives.') | |
assert alternative_decider is not None | |
self.id = exp_id | |
self.logged_name = logged_name | |
self.display_name = display_name | |
self.legacy_hashable_name = legacy_hashable_name | |
self.alternative_weights = alternative_weights | |
self.control = control | |
self.alternative_decider = alternative_decider | |
self.owner = owner | |
self.description = description | |
self.conclusion = conclusion | |
def choose_alternative(self, bingo_id): | |
result = self.alternative_decider(self, bingo_id) | |
if result is not None and result not in self.alternatives: | |
raise ValueError( | |
'The chosen alternative in experiment %s was unexpectedly %s, ' | |
'was expecting one of %s.' % (self.id, result, | |
self.alternatives)) | |
return result | |
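# Illustrative usage, assuming the sample experiment defined later in | |
# this module (bingo_id is any user ID string): | |
#   exp = get_active_experiment_by_id('sample_expt') | |
#   alt = exp.choose_alternative(bingo_id)  # One of exp.alternatives, | |
#                                           # or None to exclude the user. | |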
@property | |
def alternatives(self): | |
return self.alternative_weights.keys() | |
@property | |
def is_archived(self): | |
return self.conclusion is not None | |
@property | |
def description_html(self): | |
return markdown.markdown(self.description) | |
@property | |
def conclusion_html(self): | |
if self.conclusion is not None: | |
return markdown.markdown(self.conclusion) | |
@property | |
def is_previewable(self): | |
return not self.is_archived | |
class Conversion(object): | |
"""Configuration for a conversion. | |
Fields: | |
id: A string identifying the conversion, which must consist only of | |
alphanumeric characters and underscores. | |
column_name: The string to use in column names in BigQuery. This is | |
often the same as the ID, but may be different for compatibility | |
reasons. | |
logged_name: The name of the conversion that shows up in the logs. | |
display_name: A friendly name of the conversion. | |
""" | |
# Ignore logged_name for consistency with DerivedConversion (and since it's | |
# more of an implementation detail anyway). | |
_serialize_blacklist = ['logged_name'] | |
def __init__(self, conv_id, column_name, logged_name, display_name): | |
self.id = conv_id | |
self.column_name = column_name | |
self.logged_name = logged_name | |
self.display_name = display_name | |
class DerivedConversion(object): | |
"""Configuration for a derived conversion. | |
A derived conversion looks like a regular conversion but isn't valid to | |
log. Instead, it's created by the summarize task based on other conversion | |
values using the "aggregator" field. | |
""" | |
_serialize_blacklist = ['aggregator'] | |
def __init__(self, conv_id, display_name, aggregator): | |
self.id = conv_id | |
self.column_name = conv_id | |
self.display_name = display_name | |
self.aggregator = aggregator | |
def experiment(exp_id, display_name, alts, owner, description, conclusion=None, | |
control=None, decider=alternative_deciders.english_only): | |
"""Creates a standard BigBingo experiment. | |
The experiment may not be used in GAE/Bingo. See the fields on the | |
Experiment class for information on how to call this function. | |
""" | |
if control is None: | |
control = default_control(alts) | |
if description: | |
description = strip_indentation(description) | |
if conclusion: | |
conclusion = strip_indentation(conclusion) | |
return Experiment( | |
exp_id=exp_id, logged_name=exp_id, display_name=display_name, | |
legacy_hashable_name=None, alternative_weights=alts, control=control, | |
alternative_decider=decider, owner=owner, description=description, | |
conclusion=conclusion) | |
def default_control(alts): | |
"""Find the alternative that looks most like the control.""" | |
for alternative in alts: | |
if any(word in alternative.lower() | |
for word in ('old', 'original', 'false', 'control')): | |
return alternative | |
return alts.keys()[0] | |
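# Illustrative example: default_control(alternatives('new_layout', | |
# 'old_layout')) returns 'old_layout' because its name contains 'old'; if | |
# no alternative name matches one of the keywords, the first alternative | |
# wins. | |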
def strip_indentation(s): | |
return '\n'.join(line.lstrip() for line in s.split('\n')) | |
def bigbingo_conversion(conversion_id, display_name): | |
return Conversion(conv_id=conversion_id, column_name=conversion_id, | |
logged_name=conversion_id, display_name=display_name) | |
def alternatives(*alts): | |
""""Specifies a list of equal-weighted alternatives.""" | |
return weighted_alternatives(*((alt, 1) for alt in alts)) | |
def weighted_alternatives(*weighted_alts): | |
"""Specifies a list of weighted alternatives for an experiment. | |
Each argument should be a pair of (string, int) that specifies the name and | |
weight of the given alternative. | |
""" | |
for name, weight in weighted_alts: | |
assert isinstance(name, basestring), 'Alternatives must be strings.' | |
assert isinstance(weight, int), 'Weights must be integers.' | |
return collections.OrderedDict(weighted_alts) | |
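# Illustrative example: alternatives('control', 'alt') is equivalent to | |
# weighted_alternatives(('control', 1), ('alt', 1)); both return | |
# OrderedDict([('control', 1), ('alt', 1)]). | |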
# This list started out as alphabetical and then became chronologically ordered, | |
# so you should put new experiments at the bottom. | |
_ACTIVE_EXPERIMENTS = [ | |
experiment( | |
exp_id='sample_expt', | |
display_name='Sample Experiment', | |
alts=alternatives('control', 'alt_1', 'alt_2'), | |
owner='alan', | |
description=""" | |
This is an example experiment description. | |
Arbitrary markdown is allowed here. | |
""") | |
] | |
# Organized roughly in reverse chronological order. Experiments at the top | |
# were archived more recently. | |
_ARCHIVED_EXPERIMENTS = [ | |
experiment( | |
exp_id='sample_archived_expt', | |
display_name='Sample Archived Experiment', | |
alts=weighted_alternatives(('control', 9), ('alt', 1)), | |
owner='alan', | |
description=""" | |
All experiments end up here when they're archived. This makes sure | |
there is a record of all past experiments and lets the dashboard | |
still display old experiment information. | |
""", | |
conclusion=""" | |
All archived experiments need to have a conclusion filled in. | |
""" | |
) | |
] | |
# Set of string experiment names that are allowed to be used in the code, but | |
# are not tracked by BigBingo. This should be the logged name, so for legacy | |
# experiments (the common case), the full experiment name. | |
_UNKNOWN_EXPERIMENT_WHITELIST = { | |
} | |
# A source conversion is a conversion that is logged directly. This is in | |
# contrast to a derived conversion, which is computed during the BigBingo | |
# process. | |
# Alphabetical order, with deprecated conversions at the bottom. | |
# It's currently impossible to remove conversions from the BigBingo system. | |
_SOURCE_CONVERSIONS = [ | |
bigbingo_conversion('problem_attempt', 'Attempted to do a problem'), | |
bigbingo_conversion('return_visit', 'Returned to the site'), | |
] | |
# These conversions are allowed to be used in the code, but are not tracked by | |
# BigBingo. | |
_UNKNOWN_SOURCE_CONVERSION_WHITELIST = { | |
} | |
# A derived conversion is something that is computed from other existing | |
# conversion values and simple rules. For example, we may be interested in | |
# the question "did the user do any exercise problems the day after being | |
# exposed to an experiment?". This can be computed from the | |
# `problem_attempt_count` conversion by only counting the ones that were | |
# between 24hr and 48hr after the participation event. | |
# | |
# The format of an entry is as follows: | |
# '<derived_conversion_name>': { | |
# # A sub-section of a SELECT statement that deals with the column | |
# # of the resulting conversion. Records being aggregated are | |
# # rows from a join of the conversion events, and participation | |
# # events. | |
# 'aggregator': '<part of a SELECT>' | |
# } | |
_DERIVED_CONVERSIONS = [ | |
DerivedConversion( | |
'problem_attempt_next_day', 'Problem attempt next day', | |
aggregator=""" | |
SUM(IF( | |
conv_problem_attempt AND | |
logs.time >= part.participation_time + (24 * 3600) AND | |
logs.time < part.participation_time + (48 * 3600), | |
1, 0)) | |
"""), | |
] | |
# Normally, experiments can't be deleted; removing them from config.py will | |
# cause the notification system to complain until they are added back. If you really | |
# want to remove an experiment, though, you can add its ID here and the next | |
# summarize task will forget it ever existed. This is mostly useful for testing | |
# or to remove experiments that never should have existed in the first place. | |
_ALLOWED_EXPERIMENT_IDS_TO_DELETE = [] | |
_LOGGED_EXPERIMENT_NAME_SET = {exp.logged_name for exp in _ACTIVE_EXPERIMENTS} | |
_LOGGED_SOURCE_CONVERSION_NAME_SET = {conversion.logged_name | |
for conversion in _SOURCE_CONVERSIONS} | |
_ACTIVE_EXPERIMENTS_BY_ID = {exp.id: exp for exp in _ACTIVE_EXPERIMENTS} | |
_ARCHIVED_EXPERIMENTS_BY_ID = {exp.id: exp for exp in _ARCHIVED_EXPERIMENTS} | |
_FINAL_CONVERSIONS_BY_ID = collections.OrderedDict( | |
[(conversion.id, conversion) for conversion in _SOURCE_CONVERSIONS] + | |
[(conversion.id, conversion) for conversion in _DERIVED_CONVERSIONS] | |
) | |
def get_active_experiments(): | |
return _ACTIVE_EXPERIMENTS | |
def get_archived_experiments(): | |
return _ARCHIVED_EXPERIMENTS | |
def get_active_experiment_by_id(exp_id): | |
"""Returns the experiment with the given ID. | |
Raises KeyError if the experiment doesn't exist. | |
""" | |
return _ACTIVE_EXPERIMENTS_BY_ID[exp_id] | |
def get_possibly_archived_experiment(exp_id): | |
"""Returns the given experiment, even if it's in the archive. | |
Returns None if the experiment doesn't exist. | |
""" | |
return (_ACTIVE_EXPERIMENTS_BY_ID.get(exp_id) or | |
_ARCHIVED_EXPERIMENTS_BY_ID.get(exp_id)) | |
def is_experiment_valid(experiment_name): | |
"""Determines if the experiment string is allowed to be logged.""" | |
return (experiment_name in _LOGGED_EXPERIMENT_NAME_SET or | |
is_experiment_whitelisted(experiment_name)) | |
def is_experiment_whitelisted(experiment_name): | |
return experiment_name in _UNKNOWN_EXPERIMENT_WHITELIST | |
def get_conversion_by_id(conv_id): | |
"""Gets a conversion by ID, or crashes if the conversion doesn't exist. | |
Note that the result might be a regular Conversion or it might be a | |
DerivedConversion. | |
""" | |
return _FINAL_CONVERSIONS_BY_ID[conv_id] | |
def get_source_conversion_by_id(conv_id): | |
conv = get_conversion_by_id(conv_id) | |
if not isinstance(conv, Conversion): | |
raise KeyError('The conversion %s was not a source conversion.' % | |
conv_id) | |
return conv | |
def is_source_conversion_id_valid(conv_id): | |
try: | |
get_source_conversion_by_id(conv_id) | |
return True | |
except KeyError: | |
return False | |
def get_source_conversions(): | |
return _SOURCE_CONVERSIONS | |
def get_source_conversion_ids(): | |
"""Return a list of all conversions to extract from the logs. | |
Note the final list of conversions tracked and aggregated may be | |
expanded to include derived conversions. See get_final_conversion_ids() | |
and get_derived_conversions() for more details. | |
""" | |
return [conversion.id for conversion in _SOURCE_CONVERSIONS] | |
def is_source_conversion_valid(conversion_name): | |
return (conversion_name in _LOGGED_SOURCE_CONVERSION_NAME_SET or | |
conversion_name in _UNKNOWN_SOURCE_CONVERSION_WHITELIST) | |
def get_final_conversion_columns(): | |
"""Return the final list of column names to use.""" | |
return [conv.id for conv in _FINAL_CONVERSIONS_BY_ID.itervalues()] | |
def is_final_conversion_id_valid(conversion_id): | |
return conversion_id in _FINAL_CONVERSIONS_BY_ID | |
def get_derived_conversions(): | |
"""Return information about derived conversions. | |
See the comment on _DERIVED_CONVERSIONS for details of the format. | |
""" | |
return _DERIVED_CONVERSIONS | |
def get_allowed_deletions(): | |
return _ALLOWED_EXPERIMENT_IDS_TO_DELETE |
"""Logic for config information kept in the datastore. | |
Although the primary source of truth for BigBingo configuration is config.py, | |
we store some information in the datastore so that we can detect what changed | |
and send the appropriate notifications. | |
""" | |
from __future__ import absolute_import | |
from google.appengine.ext import ndb | |
from third_party import alertlib | |
import db_util | |
from bigbingo import config | |
@db_util.disable_ndb_memcache | |
class BigBingoExperiment(ndb.Model): | |
"""Entity tracking the existence of an experiment. | |
Note that config.py is still the canonical source of truth for the active | |
experiments and their configuration. The datastore is useful for detecting | |
when config.py is different from last time and responding appropriately | |
(e.g. sending notifications for new/stopped experiments and giving a | |
warning when an experiment has mysteriously disappeared.) | |
""" | |
start_time = ndb.DateTimeProperty(indexed=False, auto_now_add=True) | |
last_updated = ndb.DateTimeProperty(indexed=False, auto_now=True) | |
owner = ndb.StringProperty(indexed=False) | |
is_archived = ndb.BooleanProperty(indexed=False) | |
def send_notification(subject, notification, short_notification, color): | |
"""Sends a notification to HipChat. | |
Arguments: | |
subject: The subject line for the email to send. | |
notification: A possibly-long string with the notification text. | |
short_notification: A version of the notification that can fit | |
comfortably in a HipChat message. | |
color: A valid color to use in HipChat. See the documentation on | |
send_to_hipchat for more details. | |
""" | |
alertlib.Alert(short_notification, html=True).send_to_hipchat( | |
'1s and 0s', color, sender='BigBingo Baboon') | |
alertlib.Alert(notification, subject, html=True).send_to_email( | |
'[email protected]') | |
def notify_new_active_experiment(experiment): | |
subject = 'Experiment started: %s' % experiment.display_name | |
format_str = ("%s's new BigBingo experiment %s just started.\n" | |
"Description: %s") | |
link = experiment_link(experiment) | |
notification = format_str % (experiment.owner, link, | |
experiment.description_html) | |
short_notification = format_str % (experiment.owner, link, | |
truncate(experiment.description)) | |
send_notification(subject, notification, short_notification, 'purple') | |
def notify_archived_experiment(experiment): | |
subject = 'Experiment stopped: %s' % experiment.display_name | |
format_str = "%s's BigBingo experiment %s just stopped.\nConclusion: %s" | |
link = experiment_link(experiment) | |
notification = format_str % (experiment.owner, link, | |
experiment.conclusion_html) | |
short_notification = format_str % (experiment.owner, link, | |
truncate(experiment.conclusion)) | |
send_notification(subject, notification, short_notification, 'purple') | |
def notify_missing_experiment(experiment_entity): | |
subject = 'Missing experiment: %s' % experiment_entity.key.string_id() | |
message = ( | |
'Error: The BigBingo experiment %s, owned by %s, has gone missing! ' | |
'Please add it back to bigbingo/config.py. If you meant to stop the ' | |
'experiment, you can do so by adding it to the list of archived ' | |
'experiments.' % | |
(experiment_entity.key.string_id(), experiment_entity.owner)) | |
send_notification(subject, message, message, 'red') | |
def notify_unarchived_experiment(experiment): | |
subject = 'Unarchived experiment: %s' % experiment.display_name | |
message = ( | |
"That's strange, the %s's BigBingo experiment %s was moved from the " | |
"archive back to being an active experiment. If you actually want to " | |
"resume the experiment, you'll have missing data unless you manually " | |
"re-run the summarize task over the time that the experiment was " | |
"archived." % (experiment.owner, experiment_link(experiment))) | |
send_notification(subject, message, message, 'yellow') | |
def notify_new_archived_experiment(experiment): | |
subject = 'New archived experiment: %s' % experiment.display_name | |
message = ( | |
"That's strange, %s's BigBingo experiment %s was seen in the list of " | |
"archived experiments, but it hasn't ever been active!" % | |
(experiment.owner, experiment_link(experiment))) | |
send_notification(subject, message, message, 'yellow') | |
def experiment_link(experiment): | |
return ('<a href="http://khanacademy.org/bigbingo/experiment/%s">%s</a>' % | |
(experiment.id, experiment.display_name)) | |
def truncate(string): | |
return string if len(string) < 50 else (string[:50] + '...') | |
def warn_for_missing_experiments(experiment_entities): | |
# If any experiments are in the datastore but not the config, send a | |
# notification that you're not supposed to do that. Note that we | |
# intentionally do NOT remove the entity from the datastore, since we want | |
# to keep poking HipChat until it gets fixed. If people really want to make | |
# an exception to that rule, though, they can set config.py to allow some | |
# experiments to be deleted, in which case we actually delete them here. | |
keys_to_delete = [] | |
for entity in experiment_entities: | |
experiment_id = entity.key.string_id() | |
if config.get_possibly_archived_experiment(experiment_id) is None: | |
if experiment_id in config.get_allowed_deletions(): | |
keys_to_delete.append(entity.key) | |
else: | |
notify_missing_experiment(entity) | |
if keys_to_delete: | |
ndb.delete_multi(keys_to_delete) | |
def update_config_and_notify(): | |
"""Update the datastore with any config changes and send notifications. | |
Every change to config.py is worth broadcasting, so we keep track of the | |
previous set of experiments so we can see what changed. For new and | |
newly-stopped experiments, we announce them so people have an understanding | |
of what experiments are going on. If an experiment starts in the archive | |
or changes from archived to active, it is probably worth having a human | |
sanity-check it anyway, so we broadcast warnings for those cases. Finally, | |
if an experiment used to exist and doesn't anymore, we repeatedly give an | |
error, because the experiment was really supposed to be archived instead. | |
""" | |
# There currently aren't that many experiments, so it should be fine to | |
# just fetch all of them. Also, this query is only eventually consistent, | |
# but that's fine since it runs so infrequently and occasional incorrect | |
# notifications aren't a big deal anyway. | |
all_experiment_entities = BigBingoExperiment.query().fetch() | |
experiment_entities_by_key = { | |
entity.key: entity for entity in all_experiment_entities} | |
warn_for_missing_experiments(all_experiment_entities) | |
entities_to_store = [] | |
for experiment in (config.get_active_experiments() + | |
config.get_archived_experiments()): | |
experiment_key = ndb.Key(BigBingoExperiment, experiment.id) | |
experiment_entity = experiment_entities_by_key.get(experiment_key) | |
if experiment_entity is None: | |
# The entry in config.py is new. | |
if experiment.is_archived: | |
notify_new_archived_experiment(experiment) | |
else: | |
notify_new_active_experiment(experiment) | |
entities_to_store.append( | |
BigBingoExperiment(key=experiment_key, owner=experiment.owner, | |
is_archived=experiment.is_archived)) | |
elif experiment_entity.is_archived != experiment.is_archived: | |
# The entry in config.py moved between the archive and active list. | |
if experiment.is_archived: | |
notify_archived_experiment(experiment) | |
else: | |
notify_unarchived_experiment(experiment) | |
experiment_entity.is_archived = experiment.is_archived | |
entities_to_store.append(experiment_entity) | |
if entities_to_store: | |
ndb.put_multi(entities_to_store) |
The MIT License (MIT) | |
Copyright (c) 2014 Khan Academy | |
Permission is hereby granted, free of charge, to any person obtaining a copy | |
of this software and associated documentation files (the "Software"), to deal | |
in the Software without restriction, including without limitation the rights | |
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
copies of the Software, and to permit persons to whom the Software is | |
furnished to do so, subject to the following conditions: | |
The above copyright notice and this permission notice shall be included in all | |
copies or substantial portions of the Software. | |
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
SOFTWARE. |
"""Utilities for dealing with BigBingo events in the logs.""" | |
from __future__ import absolute_import | |
import json | |
import logging | |
from bigbingo import config | |
import event_log | |
import ka_globals | |
_PARTICIPATION_EVENT_PREFIX = 'BINGO_PARTICIPATION_EVENT:' | |
_CONVERSION_EVENT_PREFIX = 'BINGO_CONVERSION_EVENT:' | |
class InvalidBingoEventError(Exception): | |
pass | |
def log_participation_event(bingo_identity, experiment_name, alternative): | |
if not config.is_experiment_valid(experiment_name): | |
message = ('The experiment named "%s" was used, but was not found in ' | |
'bigbingo/config.py. All active experiments must be registered ' | |
'there (or added to the experiment whitelist).' % experiment_name) | |
if ka_globals.is_dev_server: | |
raise InvalidBingoEventError(message) | |
else: | |
logging.error(message) | |
logging.debug(_PARTICIPATION_EVENT_PREFIX + | |
json.dumps({ | |
'bingo_id': bingo_identity, | |
'experiment': experiment_name, | |
'alternative': alternative | |
})) | |
def log_conversion_event(bingo_identity, conversion_id): | |
"""Records that the given conversion event happened. | |
Normal application code should not call this directly; it should use | |
bigbingo.mark_conversion instead. | |
""" | |
event_log.log_event('bingo.param', conversion_id) | |
logging.debug(_CONVERSION_EVENT_PREFIX + | |
json.dumps({ | |
'bingo_id': bingo_identity, | |
'conversion': conversion_id, | |
})) | |
def parse_participation_events(log_lines): | |
"""Returns a generator for all participation events in the given logs. | |
The yielded results are dicts constructed from the logged JSON. | |
Arguments: | |
log_lines: A list of strings, which typically correspond to all log | |
lines in a particular request. | |
""" | |
return _parse_events_with_prefix(log_lines, _PARTICIPATION_EVENT_PREFIX) | |
def parse_conversion_events(log_lines): | |
"""Returns a generator for all conversion events in the given logs. | |
The input and output format are the same as parse_participation_events, so | |
see its docstring for more details. | |
""" | |
return _parse_events_with_prefix(log_lines, _CONVERSION_EVENT_PREFIX) | |
def _parse_events_with_prefix(log_lines, prefix): | |
for line in log_lines: | |
if not line.startswith(prefix): | |
continue | |
yield json.loads(line[len(prefix):]) |
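# Illustrative example: log_participation_event writes a log line like | |
#   'BINGO_PARTICIPATION_EVENT:{"bingo_id": "u1", "experiment": | |
#   "sample_expt", "alternative": "alt_1"}' | |
# and parse_participation_events([that_line]) yields the decoded dict | |
# {'bingo_id': 'u1', 'experiment': 'sample_expt', 'alternative': 'alt_1'}. | |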
"""Summarize task for BigBingo experiments | |
This task regularly runs in the background to aggregate the latest bingo log | |
data. The raw data that we operate on are the bingo_participation_events and | |
bingo_conversion_events records generated from the request logs in | |
log_to_bigquery.py. | |
""" | |
from __future__ import absolute_import | |
import time | |
from third_party.mapreduce.lib.pipeline import common | |
from third_party.mapreduce.lib.pipeline import pipeline | |
from bigbingo import config | |
from bigbingo import datastore_config | |
from bigquery import bq_connection | |
from bigquery import bq_pipelines | |
import log_to_bigquery | |
import pipeline_util | |
import request_handler | |
import user_util | |
# All fields should be nullable because the participants table is replaced by | |
# the result of the query when the summarize process runs, and the inferred | |
# schema has every field as nullable when writing a query result to a table. | |
PARTICIPANTS_SCHEMA = { | |
'fields': [ | |
{'name': 'experiment', 'type': 'STRING', 'mode': 'NULLABLE'}, | |
{'name': 'bingo_id', 'type': 'STRING', 'mode': 'NULLABLE'}, | |
{'name': 'alternative', 'type': 'STRING', 'mode': 'NULLABLE'}, | |
{'name': 'participation_time', 'type': 'FLOAT', 'mode': 'NULLABLE'}, | |
] | |
} | |
def get_conversions_schema(): | |
return { | |
'fields': [ | |
{'name': 'experiment', 'type': 'STRING', 'mode': 'NULLABLE'}, | |
{'name': 'bingo_id', 'type': 'STRING', 'mode': 'NULLABLE'}, | |
{'name': 'alternative', 'type': 'STRING', 'mode': 'NULLABLE'}, | |
{'name': 'last_updated_time', 'type': 'FLOAT', 'mode': 'NULLABLE'} | |
] + [ | |
{'name': 'conv_' + conv_col, 'type': 'INTEGER', 'mode': 'NULLABLE'} | |
for conv_col in config.get_final_conversion_columns() | |
] | |
} | |
def get_conversion_snapshots_schema(): | |
fields = [ | |
{'name': 'experiment', 'type': 'STRING', 'mode': 'NULLABLE'}, | |
{'name': 'alternative', 'type': 'STRING', 'mode': 'NULLABLE'}, | |
{'name': 'snapshot_time', 'type': 'INTEGER', 'mode': 'NULLABLE'} | |
] | |
for conv_id in config.get_final_conversion_columns(): | |
fields.extend([ | |
{'name': 'count_' + conv_id, | |
'type': 'INTEGER', 'mode': 'NULLABLE'}, | |
{'name': 'binary_' + conv_id, | |
'type': 'INTEGER', 'mode': 'NULLABLE'} | |
]) | |
return {'fields': fields} | |
class StartSummarize(request_handler.RequestHandler): | |
"""Kicks off the BigBingo summarize task to process a time range of logs. | |
Arguments: | |
dataset: The name of the dataset to work with. In production, this is | |
always the "bigbingo" dataset, but for manual testing, you should | |
choose a different one, such as "bigbingo_test". This parameter is | |
required since a mistake could cause data corruption. | |
logs_dataset: The name of the "base dataset" used in LogToBigQuery. | |
In practice, and by default, this is a dataset called "logs". The | |
actual logs table(s) used are either in this dataset or in the | |
hourly version of this dataset (normally "logs_hourly"), depending | |
on whether use_daily_logs is set. | |
publish_dataset: The name of the dataset to copy result tables to, or | |
None if no publishing should occur. | |
archive_dataset: The name of the dataset to which to archive old | |
experiments, or None if no archiving should occur. | |
end_time: The upper bound on the time range to process, specified as a | |
UNIX timestamp in seconds. This time must be on an hour boundary. | |
If not specified, the hour boundary before the most recent one is | |
used (e.g. end_time is 3:00 if the current time is 4:30). This | |
allows the LogToBigQuery process to have enough time to export the | |
logs to BigQuery before we use them. | |
num_hours: The number of hours of data to process (going backward from | |
end_time). All hours processed must fall within the same day (UTC). | |
If not specified, we process 2 hours. | |
is_initial_run: If 0 (the default) we expect to find tables from a | |
previous run of the summarize task. If 1, dummy empty versions of | |
these tables will be created. | |
use_daily_logs: If 0 (the default), we read from logs tables that each | |
contain an hour of data. If 1, we read from the daily logs table | |
containing the hours that we are processing. It is usually | |
preferable to use the hourly logs tables so that the summarize task | |
will automatically fail if LogToBigQuery is failing. | |
""" | |
# The admin requirement is enforced by the /admin path. We can't directly | |
# use admin_required here since it stops cron from being able to access it. | |
@user_util.manual_access_checking | |
def get(self): | |
dataset = self.request_string('dataset') | |
if not dataset: | |
raise ValueError('The dataset name must be specified.') | |
logs_dataset = self.request_string('logs_dataset', 'logs') | |
publish_dataset = self.request_string('publish_dataset', None) | |
archive_dataset = self.request_string('archive_dataset', None) | |
# If unspecified, end_time is computed in get_and_validate_time_window. | |
end_time = self.request_int('end_time', -1) | |
num_hours = self.request_int('num_hours', 2) | |
is_initial_run = self.request_bool('is_initial_run', False) | |
use_daily_logs = self.request_bool('use_daily_logs', False) | |
start_time, end_time = self.get_and_validate_time_window( | |
end_time, num_hours, use_daily_logs) | |
summarize_pipeline = SummarizePipeline( | |
dataset, logs_dataset, publish_dataset, archive_dataset, | |
start_time, end_time, is_initial_run, | |
use_daily_logs).with_params(target='mapreduce') | |
summarize_pipeline.start(queue_name='bigbingo-queue') | |
self.response.out.write( | |
'Successfully started summarize task. ' | |
'<a href="/_ah/pipeline/status?root=%s">Monitor task</a>' % | |
summarize_pipeline.pipeline_id) | |
@staticmethod | |
def get_and_validate_time_window(end_time, num_hours, use_daily_logs): | |
"""Validates and returns a [start_time, end_time) time range to use. | |
The time window is defined by ending at end_time and going back | |
num_hours hours. The start and end time need to be hour-aligned. | |
Arguments: | |
end_time: The last timestamp to process, as a UNIX time in seconds, | |
or -1. If this argument is -1, the hour mark before the most | |
recent one is used. | |
num_hours: The size of the window to process, in hours. | |
use_daily_logs: True if the daily logs table should be used instead | |
of the hourly logs tables. | |
Returns: | |
A pair (start_time, end_time). The end_time is returned since it is | |
filled in when it is -1. | |
""" | |
if end_time == -1: | |
current_time_seconds = time.time() | |
end_time = (int(current_time_seconds / 3600) - 1) * 3600 | |
if end_time % 3600 != 0: | |
raise ValueError('The end_time parameter must be exactly on an ' | |
'hour boundary.') | |
start_time = end_time - num_hours * 3600 | |
if use_daily_logs and ( | |
start_time // (24 * 3600) != (end_time - 1) // (24 * 3600)): | |
raise ValueError('The specified time window crosses multiple ' | |
'days, which is not allowed.') | |
return start_time, end_time | |
class StartSummarizeBackfill(request_handler.RequestHandler): | |
"""Kicks off multiple summarize tasks in series. | |
The time range can span multiple days, but each individual summarize task | |
should be contained within one day. The arguments are very similar to the | |
arguments for StartSummarize. | |
Arguments: | |
dataset: The name of the dataset for the BigBingo tables. | |
logs_dataset: The base dataset containing the logs to process. | |
publish_dataset: (Optional) The dataset to copy result tables to. | |
archive_dataset: The name of the dataset to which to archive old | |
experiments, or None if no archiving should occur. For backfill | |
jobs not running up to the present, this should be None unless you | |
know what you're doing. (Otherwise, for recently archived | |
experiments, data from the time between the end of your backfill | |
and the time the experiment was archived could be lost.) | |
start_time: The start of the time range to process. This must be on an | |
hour boundary. | |
end_time: The end of the time range to process. This must be on an hour | |
boundary. | |
num_hours_per_run: The size of each summarize task to run. Larger | |
values make the backfill faster and cheaper, but more likely to | |
fail. | |
is_initial_run: False if we expect previous tables to already exist. | |
use_daily_logs: True if we should use daily rather than hourly logs. | |
""" | |
@user_util.admin_required | |
def get(self): | |
dataset = self.request_string('dataset') | |
if not dataset: | |
raise ValueError('The dataset name must be specified.') | |
logs_dataset = self.request_string('logs_dataset', 'logs') | |
publish_dataset = self.request_string('publish_dataset', None) | |
archive_dataset = self.request_string('archive_dataset', None) | |
start_time = self.request_int('start_time') | |
end_time = self.request_int('end_time') | |
num_hours_per_run = self.request_int('num_hours_per_run') | |
is_initial_run = self.request_bool('is_initial_run', False) | |
use_daily_logs = self.request_bool('use_daily_logs', False) | |
# TODO(alan): This first check is overly defensive in a lot of cases. | |
if 24 % num_hours_per_run != 0: | |
raise ValueError('num_hours_per_run must evenly divide a day.') | |
if start_time % (num_hours_per_run * 3600) != 0: | |
raise ValueError('start_time must be on a boundary defined by ' | |
'num_hours_per_run.') | |
if end_time % (num_hours_per_run * 3600) != 0: | |
raise ValueError('end_time must be on a boundary defined by ' | |
'num_hours_per_run.') | |
summarize_pipeline = BackfillSummarizePipeline( | |
dataset, logs_dataset, publish_dataset, archive_dataset, | |
start_time, end_time, num_hours_per_run, is_initial_run, | |
use_daily_logs).with_params(target='mapreduce') | |
summarize_pipeline.start(queue_name='bigbingo-queue') | |
self.response.out.write( | |
'Successfully started backfill summarize task. ' | |
'<a href="/_ah/pipeline/status?root=%s">Monitor task</a>' % | |
summarize_pipeline.pipeline_id) | |
def get_logs_table_expr(logs_base_dataset, start_time, end_time, | |
use_daily_logs): | |
"""Returns a table expression for the logs table(s) to query over. | |
The simple (although non-default) case is that we just return a single | |
daily table, like "logs.requestlogs_20140421". The default case, though, is | |
that we want to select over a logs table for each hour, like | |
"logs_hourly.20140421_00, logs_hourly.20140421_01" (note that the comma | |
here is the BigQuery way to do UNION ALL). | |
Arguments: | |
logs_base_dataset: The base dataset name to use, usually "logs". This | |
is the name of the daily logs dataset, and appending "_hourly" to | |
the end finds the hourly logs dataset. | |
start_time: The start of the time range we are processing. | |
end_time: The end of the time range we are processing. | |
use_daily_logs: False if we should read from the hourly logs, True if | |
we should read from the daily logs. | |
""" | |
if use_daily_logs: | |
dataset = logs_base_dataset | |
return dataset + '.' + log_to_bigquery.daily_logs_table_name(end_time) | |
else: | |
dataset = logs_base_dataset + '_hourly' | |
# The name function takes the end time, so compute it from each start | |
# time by adding 1 hour. | |
return ', '.join( | |
dataset + '.' + log_to_bigquery.hourly_logs_table_name( | |
hour_start_time + 3600) | |
for hour_start_time in xrange(start_time, end_time, 3600) | |
) | |
def participants_table(unix_time): | |
"""Returns the name of the given participants table.""" | |
return table_for_time('participants', unix_time) | |
def conversions_table(unix_time): | |
"""Returns the name of the given conversions table.""" | |
return table_for_time('conversions', unix_time) | |
def pending_conversions_table(start_time, end_time): | |
"""Returns the name of the given pending conversions table.""" | |
return table_for_time_range('pending_conversions', start_time, end_time) | |
def new_participation_events_table(start_time, end_time): | |
return table_for_time_range('new_participation_events', start_time, | |
end_time) | |
def new_conversion_events_table(start_time, end_time): | |
return table_for_time_range('new_conversion_events', start_time, end_time) | |
def table_for_time(base_name, unix_time): | |
"""Returns the name to use for the given table as of the given time. | |
BigBingo tables are given timestamps to more reliably ensure that the | |
summarize task is working off of the correct data, and so that the | |
summarize task can fail at any point in the middle without causing | |
problems. | |
The given unix_time should be the start or end time for a summarize task, | |
so it should be exactly on an hour boundary. The actual hour given is in | |
GMT. | |
""" | |
return base_name + '_' + time.strftime('%Y_%m_%d_%H', | |
time.gmtime(unix_time)) | |
def table_for_time_range(base_name, start_time, end_time): | |
"""Returns the name to use for a table with data from a time range. | |
When a table represents data corresponding to a range of time (e.g. the | |
conversion events happening between 3pm and 6pm), we name it with a time | |
range. This is good practice because it helps in debugging and it guards | |
against pathological cases such as the time ranges [1pm, 3pm] and | |
[2pm, 3pm] being processed at the same time. | |
""" | |
return table_for_time(table_for_time(base_name, start_time), end_time) | |
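# Worked example: 1404172800 is 2014-07-01 00:00 UTC, so | |
# table_for_time('participants', 1404172800) returns | |
# 'participants_2014_07_01_00', and | |
# table_for_time_range('pending_conversions', 1404172800, 1404180000) | |
# returns 'pending_conversions_2014_07_01_00_2014_07_01_02'. | |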
def raw_table(experiment_id): | |
"""The name of the table where the raw experiment data is stored. | |
This is a view in the publish dataset and a regular table in the archive. | |
It stores the complete per-bingo_id data for the experiment, to allow for | |
further analysis as desired. | |
""" | |
return 'raw_' + experiment_id + '_data' | |
def totals_table(experiment_id): | |
"""The name of the table where summary experiment data is archived. | |
This only exists in the archive, and stores the snapshots every two hours | |
that are displayed in the BigBingo dashboard. | |
""" | |
return 'historical_' + experiment_id + '_totals' | |
class BackfillSummarizePipeline(pipeline.Pipeline): | |
"""Run multiple summarize tasks in series. | |
See the documentation for StartSummarizeBackfill and SummarizePipeline for | |
descriptions of the arguments. | |
""" | |
def run(self, dataset, logs_dataset, publish_dataset, archive_dataset, | |
start_time, end_time, num_hours_per_run, is_initial_run, | |
use_daily_logs): | |
with pipeline.InOrder(): | |
for batch_start_time in xrange(start_time, end_time, | |
num_hours_per_run * 3600): | |
batch_end_time = batch_start_time + num_hours_per_run * 3600 | |
yield SummarizePipeline( | |
dataset, logs_dataset, publish_dataset, None, | |
batch_start_time, batch_end_time, is_initial_run, | |
use_daily_logs) | |
is_initial_run = False | |
if archive_dataset: | |
# Always do all the archiving at the end. | |
experiments_to_archive = [ | |
e.id for e in config.get_archived_experiments()] | |
yield ArchiveExperimentsPipeline( | |
dataset, publish_dataset, archive_dataset, end_time, | |
experiments_to_archive) | |
class SummarizePipeline(pipeline.Pipeline): | |
"""Top-level pipeline for the summarize task. | |
Arguments: | |
dataset: The BigQuery dataset for the BigBingo input and output tables. | |
logs_dataset: The dataset containing the logs table to use. | |
publish_dataset: The dataset to copy the result tables to at the end, | |
or None if we shouldn't do any publishing. | |
archive_dataset: The dataset to which to archive any experiments that | |
should be archived, or None if we shouldn't do any archiving. | |
start_time: The inclusive low bound on the time range to process. | |
end_time: The exclusive high bound on the time range to process. | |
num_hours: The number of hours to process. This must be the difference | |
in hours between end_time and start_time. | |
is_initial_run: False if we should expect the previous tables to exist, | |
True otherwise. | |
use_daily_logs: True if we should read from the daily logs table, | |
False if we should read from the hourly logs tables. | |
""" | |
output_names = ['cost'] | |
def run(self, dataset, logs_base_dataset, publish_dataset, archive_dataset, | |
start_time, end_time, is_initial_run, use_daily_logs): | |
# Before actually running summarize, bring the datastore's knowledge of | |
# BigBingo experiments up to date and send any relevant notifications. | |
datastore_config.update_config_and_notify() | |
logs_table_expr = get_logs_table_expr( | |
logs_base_dataset, start_time, end_time, use_daily_logs) | |
parts_extracted = yield ExtractParticipationEventsPipeline( | |
dataset, logs_table_expr, start_time, end_time) | |
convs_extracted = yield ExtractConversionEventsPipeline( | |
dataset, logs_table_expr, start_time, end_time) | |
old_tables_checked = yield CheckOrCreateOldTablesPipeline( | |
dataset, is_initial_run, start_time) | |
with pipeline.After(old_tables_checked, parts_extracted): | |
parts_processed = yield ProcessParticipationEventsPipeline( | |
dataset, start_time, end_time) | |
# Processing conversions depends on the new participants table, so | |
# wait until parts_processed is ready. | |
with pipeline.After(convs_extracted, parts_processed): | |
convs_processed = yield ProcessConversionEventsPipeline( | |
dataset, start_time, end_time) | |
with pipeline.After(convs_processed): | |
convs_built = yield BuildConversionsTablePipeline( | |
dataset, start_time, end_time) | |
with pipeline.After(parts_processed): | |
parts_computed = yield ComputeParticipantCountSummaryPipeline( | |
dataset, end_time) | |
with pipeline.After(convs_built): | |
convs_computed = yield ComputeConversionCountSummaryPipeline( | |
dataset, end_time) | |
# Track all pipelines that did any queries (since those are the | |
# only actions that cost money) so we can return the total cost | |
# at the end. | |
futures_with_cost = [ | |
parts_extracted, convs_extracted, parts_processed, | |
convs_processed, convs_built, parts_computed, | |
convs_computed] | |
if publish_dataset: | |
with pipeline.After(parts_computed, convs_computed): | |
tables_published = yield PublishTablesPipeline( | |
dataset, publish_dataset, end_time) | |
futures_with_cost.append(tables_published) | |
if archive_dataset: | |
with pipeline.After(tables_published): | |
experiments_to_archive = [ | |
e.id for e in config.get_archived_experiments()] | |
experiments_archived = yield ArchiveExperimentsPipeline( | |
dataset, publish_dataset, archive_dataset, end_time, | |
experiments_to_archive) | |
futures_with_cost.append(experiments_archived) | |
total_cost = yield common.Sum( | |
*[future.cost for future in futures_with_cost]) | |
yield bq_pipelines.ReturnOutputs("Success", cost=total_cost) | |
def finalized(self): | |
pipeline_util.send_pipeline_result_email( | |
self, | |
sender='big-bingo-bot', | |
to='analytics-admin', | |
job_name='BigBingo Summarize') | |
class CheckOrCreateOldTablesPipeline(pipeline.Pipeline): | |
"""Sanity check the "previous" tables that we will be working off of. | |
The common case is that we're adding to existing experiment data, so the | |
tables corresponding to start_time should exist. If we know that we're | |
starting from scratch, though, we should just create stub tables instead. | |
""" | |
def run(self, dataset, is_initial_run, start_time): | |
old_participants_table = participants_table(start_time) | |
old_conversions_table = conversions_table(start_time) | |
bq_connection.ensure_bigquery_table_exists( | |
dataset, old_participants_table, PARTICIPANTS_SCHEMA, | |
create_if_necessary=is_initial_run, | |
patch_schema_if_necessary=False, ttl_days=7) | |
bq_connection.ensure_bigquery_table_exists( | |
dataset, old_conversions_table, get_conversions_schema(), | |
create_if_necessary=is_initial_run, patch_schema_if_necessary=True, | |
ttl_days=7) | |
# The conversion_snapshots table also needs to be updated with any new | |
# conversions, or else appending to that table will fail. It is not | |
# necessary for this operation to remove unused conversions, since | |
# BigQuery will automatically insert nulls for those columns when | |
# appending to the table. | |
# TODO(alan): The schema for this table will only grow over time, which | |
# can eventually cause problems and create bloat. One solution to this | |
# problem is to generate a new snapshots table every summarize run | |
# (since columns can be removed when making a new table), but that's not | |
# totally straightforward because of the way the table gets appended | |
# to. Another approach is to get rid of conversion columns in the | |
# schema for this table and just have a row for every conversion. | |
bq_connection.ensure_bigquery_table_exists( | |
dataset, 'conversion_snapshots', | |
get_conversion_snapshots_schema(), create_if_necessary=True, | |
patch_schema_if_necessary=True) | |
class ExtractParticipationEventsPipeline(pipeline.Pipeline): | |
"""Extract participation events from the logs. | |
You can't join on a repeated column, so this query extracts the events that | |
we're interested in into a flat table. It also gets rid of redundant events | |
(we know up-front that we can ignore participation events after the first | |
for each user and experiment). | |
""" | |
output_names = ['cost'] | |
def run(self, dataset, logs_table_expr, start_time, end_time): | |
# Note that the logservice considers the end time to be the time for | |
# the request, so we need to use that as our notion of "request time" | |
# or else some hours will be split across two tables. | |
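# (Aside, added for clarity: the GROUP EACH BY and JOIN EACH forms used in
# the queries in this file are legacy BigQuery SQL hints that tell the
# engine to expect a large number of distinct keys; they are unnecessary in
# standard SQL.)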
query = """ | |
SELECT | |
bingo_participation_events.experiment AS experiment, | |
bingo_participation_events.bingo_id AS bingo_id, | |
bingo_participation_events.alternative AS alternative, | |
MIN(end_time) AS participation_time | |
FROM %(logs_table_expr)s | |
WHERE | |
bingo_participation_events.experiment IS NOT NULL | |
AND | |
end_time >= %(start_time)s | |
AND | |
end_time < %(end_time)s | |
GROUP EACH BY experiment, bingo_id, alternative | |
""" % { | |
'logs_table_expr': logs_table_expr, | |
'start_time': start_time, | |
'end_time': end_time | |
} | |
yield bq_pipelines.QueryToTablePipeline( | |
query, dataset, | |
new_participation_events_table(start_time, end_time), | |
result_table_ttl_days=1) | |
class ExtractConversionEventsPipeline(pipeline.Pipeline): | |
"""Builds a table of the conversion events for this summarize run.""" | |
output_names = ['cost'] | |
def run(self, dataset, logs_table_expr, start_time, end_time): | |
conversions = config.get_source_conversions() | |
query = """ | |
SELECT | |
bingo_conversion_events.bingo_id AS bingo_id, | |
end_time AS time, | |
%(conversion_selects)s | |
FROM %(logs_table_expr)s | |
WHERE | |
bingo_conversion_events.bingo_id IS NOT NULL | |
AND | |
end_time >= %(start_time)s | |
AND | |
end_time < %(end_time)s | |
AND | |
bingo_conversion_events.conversion IN %(conversion_names)s | |
""" % { | |
'logs_table_expr': logs_table_expr, | |
'start_time': start_time, | |
'end_time': end_time, | |
# Store false as null since nulls are free. | |
'conversion_selects': ','.join([""" | |
IF(bingo_conversion_events.conversion = "%(conv_id)s", | |
true, null) AS conv_%(conv_id)s | |
""" % { | |
'conv_id': conv.id | |
} for conv in conversions]), | |
'conversion_names': '(' + | |
','.join('"%s"' % conv.id for conv in conversions) + ')' | |
} | |
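# As an illustration with hypothetical conversion ids "foo" and "bar", the
# templates above render conversion_selects to roughly:
#     IF(bingo_conversion_events.conversion = "foo", true, null) AS conv_foo,
#     IF(bingo_conversion_events.conversion = "bar", true, null) AS conv_bar
# and conversion_names to ("foo","bar").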
yield bq_pipelines.QueryToTablePipeline( | |
query, dataset, new_conversion_events_table(start_time, end_time), | |
result_table_ttl_days=1) | |
class ProcessParticipationEventsPipeline(pipeline.Pipeline): | |
"""Determine the new set of participants for each experiment. | |
This stage unions the existing participants with the new participants and | |
does a GROUP BY to ensure that we don't include participants more than | |
once. | |
""" | |
output_names = ['cost'] | |
def run(self, dataset, start_time, end_time): | |
old_participants_table = participants_table(start_time) | |
new_participants_table = participants_table(end_time) | |
events_table = new_participation_events_table(start_time, end_time) | |
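# Note (added for clarity): in legacy BigQuery SQL, listing two tables
# separated by a comma in the FROM clause means UNION ALL (not a cross join
# as in standard SQL), which is why the query below can merge the new
# events with the existing participants just by comma-joining the tables.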
query = """ | |
SELECT | |
experiment, | |
bingo_id, | |
alternative, | |
MIN(participation_time) AS participation_time | |
FROM | |
%(events_table)s, -- UNION ALL | |
%(participants_table)s | |
GROUP EACH BY | |
experiment, bingo_id, alternative | |
""" % { | |
'events_table': dataset + '.' + events_table, | |
'participants_table': dataset + '.' + old_participants_table | |
} | |
yield bq_pipelines.QueryToTablePipeline( | |
query, dataset, new_participants_table, result_table_ttl_days=7) | |
class ProcessConversionEventsPipeline(pipeline.Pipeline): | |
"""Computes the new conversion values to add for this summarize job. | |
Given the new_conversion_events table (an experiment-independent table of | |
conversion events for this summarize task) and the newly-computed | |
participants table, we determine what values to add to what conversions. | |
The output table has the same format as the full conversions table, but | |
only contains conversion numbers for this summarize task. The new | |
conversions table is computed in BuildConversionsTablePipeline. | |
""" | |
output_names = ['cost'] | |
def run(self, dataset, start_time, end_time): | |
conversions = config.get_source_conversions() | |
derived_conversions = config.get_derived_conversions() | |
new_participants_table = participants_table(end_time) | |
events_table = new_conversion_events_table(start_time, end_time) | |
pending_table = pending_conversions_table(start_time, end_time) | |
conversion_select_aggregators = [ | |
'IFNULL(SUM(conv_%(conv_id)s), 0) AS conv_%(conv_id)s' % { | |
'conv_id': conv.id | |
} for conv in conversions | |
] | |
derived_conversion_aggregators = [ | |
'%(aggregator)s AS conv_%(conv_id)s' % { | |
'conv_id': conv.id, | |
'aggregator': conv.aggregator, | |
} | |
for conv in derived_conversions | |
] | |
# Note: The JOIN EACH in this query might in the future fail with | |
# "Resources exceeded during query execution" as we get more | |
# participants. If that happens, we should fix this by changing the | |
# participants table to a subquery that only includes rows that are | |
# also in the logs table. This should make it small enough to be | |
# suitable as the right side of a JOIN. | |
# In particular, we should NOT switch the order of the tables, since | |
# the logs table might have many, many rows for a single bingo_id (e.g. | |
# if a bot is spamming us), and uneven key distributions on the right | |
# side of a JOIN can cause a failure. | |
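# A hypothetical, untested sketch of that fix: replace the
# %(participants_table)s operand in the JOIN below with a filtered subquery
# along the lines of
#     (SELECT part.experiment, part.bingo_id, part.alternative,
#             part.participation_time
#      FROM %(participants_table)s part
#      JOIN EACH (SELECT bingo_id FROM %(new_conversion_events_table)s
#                 GROUP EACH BY bingo_id) ids
#      ON part.bingo_id = ids.bingo_id)
# so that only participants who actually appear in this run's logs end up
# on the right-hand side of the JOIN.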
query = """ | |
SELECT | |
part.experiment AS experiment, | |
part.bingo_id AS bingo_id, | |
part.alternative AS alternative, | |
MAX(logs.time) AS last_updated_time, | |
%(conversion_selects)s | |
FROM | |
%(new_conversion_events_table)s logs | |
JOIN EACH | |
%(participants_table)s part | |
ON | |
logs.bingo_id = part.bingo_id | |
WHERE logs.time >= part.participation_time | |
GROUP EACH BY | |
experiment, bingo_id, alternative | |
""" % { | |
'new_conversion_events_table': dataset + '.' + events_table, | |
'participants_table': dataset + '.' + new_participants_table, | |
# For each conversion, take the sum over a boolean column which | |
# consists of true values (treated as 1s) and nulls. Since the | |
# sum over nulls is null, we need to convert that back to 0. | |
# Note that keeping null for the conversion total would work | |
# just fine and save quite a bit of space, but we need the | |
# table to be big enough that we can be confident that the | |
# inner GROUP EACH BY in ComputeConversionCountSummaryPipeline | |
# won't fail (a bigger table helps because BigQuery allocates | |
# resources based on the size of the tables being queried). | |
'conversion_selects': ','.join( | |
conversion_select_aggregators + | |
derived_conversion_aggregators) | |
} | |
yield bq_pipelines.QueryToTablePipeline( | |
query, dataset, pending_table, result_table_ttl_days=1) | |
class BuildConversionsTablePipeline(pipeline.Pipeline): | |
"""Compute the new per-user conversion summaries.""" | |
output_names = ['cost'] | |
def run(self, dataset, start_time, end_time): | |
conversion_columns = config.get_final_conversion_columns() | |
old_conversions_table = conversions_table(start_time) | |
new_conversions_table = conversions_table(end_time) | |
pending_table = pending_conversions_table(start_time, end_time) | |
# We have to be careful with this query, since it is by far the most | |
# likely to run out of resources. One simple way to merge the tables | |
# would be to union them, group by experiment and bingo_id, and take | |
# the sum over every conversion, but this ends up using more space than | |
# necessary, since it effectively needs to build a giant hash table | |
# with all bingo_ids in both tables. Instead, we take advantage of the | |
# fact that the pending conversions table is much smaller than the old | |
# conversions table, which means that it's much less memory-intensive | |
# to make a hash table of pending conversions and do lookups in that. | |
# We accomplish this by using a LEFT OUTER JOIN to find which rows in | |
# the old conversions table are being updated and which ones are | |
# staying the same and only doing the GROUP BY logic for the ones that | |
# are being updated. | |
conversions_table_columns = [ | |
field['name'] for field in get_conversions_schema()['fields']] | |
columns_from_join = ', '.join('t1.%(col)s AS %(col)s' % {'col': col} | |
for col in conversions_table_columns) | |
untouched_old_rows_template = """ | |
SELECT %(columns_from_join)s | |
FROM %(old_conversions)s t1 | |
LEFT OUTER JOIN EACH %(pending_conversions)s t2 | |
ON t1.experiment = t2.experiment | |
AND t1.bingo_id = t2.bingo_id | |
AND t1.alternative = t2.alternative | |
WHERE t2.bingo_id IS NULL | |
""" | |
touched_old_rows_template = """ | |
SELECT %(columns_from_join)s | |
FROM %(old_conversions)s t1 | |
JOIN EACH %(pending_conversions)s t2 | |
ON t1.experiment = t2.experiment | |
AND t1.bingo_id = t2.bingo_id | |
AND t1.alternative = t2.alternative | |
""" | |
updated_and_new_rows_template = """ | |
SELECT experiment, bingo_id, alternative, | |
MAX(last_updated_time) AS last_updated_time, | |
%(conversion_selects)s | |
FROM (""" + touched_old_rows_template + """), | |
%(pending_conversions)s | |
GROUP EACH BY experiment, alternative, bingo_id | |
""" | |
# There are three cases here: | |
# 1.) A row is in old_conversions but not pending_conversions. | |
# 2.) A row is in both tables. | |
# 3.) A row is only in pending_conversions. | |
# The first case is handled by untouched_old_rows_template and the | |
# other two cases are handled by updated_and_new_rows_template. | |
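# A tiny worked example of the merge (hypothetical rows): if
# old_conversions has (expA, user1, alt1, conv_foo=2) and
# pending_conversions has (expA, user1, alt1, conv_foo=1), the row is case
# 2 and comes out of updated_and_new_rows_template as conv_foo=3; a row
# only in old_conversions passes through untouched_old_rows_template
# unchanged (case 1); a row only in pending_conversions appears via case 3.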
query = (""" | |
SELECT experiment, bingo_id, alternative, last_updated_time, | |
%(convert_null_selects)s | |
FROM | |
(""" + untouched_old_rows_template + """), | |
(""" + updated_and_new_rows_template + """) | |
""") % { | |
'old_conversions': dataset + '.' + old_conversions_table, | |
'pending_conversions': dataset + '.' + pending_table, | |
'columns_from_join': columns_from_join, | |
'conversion_selects': ','.join( | |
['SUM(IFNULL(conv_%(column)s, 0)) AS conv_%(column)s' % { | |
'column': conv_col | |
} for conv_col in conversion_columns]), | |
'convert_null_selects': ', '.join( | |
'IF(%(conv)s = 0, null, %(conv)s) as %(conv)s' % { | |
'conv': 'conv_' + conv_col | |
} | |
for conv_col in conversion_columns) | |
} | |
yield bq_pipelines.QueryToTablePipeline( | |
query, dataset, new_conversions_table, result_table_ttl_days=7) | |
class ComputeParticipantCountSummaryPipeline(pipeline.Pipeline): | |
"""Computes the latest participant counts for each experiment. | |
The resulting counts are computed on a per-alternative basis and are appended
to the end of the participant_snapshots table. | |
""" | |
output_names = ['cost'] | |
def run(self, dataset, end_time): | |
new_participants_table = participants_table(end_time) | |
query = """ | |
SELECT | |
experiment, | |
alternative, | |
COUNT(*) AS num_participants, | |
%(end_time)s AS snapshot_time | |
FROM %(new_participants_table)s | |
GROUP EACH BY experiment, alternative | |
""" % { | |
'end_time': end_time, | |
'new_participants_table': | |
dataset + '.' + new_participants_table | |
} | |
yield bq_pipelines.QueryToTablePipeline( | |
query, dataset, 'participant_snapshots', append_to_table=True) | |
class ComputeConversionCountSummaryPipeline(pipeline.Pipeline): | |
"""Computes the latest conversion counts for each experiment. | |
For each conversion, we separately compute the conversion as a count | |
conversion and as a binary conversion, and we output a column for each. For | |
example, the conversion foo (which has column conv_foo in the conversions | |
table) would have count_foo and binary_foo as columns in the conversion | |
snapshots table. | |
The results are appended to the end of the conversion_snapshots table. | |
""" | |
output_names = ['cost'] | |
def run(self, dataset, end_time): | |
conversion_columns = config.get_final_conversion_columns() | |
# For each conversion, we output a column for the total number of | |
# conversions across all users and a column for the number of users | |
# with a nonzero conversion count. | |
conversion_selects = ','.join( | |
['SUM(conv_%(column)s) AS count_%(column)s, ' | |
'SUM(conv_%(column)s > 0) AS binary_%(column)s' % { | |
'column': conv_col | |
} for conv_col in conversion_columns]) | |
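# For example, for a hypothetical conversion column "foo" the join above
# renders to:
#     SUM(conv_foo) AS count_foo, SUM(conv_foo > 0) AS binary_foo
# i.e. the total number of conversions vs. the number of users with at
# least one conversion, per experiment and alternative.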
new_conversions_table = conversions_table(end_time) | |
query = """ | |
SELECT | |
experiment, | |
alternative, | |
%(end_time)s AS snapshot_time, | |
%(conversion_selects)s | |
FROM | |
%(new_conversions_table)s | |
GROUP EACH BY experiment, alternative | |
""" % { | |
'new_conversions_table': dataset + '.' + new_conversions_table, | |
'end_time': end_time, | |
'conversion_selects': conversion_selects, | |
} | |
yield bq_pipelines.QueryToTablePipeline( | |
query, dataset, 'conversion_snapshots', append_to_table=True) | |
class PublishTablesPipeline(pipeline.Pipeline): | |
"""Copy the result tables to a common dataset. | |
The dataset containing all timestamped tables can get kind of crowded, so | |
this makes it convenient for people to look at the most up-to-date data. | |
-We copy the latest participants and conversions tables as | |
source_participants and source_conversions. | |
-We create (or update) a view for each experiment that joins and filters | |
the source tables into meaningful data. | |
-We join the participation and conversion snapshots tables into a single | |
table called historical_experiment_totals. We also remove any duplicates | |
(which can happen in rare failure cases). | |
-We extract the latest historical totals into a table called | |
experiment_totals. | |
These table names are convenient because the most useful tables are first | |
when ordered alphabetically. | |
""" | |
output_names = ['cost'] | |
def run(self, dataset, publish_dataset, end_time): | |
copy_specs = [] | |
def add_copy_spec(src_table, dest_table): | |
copy_specs.append({ | |
'src_dataset': dataset, | |
'src_table': src_table, | |
'dest_dataset': publish_dataset, | |
'dest_table': dest_table | |
}) | |
add_copy_spec(participants_table(end_time), 'source_participants') | |
add_copy_spec(conversions_table(end_time), 'source_conversions') | |
with pipeline.InOrder(): | |
yield bq_pipelines.CopyTableBatchPipeline(copy_specs) | |
views_created = yield CreatePublishViewsPipeline(publish_dataset) | |
conversion_columns = config.get_final_conversion_columns() | |
conversion_selects = ','.join( | |
['MAX(conv.count_%(column)s) AS count_%(column)s, ' | |
'MAX(conv.binary_%(column)s) AS binary_%(column)s' % { | |
'column': conv_col | |
} for conv_col in conversion_columns]) | |
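# Note: each (experiment, alternative, snapshot_time) group normally has
# exactly one row from each snapshots table, so MAX() here simply picks
# that single value while also collapsing the rare duplicate snapshot rows
# mentioned in the docstring above.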
extract_historical_totals_query = """ | |
SELECT | |
part.experiment AS experiment, | |
part.alternative AS alternative, | |
part.snapshot_time AS snapshot_time, | |
MAX(part.num_participants) AS num_participants, | |
%(conversion_selects)s | |
FROM | |
%(conversion_snapshots_table)s conv | |
JOIN EACH | |
%(participant_snapshots_table)s part | |
ON part.experiment = conv.experiment | |
AND part.alternative = conv.alternative | |
AND part.snapshot_time = conv.snapshot_time | |
GROUP EACH BY experiment, alternative, snapshot_time | |
""" % { | |
'participant_snapshots_table': dataset + '.participant_snapshots', | |
'conversion_snapshots_table': dataset + '.conversion_snapshots', | |
'conversion_selects': conversion_selects | |
} | |
extract_latest_totals_query = """ | |
SELECT * FROM %(historical_experiment_totals)s | |
WHERE snapshot_time = %(end_time)s | |
""" % { | |
'historical_experiment_totals': | |
publish_dataset + '.historical_experiment_totals', | |
'end_time': end_time | |
} | |
with pipeline.InOrder(): | |
query1_result = yield bq_pipelines.QueryToTablePipeline( | |
extract_historical_totals_query, publish_dataset, | |
'historical_experiment_totals') | |
query2_result = yield bq_pipelines.QueryToTablePipeline( | |
extract_latest_totals_query, publish_dataset, | |
'experiment_totals') | |
total_cost = yield common.Sum(query1_result.cost, query2_result.cost) | |
# Explicitly make ReturnOutputs block on the CopyTable pipeline above. | |
# Since ReturnOutputs is the last pipeline yielded, the pipeline API | |
# will view its completion as the completion of the whole publish | |
# pipeline, so we need to make sure it doesn't run until all of the | |
# intermediate steps are done. | |
with pipeline.After(views_created): | |
yield bq_pipelines.ReturnOutputs("Success", cost=total_cost) | |
class CreatePublishViewsPipeline(pipeline.Pipeline): | |
"""Create views in the publish dataset pointing into the raw data tables. | |
These views make it easy for people to dig into experiment data manually | |
without having to worry about the slightly-weird way that BigBingo stores | |
its data. | |
The view does three things that humans would otherwise have to do: | |
-It joins the participants and conversions tables, which is necessary since | |
the conversions table doesn't have rows for bingo_ids that have never | |
triggered any conversions. | |
-It filters out the rows to only include the experiment in question. | |
-It converts all null conversion values to 0, since nulls have weird | |
behavior (e.g. they're ignored when taking averages). | |
Note that this only actually changes anything if there was a schema change, | |
but we write every view every time since it's easy and making views is | |
cheap. | |
""" | |
def run(self, publish_dataset): | |
experiments = config.get_active_experiments() | |
# If the experiment was just archived, it will still have a view here. | |
# That view might be out of date if the schema has changed, so we | |
# should update it, too.
publish_tables = set(bq_connection.get_all_tables_in_dataset( | |
publish_dataset)) | |
experiments.extend( | |
experiment for experiment in config.get_archived_experiments() | |
if raw_table(experiment.id) in publish_tables) | |
for experiment in experiments: | |
view_name = raw_table(experiment.id) | |
conversion_column_names = [ | |
'conv_' + conv_col | |
for conv_col in config.get_final_conversion_columns()] | |
# Unlike all other queries, this one is visible in the BigQuery UI, | |
# so we make an attempt at making it look nice. | |
query = """\ | |
SELECT parts.bingo_id AS bingo_id, | |
parts.alternative AS alternative, | |
parts.participation_time AS participation_time, | |
convs.last_updated_time AS last_conversion_updated_time, | |
%(conv_selects)s | |
FROM | |
(SELECT * FROM %(dataset)s.source_participants | |
WHERE experiment = "%(exp_name)s") parts | |
LEFT OUTER JOIN EACH | |
(SELECT * FROM %(dataset)s.source_conversions | |
WHERE experiment = "%(exp_name)s") convs | |
ON parts.alternative = convs.alternative | |
AND parts.bingo_id = convs.bingo_id""" % { | |
'exp_name': experiment.logged_name, | |
'dataset': publish_dataset, | |
'conv_selects': ',\n '.join( | |
'IFNULL(%(column)s, 0) AS %(column)s' % {'column': column} | |
for column in conversion_column_names | |
) | |
} | |
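# For a hypothetical experiment with a single conversion column "conv_foo",
# conv_selects above renders to:
#     IFNULL(conv_foo, 0) AS conv_foo
# so users who never triggered any conversion show up with explicit zeros
# rather than nulls.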
bq_connection.create_or_update_view(publish_dataset, view_name, | |
query) | |
class ArchiveExperimentsPipeline(pipeline.Pipeline): | |
"""Archive the given experiments if they have not already been. | |
This pipeline moves the summary and raw data for the experiment into the | |
archive dataset, and removes it from the live datasets. | |
If an experiment has already been archived, this pipeline ignores it, so it
should be idempotent; if no experiments have been archived since the last
time it was run, it will do nothing.
-We copy the view raw_<experiment>_data to a real table in the archive. | |
-We copy all snapshots from historical_experiment_totals to | |
historical_<experiment>_totals in the archive.
-We remove all data from the experiment from: | |
-the view raw_<experiment>_data (removing the view since it will be | |
empty) | |
-participant_snapshots and conversion_snapshots | |
-participants_<timestamp> and conversions_<timestamp> | |
""" | |
output_names = ['cost', 'num_archived'] | |
def run(self, dataset, publish_dataset, archive_dataset, timestamp, | |
experiment_ids): | |
experiments = [config.get_possibly_archived_experiment(exp_id) | |
for exp_id in experiment_ids] | |
archive_tables = set(bq_connection.get_all_tables_in_dataset( | |
archive_dataset)) | |
publish_tables = set(bq_connection.get_all_tables_in_dataset( | |
publish_dataset)) | |
# Since the totals table is created after the raw table, we can
# assume that the totals table exists if and only if the
# experiment has been successfully archived. ("Successfully" here
# does not necessarily include deleting the data, but if that
# doesn't happen we don't worry about it.) We also want to check
# that the view in the publish dataset exists; if not, we won't be
# able to archive the experiment. (If the view exists but the
# experiment has no data in the other tables, everything will work
# as expected and just make empty tables.) The view should really
# never be missing unless no summarize jobs ran between the deploy
# where an experiment was created and the deploy where it was
# archived.
unarchived_experiments = [ | |
experiment for experiment in experiments | |
if totals_table(experiment.id) not in archive_tables | |
and raw_table(experiment.id) in publish_tables] | |
if not unarchived_experiments: | |
yield bq_pipelines.ReturnOutputs("Success", num_archived=0, cost=0) | |
else: | |
archive_futures = [] | |
for experiment in unarchived_experiments: | |
archive_result = yield ArchiveSingleExperimentPipeline( | |
publish_dataset, archive_dataset, experiment.id, | |
experiment.logged_name) | |
archive_futures.append(archive_result) | |
with pipeline.After(*archive_futures): | |
# To save on queries, we remove all the data from the main | |
# dataset at once. This must happen after the data we want | |
# gets archived, of course. Needless to say, we want to be | |
# extra-careful with this part. | |
experiment_ids = [exp.id for exp in unarchived_experiments] | |
remove_result = yield RemoveExperimentsPipeline( | |
dataset, publish_dataset, timestamp, experiment_ids) | |
total_cost = yield common.Sum( | |
remove_result.cost, *[p.cost for p in archive_futures]) | |
yield bq_pipelines.ReturnOutputs( | |
"Success", num_archived=len(archive_futures), cost=total_cost) | |
class ArchiveSingleExperimentPipeline(pipeline.Pipeline): | |
"""Copies the tables to archive a single experiment. | |
This pipeline does not remove anything from the main dataset, so it should | |
be idempotent. It will be called from ArchiveExperimentsPipeline, which | |
gives more detail as to exactly which tables get copied. | |
""" | |
output_names = ['cost'] | |
def run(self, publish_dataset, archive_dataset, experiment_id, | |
experiment_name): | |
with pipeline.InOrder(): | |
# Do this in order so that the existence of the totals table | |
# guarantees that everything worked. | |
dump_query = """ | |
SELECT * | |
FROM %(publish_dataset)s.%(raw_table)s | |
""" % { | |
'publish_dataset': publish_dataset, | |
'raw_table': raw_table(experiment_id), | |
} | |
raw_dump = yield bq_pipelines.QueryToTablePipeline( | |
dump_query, archive_dataset, raw_table(experiment_id)) | |
totals_query = """ | |
SELECT * | |
FROM %(publish_dataset)s.historical_experiment_totals | |
WHERE experiment = "%(experiment)s" | |
""" % { | |
'publish_dataset': publish_dataset, | |
'experiment': experiment_name | |
} | |
totals_dump = yield bq_pipelines.QueryToTablePipeline( | |
totals_query, archive_dataset, totals_table(experiment_id)) | |
total_cost = yield common.Sum(raw_dump.cost, totals_dump.cost) | |
yield bq_pipelines.ReturnOutputs("Success", cost=total_cost) | |
class RemoveExperimentsPipeline(pipeline.Pipeline): | |
"""Removes the given experiments from the main dataset. | |
In particular, removes the views publish_dataset.raw_<experiment>_data, and | |
removes all of the experiments from participant_snapshots, | |
conversion_snapshots, participants_<timestamp>, and | |
conversions_<timestamp>. They may still persist in the publish dataset | |
until the next summarize task run, but that's fine. | |
Timestamp should be a Unix timestamp in seconds. | |
Needless to say, be careful with this; it deletes data, without directly | |
checking if that data has been archived elsewhere. | |
TODO(benkraft): There is a potential race condition here: if this is | |
running at the same time as a summarize task, the summarize task might be | |
generating the next participants_<timestamp> and conversions_<timestamp> | |
while we're busy removing this data from the old one. This would cause the | |
data to not get deleted from the main dataset. This isn't a big problem, | |
but we might want to run something to garbage-collect these occasionally. | |
Another potential race condition could happen if the snapshots tables are | |
updated by the summarize task while this task is running. This could cause | |
the newly summarized data to get clobbered by the archiver, since this | |
pipeline is not atomic. | |
""" | |
output_names = ['cost'] | |
def run(self, dataset, publish_dataset, timestamp, experiment_ids): | |
views_to_delete = [raw_table(e) for e in experiment_ids] | |
experiment_names = [ | |
config.get_possibly_archived_experiment(e).logged_name | |
for e in experiment_ids] | |
yield bq_pipelines.EnsureTablesDeletedPipeline( | |
publish_dataset, views_to_delete) | |
delete_part_snapshots = yield RemoveExperimentsFromTablePipeline( | |
dataset, 'participant_snapshots', experiment_names) | |
delete_conv_snapshots = yield RemoveExperimentsFromTablePipeline( | |
dataset, 'conversion_snapshots', experiment_names) | |
delete_part_timestamp = yield RemoveExperimentsFromTablePipeline( | |
dataset, participants_table(timestamp), experiment_names, | |
result_table_ttl_days=7) | |
delete_conv_timestamp = yield RemoveExperimentsFromTablePipeline( | |
dataset, conversions_table(timestamp), experiment_names, | |
result_table_ttl_days=7) | |
total_cost = yield common.Sum( | |
delete_part_snapshots.cost, delete_conv_snapshots.cost, | |
delete_part_timestamp.cost, delete_conv_timestamp.cost) | |
yield bq_pipelines.ReturnOutputs("Success", cost=total_cost) | |
class RemoveExperimentsFromTablePipeline(pipeline.Pipeline): | |
"""Removes some experiments from a table. | |
Queries the table for all rows except those belonging to the given
experiments, saves the result in a temporary table, and then clobbers the
original table with that temporary table. See the warning about potential
race conditions in
RemoveExperimentsPipeline. | |
Since this operation overwrites the table, the expiration time may need to | |
be reset, in which case result_table_ttl_days should be specified. | |
""" | |
output_names = ['cost'] | |
def run(self, dataset, table_name, experiment_names, | |
result_table_ttl_days=None): | |
tmp_table_name = table_name + '_tmp' | |
query = """ | |
SELECT * | |
FROM %(dataset)s.%(table_name)s | |
WHERE experiment NOT IN("%(experiments)s") | |
""" % { | |
'dataset': dataset, | |
'table_name': table_name, | |
'experiments': '", "'.join(experiment_names) | |
} | |
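# For example, with hypothetical experiment_names ['exp_a', 'exp_b'] the
# WHERE clause above renders to:
#     WHERE experiment NOT IN("exp_a", "exp_b")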
with pipeline.InOrder(): | |
clone = yield bq_pipelines.QueryToTablePipeline(query, dataset, | |
tmp_table_name) | |
yield bq_pipelines.CopyTablePipeline(dataset, tmp_table_name, | |
dataset, table_name, | |
result_table_ttl_days=result_table_ttl_days) | |
yield bq_pipelines.EnsureTablesDeletedPipeline(dataset, | |
[tmp_table_name]) | |
yield bq_pipelines.ReturnOutputs("Success", cost=clone.cost) |
Super neato. Thanks! I found my way here from your blog post. Any updates on this code since 2014, or better yet plans to turn this into a reusable open source project like GAEBingo was?