"""Useful pipeline classes for interacting with BigQuery."""
import json
import logging
import os
import time

from third_party.google_api_python_client.apiclient import (
    errors as apiclient_errors,
    http as apiclient_http
)
from third_party.mapreduce.lib.pipeline import pipeline
from third_party.mapreduce.lib.pipeline import common
from bigquery import bq_connection


# In some situations, we want to submit lots of API requests at once. Batching
# helps with this, but BigQuery will give an error if we send way too many API
# requests in the same batch, so we limit the number of requests we're willing
# to send at once.
DEFAULT_NUM_REQUESTS_PER_BATCH = 10


# When hitting retriable errors (rate limit exceeded or internal BigQuery
# errors) when submitting HTTP requests, automatically retry up to this many
# times.
#
# Note that automatic pipeline retry accomplishes some of this, but this HTTP
# retry works better because only failed requests in a batch are retried rather
# than retrying the whole batch. Also, since we limit retrying to definitely-
# retriable errors, it's ok to be more aggressive about retrying.
NUM_HTTP_RETRY_ATTEMPTS = 10


# Sets a limit on the amount of time we're willing to wait before giving up on
# a query. This setting is necessary because sometimes, BigQuery will spend a
# very long time on a small number of queries. For example, there have been
# times where 50 queries are submitted at the same time and 48 of those queries
# finish within a few minutes, but the remaining two take multiple hours to
# finish. In these cases, simply submitting another copy of the two stalled
# queries would work, so we treat the time limit as a retriable error.
# TODO(alan): This time limit used to be 15 minutes, but I disabled the feature
# by setting it to None to work around a correctness issue in BigQuery that
# happens when you submit the same query twice:
# http://stackoverflow.com/questions/23424289/what-atomicity-guarantees-does-bigquery-provide-for-query-jobs
# If we keep running into issues, it may make sense to set this time limit
# again (potentially to a number higher than 15 minutes), and if we stop
# running into the issue, we may want to remove this feature completely.
QUERY_TIME_LIMIT_SECONDS = None


# Initial amount of time, in seconds, to sleep when waiting for a job to finish
# (assuming it's not done immediately).
JOB_WAIT_INITIAL_TIME = 5


# Increase the sleeping time by the following factor when waiting for a job to
# finish. Back off exponentially so the task hierarchy doesn't get ridiculously
# deep in the pipeline viewer when we wait on a long-running job like a batch
# query.
JOB_WAIT_BACKOFF_MULTIPLIER = 1.5


DEFAULT_QUERY_PRIORITY = os.environ.get('DEFAULT_BIGQUERY_PRIORITY', 'BATCH')


class ReturnOutputs(pipeline.Pipeline):
    """General-purpose mechanism for returning named outputs.

    This makes it more convenient to return pipeline results as outputs, since
    the normal self.fill mechanism requires returning already-resolved values,
    not pipeline futures. To use it, yield this pipeline last. Note that any
    output passed here is ignored unless its name appears in the calling
    pipeline's output_names list.

    For example, this code snippet lets you combine the results from two child
    pipelines into three outputs (one default and two named):
    default_and_foo = yield ComputeDefaultAndFoo()
    bar = yield ComputeBar()
    yield ReturnOutputs(default_and_foo, foo=default_and_foo.foo, bar=bar)

    Note that this pipeline is unnecessary if the last pipeline yielded already
    returns outputs with the desired names and values.
    """
    def run(self, default_output, **kwargs):
        for key in kwargs:
            self.fill(key, kwargs[key])
        return default_output


class ListIndex(pipeline.Pipeline):
    """Returns the element at the given index from the given list."""
    def run(self, input_list, index):
        return input_list[index]


class SmallResultQueryPipeline(pipeline.Pipeline):
    """Pipeline for running a query and getting its results.

    This pipeline is convenient if the results are small (e.g. if you're
    querying an aggregate) and you want to view the results directly in Python
    rather than sending the results to a table.

    Arguments:
        query: A string with the text of the query to run.

    Returns:
        The query results as returned in the getQueryResults method in the jobs
        section of the BigQuery API. For example,
        results['rows'][0]['f'][0]['v'] gets the first field of the first row.
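
    Illustrative usage from another pipeline's run() method (the query below
    is hypothetical):
        results = yield SmallResultQueryPipeline(
            'SELECT COUNT(*) FROM [my_dataset.my_table]')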
    """
    def run(self, query):
        job_metadata_list = yield RunJobBatchPipeline([{
            'query': {
                'query': query
            }
        }])
        job_metadata = yield ListIndex(job_metadata_list, 0)
        yield GetQueryResultsPipeline(job_metadata)


class GetQueryResultsPipeline(pipeline.Pipeline):
    """Fetches and returns the actual query results for a job.

    Arguments:
        job_metadata: A dict containing information about the job to get query
            results for, as returned by insert or get in the jobs section of
            the BigQuery API.

    Returns:
        The query results as returned in the getQueryResults method in the jobs
        section of the BigQuery API. For example,
        results['rows'][0]['f'][0]['v'] gets the first field of the first row.
    """
    def run(self, job_metadata):
        job_service = bq_connection.BigQueryService.get_service().jobs()
        return job_service.getQueryResults(
            projectId=job_metadata['jobReference']['projectId'],
            jobId=job_metadata['jobReference']['jobId']).execute()


class QueryToTablePipeline(pipeline.Pipeline):
    """Pipeline for running a query and sending the output to a table.

    This pipeline will submit a query and will not mark itself as done until
    the query has either finished or failed.

    Arguments:
        query: A string with the text of the query to run.
        output_dataset: The dataset of the output table.
        output_table: The name of the output table.
        create_if_needed: If true (the default), we create the table (rather
            than failing) if it doesn't already exist.
        append_to_table: If false (the default), we overwrite the table
            contents rather than appending to the end of the table.
        priority: A string, either 'INTERACTIVE' or 'BATCH'. In both cases, the
            query will be submitted asynchronously and the pipeline will wait
            until the query completes, but interactive queries generally run
            faster and are subject to rate limits, while batch queries are
            marked as lower priority in BigQuery's scheduler and have no
            restriction on the number of concurrent queries. The default
            priority is BATCH since pipelines are most commonly run in
            background jobs where stability and isolation (i.e. not affecting
            humans using the BigQuery webapp) are more important than low
            latency.
        allow_large_results: Setting to True (the default) may slow the query
            down, but allows the query to return more than 128MB of results.
        result_table_ttl_days: If specified, the resulting table is
            asynchronously set to expire after the given number of days.
        table_desc: A string with a human-readable description of this BigQuery
            table. If this is None, we do not set the table description.
        schema_desc: A list of dictionaries, each following the format
            described in [1] under patch/body/schema/fields. If this is None,
            we do not set the schema descriptions.

    Returns:
        default: Metadata describing the status of the query job (the return
            value of the "get" function in the "jobs" section of the BigQuery
            API).
        cost: The number of cents spent on the query.

    [1] https://developers.google.com/resources/api-libraries/documentation/
        bigquery/v2/python/latest/bigquery_v2.tables.html
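
    Illustrative usage (the query and the dataset/table names below are
    hypothetical):
        job_info = yield QueryToTablePipeline(
            'SELECT user_id FROM [logs.daily_events]',
            output_dataset='reports', output_table='daily_users',
            result_table_ttl_days=30)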
    """
    output_names = ['cost']

    def run(self, query, output_dataset, output_table, create_if_needed=True,
            append_to_table=False, priority=DEFAULT_QUERY_PRIORITY,
            allow_large_results=True, result_table_ttl_days=None,
            table_desc=None, schema_desc=None):
        job_results = yield RunJobBatchPipeline(
            [query_to_table_job_config(query, output_dataset, output_table,
                                       create_if_needed, append_to_table,
                                       priority, allow_large_results)],
            time_limit=QUERY_TIME_LIMIT_SECONDS)
        if result_table_ttl_days is not None:
            with pipeline.After(job_results):
                yield SetTableTTLInDaysPipeline(
                    output_dataset, output_table, result_table_ttl_days)
        if table_desc is not None or schema_desc is not None:
            with pipeline.After(job_results):
                yield UpdateTableInfoPipeline(
                    output_dataset, output_table, table_desc, schema_desc)
        cost = yield GetCostPipeline(job_results)
        job_result = yield ListIndex(job_results, 0)
        yield ReturnOutputs(job_result, cost=cost)


class QueryToTableBatchPipeline(pipeline.Pipeline):
    """Batched version of QueryToTablePipeline.

    This pipeline lets you submit many queries at the same time and block until
    all of them have finished. The API requests are batched, so this version is
    more performant, and the requests are automatically throttled as necessary
    to avoid hitting BigQuery's request limit.

    Arguments:
        query_specs: A list of dictionaries, each of which describes a query
            to make and the table that it should output to. The dictionary keys
            are the same as the arguments to QueryToTablePipeline, although
            result_table_ttl_days is not yet supported for this pipeline.

    Returns:
        default: A list of job results, each of which has the same format as
            the return value of bq_service.jobs().get(...).
        cost: The combined cost of all queries.
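
    Illustrative query_specs entry (the query and names are hypothetical):
        {'query': 'SELECT user_id FROM [logs.daily_events]',
         'output_dataset': 'reports', 'output_table': 'daily_users'}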
    """
    output_names = ['cost']

    def run(self, query_specs):
        job_configs = [query_to_table_job_config(**spec)
                       for spec in query_specs]
        results = yield RunJobBatchPipeline(
            job_configs, time_limit=QUERY_TIME_LIMIT_SECONDS)
        cost = yield GetCostPipeline(results)
        yield ReturnOutputs(results, cost=cost)


def query_to_table_job_config(
        query, output_dataset, output_table, create_if_needed=True,
        append_to_table=False, priority=DEFAULT_QUERY_PRIORITY,
        allow_large_results=True):
    """Gets the 'configuration' dict to use when starting a BigQuery query.

    See the documentation for bq_service.jobs().insert(...) for more
    information.
    """
    logging.info('Submitting query %s' % query)
    create_mode = 'CREATE_IF_NEEDED' if create_if_needed else 'CREATE_NEVER'
    # Note that the API also allows for a WRITE_EMPTY option where we
    # expect the table to be empty, and there's no reason we can't add
    # support for that here if we need to.
    write_mode = 'WRITE_APPEND' if append_to_table else 'WRITE_TRUNCATE'
    return {
        'query': {
            'query': query,
            'priority': priority,
            'destinationTable': {
                'projectId': bq_connection.BQ_PROJECT_ID,
                'datasetId': output_dataset,
                'tableId': output_table
            },
            'createDisposition': create_mode,
            'writeDisposition': write_mode,
            'allowLargeResults': allow_large_results,
        }
    }


class GetCostPipeline(pipeline.Pipeline):
    """Given a list of query metadata, compute the total cost of all queries.

    Arguments:
        job_infos: A list of dicts, each containing information about a query
            job. The format is the return format for
            bq_service.jobs().get(...), as documented in the BigQuery API (see
            the documentation on BigQueryService for a link).

    Returns:
        The combined cost of all queries, in cents.
    """
    def run(self, job_infos):
        result = 0
        for job_info in job_infos:
            num_bytes = int(job_info['statistics']['query']
                            ['totalBytesProcessed'])
            # Queries cost 0.5 cents per GB of data touched.
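            # For example, a query that touches 10 GB of data costs
            # 0.5 * 10 = 5 cents.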
            result += 0.5 * num_bytes / (2 ** 30)
        return result


class CopyTablePipeline(pipeline.Pipeline):
    """Copies the contents of the source table to the destination table.

    Both tables are specified by their dataset name and table name. If
    append_to_table is False and the destination table already exists, it is
    overwritten with the contents of the source table.

    This pipeline is not marked as finished until the copy job is complete.
    """
    def run(self, src_dataset, src_table, dest_dataset, dest_table,
            append_to_table=False, result_table_ttl_days=None):
        result_list = yield RunJobBatchPipeline([copy_table_job_config(
            src_dataset, src_table, dest_dataset, dest_table, append_to_table)
        ])
        if result_table_ttl_days is not None:
            with pipeline.After(result_list):
                yield SetTableTTLInDaysPipeline(
                    dest_dataset, dest_table, result_table_ttl_days)
        yield ListIndex(result_list, 0)


class CopyTableBatchPipeline(pipeline.Pipeline):
    """Performs multiple copy operations in a batch.

    Arguments:
        copy_specs: A list of dictionaries, each of which describes a copy
            operation. The dict keys should be the same as the arguments to
            CopyTablePipeline.
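
    Illustrative copy_spec (the dataset and table names are hypothetical):
        {'src_dataset': 'tmp', 'src_table': 'daily_users_staging',
         'dest_dataset': 'reports', 'dest_table': 'daily_users'}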
    """
    def run(self, copy_specs):
        yield RunJobBatchPipeline(
            [copy_table_job_config(**copy_spec) for copy_spec in copy_specs])


def copy_table_job_config(src_dataset, src_table, dest_dataset, dest_table,
                          append_to_table=False):
    write_mode = 'WRITE_APPEND' if append_to_table else 'WRITE_TRUNCATE'
    return {
        'copy': {
            'sourceTable': {
                'projectId': bq_connection.BQ_PROJECT_ID,
                'datasetId': src_dataset,
                'tableId': src_table
            },
            'destinationTable': {
                'projectId': bq_connection.BQ_PROJECT_ID,
                'datasetId': dest_dataset,
                'tableId': dest_table
            },
            'createDisposition': 'CREATE_IF_NEEDED',
            'writeDisposition': write_mode
        }
    }


class UpdateTableInfoPipeline(pipeline.Pipeline):
    """Updates the descriptions of an existing BigQuery table.

    This pipeline assumes that the table already exists and that its schema is
    as specified. The sole purpose of this class is to add a description for
    the table itself and/or for the fields in its schema.

    For an example of how to include descriptions when creating a table, please
    refer to bigquery_reports/user_sessions.py.

    TODO(ilan): It would be ideal to automatically generate the schema for our
    existing tables based on the docstrings we've already written.
    """
    def run(self, dataset_name, table_name, table_desc=None, schema_desc=None):
        """Updates the descriptions for the BigQuery table and its schema.

        Args:
            dataset_name: A string with the ID of the dataset where this table
                          resides.
            table_name:   A string with the ID of this table.
            table_desc:   A string with a human-readable description of this
                          BigQuery table. If this is None, we do not set the
                          table description.
            schema_desc:  A list of dictionaries, each following the format
                          described in [1] under patch/body/schema/fields. If
                          this is None, we do not set the schema descriptions.

        [1] https://developers.google.com/resources/api-libraries/
            documentation/bigquery/v2/python/latest/bigquery_v2.tables.html
        """
        table_service = bq_connection.BigQueryService.get_service().tables()

        # This handler mirrors the one in EnsureTablesDeletedPipeline: ignore
        # missing tables (404) and re-raise any other error.
        def handle_result(request_id, response, exception):
            # Ignore missing tables
            if exception and not (
                    isinstance(exception, apiclient_errors.HttpError) and
                    json.loads(exception.content)['error']['code'] == 404):
                raise exception

        run_http_requests(
            [table_service.patch(
                projectId=bq_connection.BQ_PROJECT_ID,
                datasetId=dataset_name,
                tableId=table_name,
                body=patch_table_job_config(dataset_name, table_name,
                                            table_desc, schema_desc))],
            handle_result)


def patch_table_job_config(dataset_name, table_name, table_desc=None,
                           schema_desc=None):
    """Returns the job config dictionary for this table patch method call.

    Args:
        dataset_name: A string with the ID of the dataset where this table
                      resides.
        table_name:   A string with the ID of this table.
        table_desc:   A string with a human-readable description of this
                      BigQuery table. If this is None, we do not set the
                      table description.
        schema_desc:  A list of dictionaries, each following the format
                      described in [1] under patch/body/schema/fields (see
                      the example below). If this is None, we do not set the
                      schema descriptions.

    [1] https://developers.google.com/resources/api-libraries/
        documentation/bigquery/v2/python/latest/bigquery_v2.tables.html

    Returns:
        A dictionary of the form required to run the patch job.
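
    Illustrative schema_desc value (the field name and description are
    hypothetical):
        [{'name': 'user_id', 'type': 'STRING',
          'description': 'The ID of the user.'}]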
    """
    result = {
        'tableReference': {
            'projectId': bq_connection.BQ_PROJECT_ID,
            'tableId': table_name,
            'datasetId': dataset_name,
        },
    }

    if table_desc is not None:
        result['description'] = table_desc

    if schema_desc is not None:
        result['schema'] = {
            'fields': schema_desc,
        }

    return result


class LoadTablePipeline(pipeline.Pipeline):
    """Loads a table from Cloud Storage.

    If the specified table does not already exist, it is created. This pipeline
    is not marked as finished until the load job is complete, and the pipeline
    is aborted if there is a job failure.

    Arguments:
        cloud_storage_paths: A list of strings specifying the cloud storage
            files to load, e.g. "gs://foo/bar".
        dataset: The dataset of the table to write to.
        table: The name of the table to write to.
        schema: The schema of the table. If the table already exists, this must
            be a subset of the existing schema. See the documentation for
            bq_service.jobs().insert(...) (the BigQueryService documentation
            has a link) for a description of the schema format.
        source_format: A string, either NEWLINE_DELIMITED_JSON or CSV.
        write_mode: A string, either WRITE_APPEND or WRITE_TRUNCATE.
        result_table_ttl_days: If specified, the resulting table is
            asynchronously set to expire after the given number of days.

    Returns:
        Metadata about the job result, in the format of the return value of
        bq_service.jobs().get(...).
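
    Illustrative schema value (the field names are hypothetical):
        {'fields': [{'name': 'user_id', 'type': 'STRING'},
                    {'name': 'num_events', 'type': 'INTEGER'}]}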
    """
    def run(self, cloud_storage_paths, dataset, table, schema, source_format,
            write_mode, result_table_ttl_days=None):
        result_list = yield RunJobBatchPipeline([{
            'load': {
                'sourceFormat': source_format,
                'sourceUris': cloud_storage_paths,
                'schema': schema,
                'destinationTable': {
                    'projectId': bq_connection.BQ_PROJECT_ID,
                    'datasetId': dataset,
                    'tableId': table,
                },
                'createDisposition': 'CREATE_IF_NEEDED',
                'writeDisposition': write_mode,
                'encoding': 'UTF-8',
            }
        }])
        if result_table_ttl_days is not None:
            with pipeline.After(result_list):
                yield SetTableTTLInDaysPipeline(
                    dataset, table, result_table_ttl_days)
        yield ListIndex(result_list, 0)


class EnsureTablesDeletedPipeline(pipeline.Pipeline):
    """Deletes the tables with the given names in the given dataset.

    Tables that don't already exist are ignored. The delete operation
    happens synchronously, so the tables will be deleted when the pipeline is
    marked as finished.
    """
    def run(self, dataset_name, table_names):
        table_service = bq_connection.BigQueryService.get_service().tables()

        def handle_result(request_id, response, exception):
            # Ignore missing tables
            if exception and not (
                    isinstance(exception, apiclient_errors.HttpError) and
                    json.loads(exception.content)['error']['code'] == 404):
                raise exception

        run_http_requests(
            [table_service.delete(
                projectId=bq_connection.BQ_PROJECT_ID,
                datasetId=dataset_name,
                tableId=table_name)
             for table_name in table_names],
            handle_result)


class SetTableTTLInDaysPipeline(pipeline.Pipeline):
    """Set the table to expire after the given number of days.

    See bq_connection.set_table_ttl_in_days for more info.
    """
    def run(self, dataset, table, num_days):
        bq_connection.set_table_ttl_in_days(dataset, table, num_days)


def should_retry_job(job_info):
    """Return True if the given job failed and is eligible for retry.

    Jobs should be retried if they failed due to BigQuery internal errors.
    BigQuery job statuses provide a 'reason' error code that helps distinguish
    between transient BigQuery errors and bugs in our code or other errors that
    are not worth retrying.

    Arguments:
        job_info: The result type of bq_service.jobs().get(...), which provides
            information about the status and other details of a job.
    """
    return ('errorResult' in job_info['status'] and
            job_info['status']['errorResult']['reason'] in
                ('internalError', 'backendError', 'quotaExceeded',
                 'ka:timeLimitExceeded'))


class RunJobBatchPipeline(pipeline.Pipeline):
    """Pipeline to run a list of BigQuery job and block on their completion.

    Arguments:
        configs: A list of job configurations. The format is defined in the
            "configuration" section of the documentation for
            bq_service.jobs().insert(...).
        time_limit: The number of seconds we're willing to wait before
            considering a job stalled and trying again, or None to wait
            indefinitely. If specified, the jobs passed in should all be
            idempotent.
        retries_remaining: The number of additional times we're willing to
            retry failed jobs. This is mostly used internally.

    Returns:
        A list of metadata dicts, each of which has the format of the return
        value of bq_service.jobs().get(...).
    """
    def run(self, configs, time_limit=None, retries_remaining=2):
        job_service = bq_connection.BigQueryService.get_service().jobs()

        job_ids = [None] * len(configs)

        def handler(index, response):
            job_ids[index] = response['jobReference']['jobId']

        run_http_job_requests(
            [job_service.insert(
                projectId=bq_connection.BQ_PROJECT_ID,
                body={
                    'projectId': bq_connection.BQ_PROJECT_ID,
                    'configuration': config
                })
             for config in configs],
            handler,
            # Setting this to be higher than 1 causes things to break. Looks
            # like a BigQuery bug.
            num_requests_per_batch=1)

        job_results = yield WaitForJobBatchPipeline(
            job_ids, wait_time_seconds=JOB_WAIT_INITIAL_TIME,
            time_remaining_seconds=time_limit)
        yield ConsiderJobRetry(job_results, time_limit, retries_remaining)


class ConsiderJobRetry(pipeline.Pipeline):
    """If any jobs failed and should be retried, retry them.

    If any jobs actually are retried successfully, we patch the given
    job_results list so that the new successful results replace the old failed
    results.

    TODO(alan): In the quotaExceeded case, we can be smarter about our response
    rather than just blindly retrying with a cap on the number of times. I
    think these semantics make the most sense:
    - If we're not making progress (every job had quotaExceeded), add in a
      delay of 30 seconds or so before retrying to let the system calm down.
    - If we're making progress (some jobs were successful, while others had
      quotaExceeded), don't decrement the retry count (or maybe reset it) and
      retry immediately.
    This should make it possible to throw hundreds of queries at
    RunJobBatchPipeline and have BigQuery process them as fast as it can
    without ever failing, while still protecting against the case where an
    individual query fails due to too many queries from other jobs.

    Arguments:
        job_results: A list of job result dicts (the return value of
            bq_service.jobs().get(...)).
        time_limit: The job time limit to send to RunJobBatchPipeline.
        retries_remaining: The number of times we're willing to retry.

    Returns:
        The job_results list, which may be updated with newly-retried job info.
    """
    def run(self, job_results, time_limit, retries_remaining):
        # List of (index, config) pairs. We need to track the original indexes
        # so we can replace the right ones when we finish retrying.
        retry_configs = []

        for index, job_result in enumerate(job_results):
            if should_retry_job(job_result):
                logging.info('The following failure occurred, and will be '
                             'considered for retry: %s' % job_result)
                retry_configs.append((index, job_result['configuration']))
        if retry_configs:
            if retries_remaining == 0:
                raise bq_connection.BigQueryError(
                    'The following jobs failed and were retried too many '
                    'times: %s' % [config for _, config in retry_configs])
            new_job_results = yield RunJobBatchPipeline(
                [config for _, config in retry_configs], time_limit,
                retries_remaining - 1)
            yield UpdateJobResultsList(job_results, new_job_results,
                                       [index for index, _ in retry_configs])
        else:
            yield common.Return(job_results)


class UpdateJobResultsList(pipeline.Pipeline):
    """Update some entries in a list of job results with new results.

    This is used after retrying some failed jobs to combine their new
    successful result values with the values of the jobs that succeeded
    originally.

    Arguments:
        original_job_results: A list of job results for the entire batch, some
            of which need to be replaced.
        new_job_results: A list of job results to replace the original results.
        replacement_indices: A list of indices. The nth index is the place in
            original_job_results to put new_job_results[n].

    Returns:
        A list based on original_job_results, but with some replacements made,
        as described by new_job_results and replacement_indices.
    """
    def run(self, original_job_results, new_job_results, replacement_indices):
        for new_job_result, index in zip(new_job_results, replacement_indices):
            original_job_results[index] = new_job_result
        return original_job_results


class WaitForJobBatchPipeline(pipeline.Pipeline):
    """Pipeline which waits for a list of BigQuery jobs to finish.

    Arguments:
        job_ids: A list of BigQuery job IDs to wait on.
        wait_time_seconds: The amount of time to sleep before polling the jobs
            again. This is used to implement exponential backoff; it is a
            polling interval, not a timeout (time_remaining_seconds controls
            the overall time limit).
        time_remaining_seconds: Either an integer number of seconds or None. If
            not None, this pipeline will stop waiting after that many seconds
            have passed and any jobs that are still not done will have their
            statuses modified so that they're marked as DONE with a
            timeLimitExceeded error (we use the same error format that BigQuery
            uses since error-handling code expects that format).

    Returns:
        A list of metadata dicts describing the jobs' results, as returned by
        bq_service.jobs().get(...). See the docstring on BigQueryService for a
        link to the docs. Since we retry until the jobs are finished, this
        metadata will always show the jobs as finished.
    """
    def run(self, job_ids, wait_time_seconds, time_remaining_seconds):
        job_service = bq_connection.BigQueryService.get_service().jobs()

        # This list starts out with all Nones and gets filled in.
        completed_results = [None] * len(job_ids)

        remaining_job_results = []
        remaining_job_indices = []

        def handle_result(index, response):
            if response['status']['state'] in ('PENDING', 'RUNNING'):
                remaining_job_results.append(response)
                remaining_job_indices.append(index)
            else:
                completed_results[index] = response

        run_http_job_requests(
            [job_service.get(projectId=bq_connection.BQ_PROJECT_ID,
                             jobId=job_id)
             for job_id in job_ids],
            handle_result)

        # Compute other_results, an ordered list of job results for jobs that
        # we wait for recursively.
        if remaining_job_results:
            if (time_remaining_seconds is not None and
                    time_remaining_seconds <= 0):
                # If we're out of time, pretend the job finished with an error
                # so we retry it. In practice, these long-running jobs can go
                # for hours before finishing, so it's generally best to give up
                # and try again.
                for job in remaining_job_results:
                    job['status']['state'] = 'DONE'
                    job['status']['errorResult'] = {
                        'reason': 'ka:timeLimitExceeded'
                    }
                other_results = remaining_job_results
            else:
                # Otherwise, keep waiting.
                remaining_job_ids = [job['jobReference']['jobId']
                                     for job in remaining_job_results]
                with pipeline.InOrder():
                    yield common.Delay(seconds=wait_time_seconds)
                    if time_remaining_seconds is not None:
                        time_remaining_seconds -= wait_time_seconds
                    other_results = yield WaitForJobBatchPipeline(
                        remaining_job_ids,
                        wait_time_seconds * JOB_WAIT_BACKOFF_MULTIPLIER,
                        time_remaining_seconds)
        else:
            other_results = []

        yield UpdateJobResultsList(completed_results, other_results,
                                   remaining_job_indices)


def run_http_job_requests(
        requests, callback,
        num_requests_per_batch=DEFAULT_NUM_REQUESTS_PER_BATCH):
    """Runs the given HTTP requests relating to BigQuery jobs.

    If any HTTP request fails or any request returns a non-retriable job
    failure, an exception is thrown. Retriable job failures are passed to the
    callback so the caller can decide whether to retry them.

    Arguments:
        requests: A list of HttpRequest objects.
        callback: A function taking an index and response parameter. It is
            called for each request that is returned. The "index" parameter is
            the index of the http request in the requests list, and the
            response is the response to the HTTP request.
        num_requests_per_batch: The number of requests we're willing to send
            in the same request batch.

    Unlike the more general callback used by BatchHttpRequest (which receives
    a request_id, response, and exception), the given callback receives only
    the index and the response; HTTP errors and non-retriable job failures are
    raised as exceptions instead.
    """
    def new_callback(request_id, response, exception):
        if exception:
            raise exception
        # If the job failed and is eligible for retry, return the failure
        # normally instead of failing now so that the parent pipeline has a
        # chance to handle the failure.
        if ('errorResult' in response['status'] and
                not should_retry_job(response)):
            raise bq_connection.BigQueryError(
                'Job failed: %s' % response['status']['errorResult'])
        index = int(request_id)
        callback(index, response)
    run_http_requests(requests, new_callback, num_requests_per_batch)


def run_http_requests(
        requests, callback,
        num_requests_per_batch=DEFAULT_NUM_REQUESTS_PER_BATCH):
    """Runs all HttpRequests as a batch with the same callback.

    If BigQuery sends back errors because the request rate limit is exceeded,
    this function attempts to back off and retry those failures later, although
    it eventually will give up.

    Arguments:
        requests: A list of HttpRequest objects (such as those returned by
            function calls on bq_service).
        callback: A callback to run on each result. See the documentation on
            BatchHttpRequest for more details. The request_ids provided to the
            callback will be the string value of the index into the requests
            list, but the order in which the callbacks are called is not
            guaranteed to be the same as the order of the requests.
        num_requests_per_batch: The number of requests we're willing to send in
            the same request batch. This throttling exists to avoid BigQuery
            errors from sending too many requests at once, and to work around
            a bug where BigQuery will only let you insert one job at a time.
    """
    request_ids = [str(num) for num in xrange(len(requests))]
    for id_and_request_batch in partition(zip(request_ids, requests),
                                          num_requests_per_batch):
        requests_by_id = dict(id_and_request_batch)
        run_http_requests_with_retry(
            requests_by_id, callback,
            retries_remaining=NUM_HTTP_RETRY_ATTEMPTS - 1)


def run_http_requests_with_retry(requests_by_id, callback, retries_remaining):
    """Run the given requests as a batch, retrying as necessary.

    If we hit any rate limiting errors or internal BigQuery errors, we sleep
    for half a second and try again with the failed requests. If the retry
    limit is exceeded, it is reported to the callback in the same way that
    other exceptions are reported.

    Arguments:
        requests_by_id: A dictionary mapping string request ID to the request
            itself.
        callback: A callback to handle the requests, as described in the docs
            for BatchHttpRequest.
        retries_remaining: The number of additional times we're willing to
            retry before giving up.
    """
    ids_needing_retry = []

    def retry_collecting_callback(request_id, response, exception):
        if should_retry_http_exception(exception) and retries_remaining > 0:
            ids_needing_retry.append(request_id)
        else:
            callback(request_id, response, exception)

    http_batch = apiclient_http.BatchHttpRequest()
    for request_id, request in requests_by_id.iteritems():
        http_batch.add(request, callback=retry_collecting_callback,
                       request_id=request_id)
    http_batch.execute()

    # If we get a retriable exception (which happens in the case of rate
    # limiting and internal BigQuery errors), back off a bit to let the system
    # settle down, then retry.
    if ids_needing_retry:
        logging.warning('After submitting %s requests at once, %s needed to '
                        'be retried.' %
                            (len(requests_by_id), len(ids_needing_retry)))
        time.sleep(0.5)
        requests_by_id_to_retry = {request_id: requests_by_id[request_id]
                                   for request_id in ids_needing_retry}
        run_http_requests_with_retry(requests_by_id_to_retry, callback,
                                     retries_remaining - 1)


def should_retry_http_exception(exception):
    """Returns True if the given HTTP exception should be retried.

    The two main cases that are worth retrying are when we've hit BigQuery API
    request rate limits or when BigQuery has some other error and gives a
    message saying "Unexpected. Please try again.".
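
    For example (illustrative), an HttpError whose content parses to
    {'error': {'code': 403, 'message': 'Exceeded rate limits: ...'}} is
    considered retriable.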
    """
    if not exception or not isinstance(exception, apiclient_errors.HttpError):
        return False
    content = json.loads(exception.content)
    code, message = content['error']['code'], content['error']['message']
    return ((code == 403 and message.startswith('Exceeded rate limits')) or
        (code == 500 and 'Please try again' in message))


def partition(source_list, partition_size):
    """Break the given list up into a list of lists, each of the given size.

    Every resulting list will have size partition_size except possibly the last
    one, and no list will be empty.
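
    For example:
        partition([1, 2, 3, 4, 5], 2) == [[1, 2], [3, 4], [5]]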
    """
    return [source_list[n:n + partition_size]
            for n in xrange(0, len(source_list), partition_size)]