Created
March 10, 2016 22:27
-
-
Save evz/4c3aad41569049140ae3 to your computer and use it in GitHub Desktop.
Faster loader for the Open EE Meter datastore
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv | |
import sqlalchemy as sa | |
import os | |
import pytz | |
from datetime import datetime | |
import itertools | |
# Timezone used by loadProjects() to make the ``added``/``updated``
# timestamps timezone-aware.
tz = pytz.timezone('America/Chicago')

# The connection URL is hard-coded to a local test database.  The environment
# lookup below is left disabled; every loader in this file TRUNCATEs its
# target tables, so pointing it at another database should be a deliberate
# edit rather than an ambient environment setting.
# DB_CONN = os.environ['DATABASE_URL']
#
# Use the ``postgresql://`` scheme: the legacy ``postgres://`` alias is
# rejected by SQLAlchemy 1.4 and later, while ``postgresql://`` is accepted by
# every SQLAlchemy version.
DB_CONN = 'postgresql://eric:@localhost:5432/oeem_test'

engine = sa.create_engine(DB_CONN)

# Reflect the existing ``datastore_project`` table from the database so that
# loadProjects() can build INSERTs without restating the column definitions.
project_table = sa.Table('datastore_project',
                         sa.MetaData(),
                         autoload=True,
                         autoload_with=engine)

# Rows written to ``datastore_projectattributekey`` by setupTables().  The
# ids are referenced literally (key_id 1 and 2) when loadProjects() inserts
# project attributes.
attribute_keys = [
    {
        'id': 1,
        'name': 'project_cost',
        'data_type': 'float_value',
        'display_name': 'Project Cost',
    },
    {
        'id': 2,
        'name': 'predicted_electricity_savings',
        'data_type': 'float_value',
        'display_name': 'Estimated Electricity Savings',
    },
]
def setupTables():
    """Seed the fixture rows that the loaders depend on.

    Runs three independent transactions on one connection:

    1. insert the ``auth_user`` row with id 1,
    2. insert the ``datastore_projectowner`` row with id 1 (owned by user 1),
    3. truncate ``datastore_projectattributekey`` and re-insert
       ``attribute_keys``.

    An ``IntegrityError`` in any step (for example because the row already
    exists from a previous run) rolls back that step only and is otherwise
    ignored, so the function can be re-run.  Any other exception propagates;
    the connection is closed either way.
    """
    # The context manager guarantees the connection goes back to the pool,
    # including when a step raises something other than IntegrityError.
    with engine.connect() as conn:
        trans = conn.begin()
        try:
            # NOTE(review): the e-mail literal below looks like an
            # e-mail-obfuscation placeholder rather than a real address;
            # it is kept unchanged here -- confirm the intended value.
            conn.execute('''
                INSERT INTO auth_user (
                    id,
                    password,
                    is_superuser,
                    username,
                    first_name,
                    last_name,
                    email,
                    is_staff,
                    is_active,
                    date_joined
                ) VALUES (
                    1,
                    'test',
                    TRUE,
                    'test',
                    'tester',
                    'mctesterson',
                    '[email protected]',
                    TRUE,
                    TRUE,
                    NOW()
                )
            ''')
            trans.commit()
        except sa.exc.IntegrityError:
            trans.rollback()

        trans = conn.begin()
        try:
            conn.execute('''
                INSERT INTO datastore_projectowner (
                    id,
                    user_id,
                    added,
                    updated
                ) VALUES (
                    1,
                    1,
                    NOW(),
                    NOW()
                )
            ''')
            trans.commit()
        except sa.exc.IntegrityError:
            trans.rollback()

        trans = conn.begin()
        try:
            conn.execute('TRUNCATE datastore_projectattributekey CASCADE')
            # A list of parameter dicts makes this a single executemany call.
            conn.execute(sa.text('''
                INSERT INTO datastore_projectattributekey (
                    id,
                    name,
                    data_type,
                    display_name
                ) VALUES (
                    :id,
                    :name,
                    :data_type,
                    :display_name
                )
            '''), attribute_keys)
            trans.commit()
        except sa.exc.IntegrityError:
            trans.rollback()
