@Tjorriemorrie
Last active November 16, 2015 07:44
multiprocessing

import os
import time
import requests
import argparse
from config import *
from logger import Logger
from bigquery import BigQuery
from storage import Storage
from solr import Solr
from oauth2client.client import GoogleCredentials
from apiclient.discovery import build
# from multiprocessing import Pool, Manager
from pathos import multiprocessing
# from functools import partial
import xml.etree.ElementTree as ET
import humanize


class Loader():

    def __init__(self):
        self.log = Logger()('loader')
        self.bqs = BigQuery(self.log)
        self.gcs = Storage(self.log)
        self.solr = Solr(self.log)
        self.log.debug('Loader:init')

    def run(self, dataset, table, server, collection, bucket, truncate, multi_processes):
        self.log.info('Loader:run start')

        # params
        self.log.info('Loader:run table: {}'.format(table))
        self.log.info('Loader:run dataset: {}'.format(dataset))
        self.log.info('Loader:run server: {}'.format(server))
        self.log.info('Loader:run collection: {}'.format(collection))
        self.log.info('Loader:run truncate: {}'.format(truncate))
        self.log.info('Loader:run multi processes: {}'.format(multi_processes))

        data_type = self.bqs.max_value(dataset, table, 'type')
        schema = self.bqs.schema(dataset, table)
        multi_values = self.solr.multi_valued_fields(schema)

        if not truncate:
            delta = self.solr.delta(server, collection, data_type)
            if delta:
                raise NotImplementedError()
                # query = 'SELECT'
                # self.bqs.query(dataset, table, q)
            else:
                self.log.warn('Loader:run no delta found, turning on truncate')
                truncate = True

        self.bqs.export_storage(dataset, table, bucket)

        try:
            destination_files = self.gcs.list_bucket(bucket, '{}/{}'.format(dataset, table))

            # if truncate:
            #     self.solr.truncate_type(server, collection, data_type)

            self.index_files(server, collection, destination_files, multi_values, bucket, multi_processes)

        except Exception:
            raise
        finally:
            self.gcs.clear_bucket(bucket, destination_files)

        self.log.info('Loader:run done')

    def index_files(self, server, collection, destination_files, multi_values, bucket, multi_processes):
        self.log.info('Loader:index_files {} files to server: {} collection: {}'.format(len(destination_files), server, collection))

        solr_url = 'http://{}:8983/solr/{}/update?commit=false&softCommit=false&optimize=false&overwrite=true{}'.format(server, collection, multi_values)
        self.log.debug('Loader:index_files Url: {}'.format(solr_url))

        headers = {
            'charset': 'utf-8',
            'content-type': 'application/csv',
        }
        self.log.debug('Headers: {}'.format(headers))

        # durations = Manager().list([0] * len(destination_files))
        durations = []
        # indexFile_partial = partial(self.index_file, bucket=bucket, solr_url=solr_url, headers=headers, durations=durations)

        pool = multiprocessing.ProcessingPool(multi_processes)
        # pathos' map zips every positional iterable, so the scalar arguments
        # are repeated once per file (see the note after this listing)
        n = len(destination_files)
        destination_files = pool.map(self.index_file, destination_files,
                                     [bucket] * n, [solr_url] * n,
                                     [headers] * n, [durations] * n)
        # pool.close()
        # pool.join()

        self.log.info('Loader:index_files {} files indexed'.format(len(destination_files)))

    def index_file(self, destination_file, bucket, solr_url, headers, durations):
        timeStart = time.time()

        media = self.gcs.media(destination_file)

        self.solr.index(solr_url, headers, media, destination_file)

        # update ETA
        # durations.append(time.time() - timeStart)
        # # log.info('durations appended {} {}'.format(len(durations), durations))
        # durations.pop(0)
        # # log.info('durations shifted {} {}'.format(len(durations), durations))
        # durations_finished = [v for v in durations if v > 0]
        # avg = sum(durations_finished) / float(len(durations_finished))
        # # log.info('durations avg {}'.format(avg))
        # remaining = (len(durations) - len(durations_finished)) * avg / 60. / float(MULTI_PROCESSES)
        # completed = len(durations_finished) / float(len(durations)) * 100.
        # self.log.info('{}% completed ({}m remaining)'.format(int(completed), int(round(remaining))))


def main(dataset, table, server, collection, bucket, truncate, multi_processes):
    loader = Loader()
    loader.run(dataset, table, server, collection, bucket, truncate, multi_processes)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('dataset')
    parser.add_argument('table')
    parser.add_argument('-s', '--server', default=SOLR_SERVER)
    parser.add_argument('-c', '--collection', default=SOLR_COLLECTION)
    parser.add_argument('-b', '--bucket', default=BUCKET_EXPORT)
    parser.add_argument('-t', '--truncate', action='store_true')
    parser.add_argument('-p', '--processes', default=MULTI_PROCESSES, type=int)
    args = parser.parse_args()

    dataset = args.dataset
    table = args.table
    server = args.server
    collection = args.collection
    bucket = args.bucket
    truncate = args.truncate
    multi_processes = args.processes

    main(dataset, table, server, collection, bucket, truncate, multi_processes)
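
A note on the pool.map call above: pathos' ProcessingPool.map follows Python 2 map semantics and zips all of its positional iterables together, so scalar arguments such as bucket and solr_url have to be repeated once per file (a bare string would otherwise be iterated character by character). The revision in the comment below sidesteps this by switching to functools.partial with a module-level trampoline. A minimal, self-contained sketch of the zip behaviour; index_one, the file names and the URLs are illustrative stand-ins, not the gist's API:

# sketch only: demonstrates pathos' multi-iterable map semantics
from pathos.multiprocessing import ProcessingPool

def index_one(destination_file, bucket, solr_url):
    return '{} <- gs://{}/{}'.format(solr_url, bucket, destination_file)

if __name__ == '__main__':
    files = ['part-000.csv', 'part-001.csv', 'part-002.csv']
    pool = ProcessingPool(2)
    # every positional iterable must have the same length: repeat scalars
    results = pool.map(index_one,
                       files,
                       ['export-bucket'] * len(files),
                       ['http://solr:8983/solr/update'] * len(files))
    pool.close()
    pool.join()
    print(results)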

@Tjorriemorrie (Author) commented:

import time
import argparse
from config import *
from logger import Logger
from bigquery import BigQuery
from storage import Storage
from solr import Solr
from multiprocessing import Manager
from pathos.multiprocessing import Pool
import psutil
from functools import partial

class Loader():
    def __init__(self):
        self.log = Logger()('loader')
        self.bqs = BigQuery(self.log)
        self.gcs = Storage(self.log)
        self.solr = Solr(self.log)
        self.log.debug('Loader:init')

    def run(self, dataset, table, server, collection, bucket, truncate):
        self.log.info('Loader:run start')

        # params
        self.log.info('Loader:run table: {}'.format(table))
        self.log.info('Loader:run dataset: {}'.format(dataset))
        self.log.info('Loader:run server: {}'.format(server))
        self.log.info('Loader:run collection: {}'.format(collection))
        self.log.info('Loader:run truncate: {}'.format(truncate))

        data_type = self.bqs.max_value(dataset, table, 'type')
        schema = self.bqs.schema(dataset, table)
        multi_values = self.solr.multi_valued_fields(schema)

        if not truncate:
            delta = self.solr.delta(server, collection, data_type)
            if delta:
                query = 'SELECT * FROM {0}.{1} WHERE delta > "{2}"'.format(dataset, table, delta)
                self.bqs.query('tmp', table, query)
                dataset = 'tmp'
            else:
                self.log.warn('Loader:run no delta found, turning on truncate')
                truncate = True

        self.bqs.export_storage(dataset, table, bucket)

        try:
            destination_files = self.gcs.list_bucket(bucket, '{}/{}'.format(dataset, table))

            if truncate:
                self.solr.truncate_type(server, collection, data_type)

            self.index_files(server, collection, destination_files, multi_values, bucket)

        except Exception:
            raise
        finally:
            self.gcs.clear_bucket(bucket, destination_files)

        self.log.info('Loader:run done')

    def index_files(self, server, collection, destination_files, multi_values, bucket):
        self.log.info('Loader:index_files {} files to server: {} collection: {}'.format(len(destination_files), server, collection))

        solr_url = 'http://{}:8983/solr/{}/update?commit=false&softCommit=false&optimize=false&overwrite=true{}'.format(server, collection, multi_values)
        self.log.debug('Loader:index_files Url: {}'.format(solr_url))

        durations = Manager().list([0] * len(destination_files))
        index_file_partial = partial(call_it, instance=self, method='index_file', bucket=bucket, solr_url=solr_url, durations=durations)

        pool = Pool(psutil.cpu_count())
        destination_files = pool.map(index_file_partial, destination_files)
        pool.close()
        pool.join()

        self.log.info('Loader:index_files {} files indexed'.format(len(destination_files)))


    def index_file(self, destination_file, bucket, solr_url, durations):
        timeStart = time.time()

        # reinitialize per worker process: the pool pickles `self` to ship it
        # to workers, and these client objects do not survive that
        self.gcs = Storage(self.log)
        self.solr = Solr(self.log)

        media = self.gcs.media(bucket, destination_file)

        self.solr.index(solr_url, media, destination_file)

        # update ETA
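        # `durations` is seeded with one zero per file; every finished file
        # appends its elapsed time and pops a zero off the front, so the
        # positive entries count completed files and drive the ETA below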
        durations.append(time.time() - timeStart)
        # log.info('durations appended {} {}'.format(len(durations), durations))
        durations.pop(0)
        # log.info('durations shifted {} {}'.format(len(durations), durations))
        durations_finished = [v for v in durations if v > 0]
        avg = sum(durations_finished) / float(len(durations_finished))
        # log.info('durations avg {}'.format(avg))
        remaining = (len(durations) - len(durations_finished)) * avg / 60. / float(psutil.cpu_count())
        completed = len(durations_finished) / float(len(durations)) * 100.
        self.log.info('{}% completed ({}m remaining)'.format(int(completed), int(round(remaining))))


def call_it(destination_file, instance, method, **kwargs):
    """Indirect caller for instance methods and multiprocessing."""
    return getattr(instance, method)(destination_file, **kwargs)


def main(dataset, table, server, collection, bucket, truncate):
    loader = Loader()
    loader.run(dataset, table, server, collection, bucket, truncate)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('dataset')
    parser.add_argument('table')
    parser.add_argument('-s', '--server', default=SOLR_SERVER)
    parser.add_argument('-c', '--collection', default=SOLR_COLLECTION)
    parser.add_argument('-b', '--bucket', default=BUCKET_EXPORT)
    parser.add_argument('-t', '--truncate', action='store_true')
    args = parser.parse_args()

    dataset = args.dataset
    table = args.table
    server = args.server
    collection = args.collection
    bucket = args.bucket
    truncate = args.truncate

    main(dataset, table, server, collection, bucket, truncate)
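
The call_it trampoline exists because the standard library's multiprocessing cannot pickle bound methods on Python 2 (and functools.partial objects only gained pickle support in Python 3), which is why the gist pairs the pattern with pathos, whose dill-based pickling handles both. Routing through getattr keeps one trampoline reusable for any method name. A minimal standalone sketch of the pattern, runnable on the Python 3 stdlib pool; Worker, process and suffix are illustrative names, not the gist's API:

# standalone sketch: a module-level trampoline pickles by name,
# unlike a bound method on Python 2, so the pool can ship it to workers
from functools import partial
from multiprocessing import Pool

class Worker(object):
    def process(self, item, suffix):
        return '{}{}'.format(item, suffix)

def call_it(item, instance, method, **kwargs):
    return getattr(instance, method)(item, **kwargs)

if __name__ == '__main__':
    worker = Worker()
    job = partial(call_it, instance=worker, method='process', suffix='!')
    pool = Pool(2)
    print(pool.map(job, ['a', 'b', 'c']))  # ['a!', 'b!', 'c!']
    pool.close()
    pool.join()

Assuming the file is saved as loader.py (the dataset, table and server names below are illustrative), a typical invocation might look like:

    python loader.py my_dataset my_table -s solr01 -c products -t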
