Skip to content

Instantly share code, notes, and snippets.

@caseycrites
Created April 8, 2011 21:00
Show Gist options
  • Select an option

  • Save caseycrites/910731 to your computer and use it in GitHub Desktop.

Select an option

Save caseycrites/910731 to your computer and use it in GitHub Desktop.
import argparse
import csv
from datetime import datetime, timedelta
import dateutil.parser
import eventlet
eventlet.monkey_patch()
import getopt
import itertools
import logging
import sys
import time
from boto.s3.connection import S3Connection
from boto.s3.key import Key
from simplegeo_internal import Client
import config
# Module-wide logger: records everything from DEBUG upwards and appends it to
# the file named by config.LOG_FILE.
log = logging.getLogger('simplegeo.nash')
log.addHandler(logging.FileHandler(config.LOG_FILE))
log.setLevel(logging.DEBUG)
class Usage(Exception):
    """Error that carries a message meant to be shown to the user.

    ``main`` catches this exception, logs it, and writes ``msg`` to stderr.

    Args:
        msg: The text (or underlying error) describing the problem.
    """

    def __init__(self, msg):
        # Pass the message to Exception as well, so that ``args``, ``str(e)``
        # and logged tracebacks carry it even when the caller constructs the
        # error with a keyword argument (``Usage(msg=...)``).
        super(Usage, self).__init__(msg)
        self.msg = msg
def main(argv=None):
    """Download billing logs from S3 and push every CSV row to the API.

    For each bucket listed in ``config.S3_BUCKETS`` every key with a
    non-empty final path component is downloaded below
    ``config.LOCAL_FILE_PATH``.  Each downloaded file is then read as CSV;
    its rows are parsed and handed to the loader that matches the file type,
    and the loader results are logged through ``queue_consumer``.

    Args:
        argv: Accepted for the command-line entry point and defaulted to
            ``sys.argv``; it is not consulted any further.

    Returns:
        ``2`` when an exception aborts the run, otherwise ``None``.
    """
    log.info('Starting nash...')
    if argv is None:
        argv = sys.argv
    try:
        conn = S3Connection(config.AWS_ACCESS_KEY, config.AWS_SECRET_KEY)
        for bucket in config.S3_BUCKETS:
            s3_bucket = conn.get_bucket(bucket)
            files_to_process = []
            # NOTE: every key is fetched on each run; there is no date-based
            # filtering of the bucket contents.
            for key in s3_bucket.list():
                # rpartition copes with keys that contain no '/' at all,
                # where rindex would raise ValueError and abort the run.
                file_name = key.name.rpartition('/')[2]
                if not file_name:
                    # Key ends in '/': nothing to download.
                    continue
                local_path = config.LOCAL_FILE_PATH + '/' + key.name
                key.get_contents_to_filename(local_path)
                files_to_process.append(local_path)
            for file_path in files_to_process:
                handlers = _handlers_for(file_path)
                if handlers is None:
                    # Without this guard the loader/parser of the previous
                    # file would be silently reused (or a NameError raised).
                    log.warning('Skipping unrecognized file ' + file_path)
                    continue
                loader, parser = handlers
                _process_file(file_path, loader, parser)
    except Usage as e:
        # Nothing in this function raises Usage at present; the handler is
        # kept so option-parsing errors can be reported here.
        log.exception(e)
        sys.stderr.write('%s\n' % (e.msg,))
        return 2
    except Exception as e:
        # Top-level boundary: record the traceback and signal failure.
        log.exception(e)
        return 2
    log.info('Ending nash...')


def _handlers_for(file_path):
    """Return the ``(loader, parser)`` pair for *file_path*, or ``None``."""
    # TODO Make this dynamic
    if 'storage' in file_path:
        return load_storage_rows, parse_storage_file
    if 'services' in file_path:
        return load_services_rows, parse_services_file
    return None


def _process_file(file_path, loader, parser):
    """Parse and load every CSV row of *file_path*, then log the results."""
    pool = eventlet.GreenPool(1)
    q = eventlet.Queue()
    with open(file_path, 'rb') as f:
        reader = csv.reader(f)
        log.info('Processing ' + file_path)
        # Lazy generator so rows are parsed only as the pool consumes them.
        parsed_rows = (parser(row) for row in reader)
        for message in pool.imap(loader, parsed_rows):
            q.put(message)
    # Sentinel telling queue_consumer that this file is finished.
    q.put('FINITO')
    queue_consumer(q)