def loadProjects(batch_size=50000):
    """Reload projects and their attributes from ``project-processed.csv``.

    The CSV is read twice.  The first pass truncates ``datastore_project``
    and inserts one row per CSV line, using the first seven columns (named by
    the CSV header) plus ``project_owner_id`` = 1 and timezone-aware
    ``added``/``updated`` timestamps.  The second pass truncates
    ``datastore_projectattribute`` and inserts two attribute rows per CSV
    line: the first trailing column as key 2 and the second as key 1 (see
    ``attribute_keys``), each linked to the project whose ``project_id``
    equals the first CSV column.

    Each pass runs in its own transaction.

    Args:
        batch_size: Maximum number of parameter sets buffered in memory
            before they are sent to the database in one executemany call.

    Raises:
        ValueError: If a CSV line does not have exactly two columns after
            the first seven.
    """
    with open("project-processed.csv", 'r') as project_f:
        reader = csv.reader(project_f)
        header = next(reader)
        project_header = header[:7] + ['project_owner_id', 'added', 'updated']

        projects = []
        with engine.begin() as conn:
            conn.execute('TRUNCATE datastore_project CASCADE')
            for row in reader:
                now = tz.localize(datetime.now())
                values = row[:7] + [1, now, now]
                projects.append(dict(zip(project_header, values)))
                if len(projects) >= batch_size:
                    conn.execute(project_table.insert(), projects)
                    projects = []
            if projects:
                conn.execute(project_table.insert(), projects)

        # Second pass over the same file: rewind and skip the header again.
        project_f.seek(0)
        reader = csv.reader(project_f)
        next(reader)

        # float_value, key_id, project_id
        attr_insert = sa.text('''
            INSERT INTO datastore_projectattribute (
                float_value,
                key_id,
                project_id
            ) VALUES (
                :float_value,
                :key_id,
                (SELECT id FROM datastore_project WHERE project_id = :project_id)
            )
        ''')

        attributes = []
        with engine.begin() as conn:
            conn.execute('TRUNCATE datastore_projectattribute CASCADE')
            for row in reader:
                savings, cost = row[7:]
                attributes.append({
                    'float_value': savings,
                    'key_id': 2,
                    'project_id': row[0],
                })
                attributes.append({
                    'float_value': cost,
                    'key_id': 1,
                    'project_id': row[0],
                })
                # Batch on the attribute buffer itself.  The previous check
                # looked at ``projects``, which is no longer growing here, so
                # batches were flushed either on every row or never.
                if len(attributes) >= batch_size:
                    conn.execute(attr_insert, attributes)
                    attributes = []
            if attributes:
                conn.execute(attr_insert, attributes)
def loadConsumption(batch_size=50000):
    """Reload consumption data from ``consumption-processed.csv``.

    Both consumption tables are truncated in one transaction; the load then
    runs in a second transaction.  CSV lines are grouped by their first
    column (the project id) with ``itertools.groupby``, so lines belonging to
    the same project must be adjacent in the file.  For each group one
    ``datastore_consumptionmetadata`` row (energy unit ``KWH``, fuel type
    ``E``) is inserted for the matching project, and every line of the group
    becomes a ``datastore_consumptionrecord`` whose ``start``, ``value`` and
    ``estimated`` come from the columns starting at index 4.  Groups whose
    project id is not present in ``datastore_project`` are skipped.

    Progress is printed after each batch has been sent to the database.

    Args:
        batch_size: Maximum number of record parameter sets buffered in
            memory before they are sent in one executemany call.
    """
    metadata_insert = sa.text('''
        INSERT INTO datastore_consumptionmetadata (
            project_id,
            energy_unit,
            fuel_type,
            added,
            updated
        )
        SELECT
            id,
            'KWH' AS energy_unit,
            'E' AS fuel_type,
            NOW() AS added,
            NOW() AS updated
        FROM datastore_project
        WHERE project_id = :project_id
        RETURNING id
    ''')

    record_insert = sa.text('''
        INSERT INTO datastore_consumptionrecord (
            start,
            value,
            estimated,
            metadata_id
        ) VALUES (
            :start,
            :value,
            :estimated,
            :metadata_id
        )
    ''')

    record_header = ['start', 'value', 'estimated']

    with open('consumption-processed.csv', 'r') as f:
        reader = csv.reader(f)
        next(reader)  # skip the header line

        with engine.begin() as conn:
            conn.execute('TRUNCATE datastore_consumptionmetadata CASCADE')
            conn.execute('TRUNCATE datastore_consumptionrecord CASCADE')

        total = 0
        record_inserts = []
        with engine.begin() as conn:
            groups = itertools.groupby(reader, key=lambda line: line[0])
            for project_id, project_group in groups:
                inserted = conn.execute(metadata_insert,
                                        {'project_id': project_id}).first()
                if inserted is None:
                    # No such project: nothing to attach the records to.
                    # groupby discards the unread lines of this group when
                    # it advances to the next one.
                    continue

                metadata_id = inserted.id
                for record in project_group:
                    row = dict(zip(record_header, record[4:]))
                    row['metadata_id'] = metadata_id
                    record_inserts.append(row)

                    if len(record_inserts) >= batch_size:
                        conn.execute(record_insert, record_inserts)
                        # Count what was actually sent, and report it only
                        # after the batch has been written.
                        total += len(record_inserts)
                        print('inserted', total)
                        record_inserts = []

            if record_inserts:
                conn.execute(record_insert, record_inserts)
                total += len(record_inserts)
                print('inserted', total)
if __name__ == "__main__":
    # Order matters: the seed rows come first (projects are inserted with
    # project_owner_id 1), and consumption metadata is selected from the
    # projects table, so projects must be loaded before consumption.
    for step in (setupTables, loadProjects, loadConsumption):
        step()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment