multiprocessing
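Exports a BigQuery table to a Cloud Storage bucket and bulk-indexes the exported CSV files into a Solr collection, fanning the files out across a pathos process pool.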
import time
import argparse
from functools import partial

from config import *
from logger import Logger
from bigquery import BigQuery
from storage import Storage
from solr import Solr
# from multiprocessing import Pool, Manager
from pathos import multiprocessing
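
# pathos is used instead of the stdlib multiprocessing (commented out above)
# because it serializes with dill, which can pickle the bound method the pool
# maps over; the stdlib pickler (on Python 2) cannot.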


class Loader(object):

    def __init__(self):
        self.log = Logger()('loader')
        self.bqs = BigQuery(self.log)
        self.gcs = Storage(self.log)
        self.solr = Solr(self.log)
        self.log.debug('Loader:init')
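
    # Pipeline: export the BigQuery table to GCS, index the exported files
    # into Solr, then clean the bucket up again.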
    def run(self, dataset, table, server, collection, bucket, truncate, multi_processes):
        self.log.info('Loader:run start')

        # params
        self.log.info('Loader:run table: {}'.format(table))
        self.log.info('Loader:run dataset: {}'.format(dataset))
        self.log.info('Loader:run server: {}'.format(server))
        self.log.info('Loader:run collection: {}'.format(collection))
        self.log.info('Loader:run truncate: {}'.format(truncate))
        self.log.info('Loader:run multi processes: {}'.format(multi_processes))

        data_type = self.bqs.max_value(dataset, table, 'type')
        schema = self.bqs.schema(dataset, table)
        multi_values = self.solr.multi_valued_fields(schema)

        if not truncate:
            delta = self.solr.delta(server, collection, data_type)
            if delta:
                # incremental loading is not supported yet
                raise NotImplementedError()
                # query = 'SELECT'
                # self.bqs.query(dataset, table, q)
            else:
                self.log.warn('Loader:run no delta found, turning on truncate')
                truncate = True

        self.bqs.export_storage(dataset, table, bucket)

        destination_files = []
        try:
            destination_files = self.gcs.list_bucket(bucket, '{}/{}'.format(dataset, table))
            # if truncate:
            #     self.solr.truncate_type(server, collection, data_type)
            self.index_files(server, collection, destination_files, multi_values, bucket, multi_processes)
        finally:
            # always clean up the exported files, even if indexing failed
            self.gcs.clear_bucket(bucket, destination_files)
        self.log.info('Loader:run done')
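
    # Fan the exported files out to a pathos process pool; each worker streams
    # one CSV file into Solr's update handler.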
    def index_files(self, server, collection, destination_files, multi_values, bucket, multi_processes):
        self.log.info('Loader:index_files {} files to server: {} collection: {}'.format(
            len(destination_files), server, collection))
        solr_url = 'http://{}:8983/solr/{}/update?commit=false&softCommit=false&optimize=false&overwrite=true{}'.format(
            server, collection, multi_values)
        self.log.debug('Loader:index_files Url: {}'.format(solr_url))
        headers = {
            'charset': 'utf-8',
            'content-type': 'application/csv',
        }
        self.log.debug('Headers: {}'.format(headers))

        # a plain list is not shared between processes; a Manager().list() would
        # be needed for the ETA tracking in index_file to work
        # durations = Manager().list([0] * len(destination_files))
        durations = []

        # bind the shared arguments so the pool only maps over the file names
        index_file_partial = partial(self.index_file, bucket=bucket, solr_url=solr_url,
                                     headers=headers, durations=durations)
        pool = multiprocessing.ProcessingPool(multi_processes)
        pool.map(index_file_partial, destination_files)
        pool.close()
        pool.join()
        self.log.info('Loader:index_files {} files indexed'.format(len(destination_files)))
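
    # Worker: fetch one exported file from GCS and POST it to Solr.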
    def index_file(self, destination_file, bucket, solr_url, headers, durations):
        time_start = time.time()
        media = self.gcs.media(destination_file)
        self.solr.index(solr_url, headers, media, destination_file)
        # update ETA (disabled: needs the Manager().list() above so durations
        # is shared across the worker processes)
        # durations.append(time.time() - time_start)
        # durations.pop(0)
        # durations_finished = [v for v in durations if v > 0]
        # avg = sum(durations_finished) / float(len(durations_finished))
        # remaining = (len(durations) - len(durations_finished)) * avg / 60. / float(MULTI_PROCESSES)
        # completed = len(durations_finished) / float(len(durations)) * 100.
        # self.log.info('{}% completed ({}m remaining)'.format(int(completed), int(round(remaining))))
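
# CLI entry point; defaults for the optional flags come from config.py.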
def main(dataset, table, server, collection, bucket, truncate, multi_processes):
    loader = Loader()
    loader.run(dataset, table, server, collection, bucket, truncate, multi_processes)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('dataset')
    parser.add_argument('table')
    parser.add_argument('-s', '--server', default=SOLR_SERVER)
    parser.add_argument('-c', '--collection', default=SOLR_COLLECTION)
    parser.add_argument('-b', '--bucket', default=BUCKET_EXPORT)
    parser.add_argument('-t', '--truncate', action='store_true')
    # cast to int: argparse passes strings through by default, and
    # ProcessingPool needs a number of workers
    parser.add_argument('-p', '--processes', type=int, default=MULTI_PROCESSES)
    args = parser.parse_args()
    main(args.dataset, args.table, args.server, args.collection, args.bucket,
         args.truncate, args.processes)
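
# Example invocation (a sketch; the dataset, table, host and collection names
# below are hypothetical, and the optional flags fall back to SOLR_SERVER,
# SOLR_COLLECTION, BUCKET_EXPORT and MULTI_PROCESSES from config.py):
#
#   python loader.py my_dataset my_table -s solr-host -c my_collection -p 8 --truncate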