def get_timestamp_for_date(date):
    """Return *date* as seconds since the Unix epoch, as a float.

    Timezone-aware datetimes are converted through their UTC fields.  Naive
    datetimes are interpreted as local time.

    Args:
        date: A ``datetime.datetime`` instance.

    Returns:
        Epoch seconds, including the microsecond part as a fraction.
    """
    import calendar  # function-scope import; the module does not import it

    fraction = date.microsecond / 1000000.0
    if date.utcoffset() is not None:
        # utctimetuple() yields UTC fields, so the inverse must be timegm();
        # time.mktime() would re-read those fields as local time and shift
        # the result by the host's UTC offset.
        return calendar.timegm(date.utctimetuple()) + fraction
    # Naive value: timetuple() leaves tm_isdst at -1, letting mktime() apply
    # the local DST rules instead of forcing standard time.
    return time.mktime(date.timetuple()) + fraction
def parse_services_file(row):
    """Turn one services CSV row into the argument list for its loader.

    Args:
        row: A sequence of column values; column 4 must hold a date string
            that ``dateutil`` can parse.

    Returns:
        A list with columns 0, 3, 5 and 6 followed by the epoch timestamp
        derived from column 4.
    """
    moment = dateutil.parser.parse(row[4])
    epoch_seconds = get_timestamp_for_date(moment)
    selected = [row[index] for index in (0, 3, 5, 6)]
    selected.append(epoch_seconds)
    return selected
def load_services_rows(row):
    """Send one parsed services row to the API client.

    Args:
        row: Positional arguments for ``client.set_billing_service_count``.

    Returns:
        Whatever the client call returns, or the exception text when the
        call fails (the failure is logged with its traceback).
    """
    try:
        outcome = client.set_billing_service_count(*row)
    except Exception as err:
        log.exception(err)
        outcome = str(err)
    return outcome
def parse_storage_file(row):
    """Turn one storage CSV row into the argument list for its loader.

    Args:
        row: A sequence of column values; column 3 must hold a date string
            that ``dateutil`` can parse.

    Returns:
        A list with columns 0, 1 and 2 followed by the epoch timestamp
        derived from column 3.
    """
    moment = dateutil.parser.parse(row[3])
    epoch_seconds = get_timestamp_for_date(moment)
    leading = [row[index] for index in (0, 1, 2)]
    return leading + [epoch_seconds]
def load_storage_rows(row):
    """Send one parsed storage row to the API client.

    Args:
        row: Positional arguments for ``client.set_billing_record_count``.

    Returns:
        Whatever the client call returns, or the exception text when the
        call fails.
    """
    try:
        return client.set_billing_record_count(*row)
    except Exception as e:
        # Log the traceback, as load_services_rows does; otherwise only the
        # bare message string survives.
        log.exception(e)
        return str(e)
def queue_consumer(q):
    """Drain *q*, logging each message with its 1-based position.

    Reading stops as soon as the sentinel string ``'FINITO'`` is taken off
    the queue; the sentinel itself is not logged.

    Args:
        q: A queue-like object whose ``get()`` returns the next message.
    """
    # iter(callable, sentinel) keeps calling q.get() until it yields the
    # sentinel; enumerate supplies the running count starting at 1.
    for position, message in enumerate(iter(q.get, 'FINITO'), 1):
        log.info(str(position) + ' ' + str(message))
if __name__ == "__main__":
    # Command-line entry point: parse the arguments, build the API client,
    # run main() and exit with its return value.
    parser = argparse.ArgumentParser(
        description='Move billing log files from '
                    'S3 to Redis or to the website.')
    argv = parser.parse_args()
    # Assigning here, at module level, is what makes ``client`` visible to
    # the load_*_rows functions; no ``global`` statement is needed.
    client = Client(config.OAUTH_KEY, config.OAUTH_SECRET,
                    host=config.API_HOST, port=config.API_PORT)
    sys.exit(main(argv=argv))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment