Created
April 8, 2011 21:00
-
-
Save caseycrites/910731 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import argparse | |
| import csv | |
| from datetime import datetime, timedelta | |
| import dateutil.parser | |
| import eventlet | |
| eventlet.monkey_patch() | |
| import getopt | |
| import itertools | |
| import logging | |
| import sys | |
| import time | |
| from boto.s3.connection import S3Connection | |
| from boto.s3.key import Key | |
| from simplegeo_internal import Client | |
| import config | |
| log = logging.getLogger('simplegeo.nash') | |
| log.setLevel(logging.DEBUG) | |
| log.addHandler(logging.FileHandler(config.LOG_FILE)) | |
| class Usage(Exception): | |
| def __init__(self, msg): | |
| self.msg = msg | |
| def main(argv=None): | |
| log.info('Starting nash...') | |
| if argv is None: | |
| argv = sys.argv | |
| try: | |
| #try: | |
| # opts, args = getopt.getopt(argv[1:], "h", ["help"]) | |
| #except getopt.error, e: | |
| # raise Usage(e) | |
| conn = S3Connection(config.AWS_ACCESS_KEY, config.AWS_SECRET_KEY) | |
| buckets = config.S3_BUCKETS | |
| for bucket in buckets: | |
| s3_bucket = conn.get_bucket(bucket) | |
| timestamp = get_timestamp_for_date(datetime.now() - timedelta(days=1)) | |
| files_to_process = [] | |
| for key in s3_bucket.list(): | |
| file_name = key.name[key.name.rindex('/')+1:] | |
| #if file_name and int(file_name) >= timestamp: | |
| if file_name: | |
| key.get_contents_to_filename(config.LOCAL_FILE_PATH + '/' + | |
| key.name) | |
| files_to_process.append(config.LOCAL_FILE_PATH + '/' + key.name) | |
| for file_path in files_to_process: | |
| pool = eventlet.GreenPool(1) | |
| q = eventlet.Queue() | |
| with open(file_path, 'rb') as f: | |
| # TODO Make this dynamic | |
| if file_path.find('storage') != -1: | |
| loader = load_storage_rows | |
| parser = parse_storage_file | |
| elif file_path.find('services') != -1: | |
| loader = load_services_rows | |
| parser = parse_services_file | |
| reader = csv.reader(f) | |
| log.info('Processing ' + file_path) | |
| for message in pool.imap(loader, | |
| itertools.imap(parser, reader)): | |
| q.put(message) | |
| q.put('FINITO') | |
| queue_consumer(q) | |
| except Usage, e: | |
| log.exception(e) | |
| print >>sys.stderr, e.msg | |
| #print >>sys.stderr, "for help use --help" | |
| return 2 | |
| except Exception, e: | |
| log.exception(e) | |
| return 2 | |
| log.info('Ending nash...') | |
| def get_timestamp_for_date(date): | |
| return time.mktime(date.utctimetuple()) + (date.microsecond / 1000000.0) | |
| def parse_services_file(row): | |
| timestamp = get_timestamp_for_date(dateutil.parser.parse(row[4])) | |
| return [row[0], row[3], row[5], row[6], timestamp] | |
| def load_services_rows(row): | |
| try: | |
| return client.set_billing_service_count(*row) | |
| except Exception, e: | |
| log.exception(e) | |
| return str(e) | |
| def parse_storage_file(row): | |
| timestamp = get_timestamp_for_date(dateutil.parser.parse(row[3])) | |
| return [row[0], row[1], row[2], timestamp] | |
| def load_storage_rows(row): | |
| try: | |
| return client.set_billing_record_count(*row) | |
| except Exception, e: | |
| return str(e) | |
| def queue_consumer(q): | |
| counter = 0 | |
| while True: | |
| counter += 1 | |
| message = q.get() | |
| if message == 'FINITO': | |
| break | |
| log.info(str(counter) + ' ' + str(message)) | |
| if __name__ == "__main__": | |
| parser = argparse.ArgumentParser(description='Move billing log files from \ | |
| S3 to Redis or to the website.') | |
| argv = parser.parse_args() | |
| # Is this bad? Is someone going to kill me for doing this? | |
| global client | |
| client = Client(config.OAUTH_KEY, config.OAUTH_SECRET, | |
| host=config.API_HOST, port=config.API_PORT) | |
| sys.exit(main(argv=argv)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment