import fnmatch
import logging
import os
from functools import reduce
from itertools import islice
from math import floor
from io import BytesIO

import numpy as np
import pandas as pd
import geopandas as gpd

from django import db
from django.conf import settings
from django.db import transaction

from rest_framework.utils import json

from mapbox import Uploader

from celery import shared_task

from core.utils import (
    DataClient,
    select_and_rename_columns,
    combine_data_frames,
    adapt_geojson_to_django,
    bulk_update_or_create,
    grouper,
)
from thermcert.constants import PROCESSED_DATA_PATH
from thermcert.models import ThermcertData
from thermcert.utils import lsoa_code_to_int

logger = logging.getLogger(__name__)

def batch_import(model_generator, batch_size, import_fn, verbose=None):
    """
    Consumes models from 'model_generator' in batches of 'batch_size' and passes
    each batch to 'import_fn' (e.g. bulk_create or bulk_update).
    """
    i = 0
    while True:
        model_batch = list(islice(model_generator, batch_size))
        if not model_batch:
            break
        if verbose: print(f"importing objects {i} to {i + len(model_batch)}...")
        # db.reset_queries()
        import_fn(model_batch)
        i += batch_size
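
# Example usage of batch_import (illustrative only; "MyModel" and "rows" are
# hypothetical names, not part of this module):
#
#   models = (MyModel(**row) for row in rows)
#   batch_import(
#       models,
#       10000,
#       lambda batch: MyModel.objects.bulk_create(batch, batch_size=10000),
#       verbose=True,
#   )
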
@shared_task
@transaction.atomic
def import_data(pattern="lsoa_*.csv", batch_size=0, active_only=False, verbose=False):
    """
    Imports data from the S3 bucket specified in settings. The bucket objects are filtered
    according to 'pattern'. Combines the properties from all data files into a single
    data_frame, then works out which rows of that frame to UPDATE and which to INSERT.
    It creates model generators for each subset of rows and then calls bulk_create or
    bulk_update as appropriate in batches.
    """
    n_created = 0
    n_updated = 0

    columns = ThermcertData.get_name_mapping("data_name", "field_name", active_only=active_only)
    index_name = "lsoa_code"
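    # ('columns' is assumed to map source data column names to ThermcertData field names;
    # it is used below to select, rename, and re-index the incoming data.)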
    data_frames = []

    pattern = f"^{PROCESSED_DATA_PATH}/lsoa_csv/{fnmatch.translate(pattern)}$"
    if verbose: print(f"looking for data files at '{pattern}'...")

    client = DataClient()
    for data_object in client.get_all_matching_objects(pattern):

        file_path = data_object.metadata["Key"]
        file_name, file_extension = os.path.splitext(file_path)
        if verbose: print(f"fetching '{file_path}' and converting it to a data_frame...")

        # read the current data...
        if file_extension == ".geojson":
            geo_data_frame = gpd.read_file(data_object.stream)
            data_frame = pd.DataFrame(geo_data_frame.drop(columns="geometry"))
        elif file_extension == ".csv":
            data_frame = pd.read_csv(data_object.stream)
        else:
            msg = f"Unable to read data file of type: '{file_extension}'."
            raise NotImplementedError(msg)

        # restrict it to the desired columns...
        data_frame_columns = {
            k: v for k, v in columns.items()
            if k in data_frame.columns.tolist()
        }

        # rename those columns and re-index the data...
        data_frame = select_and_rename_columns(
            data_frame,
            data_frame_columns,
            index_name=index_name,
        )

        # replace NaN w/ None...
        data_frame = data_frame.where((pd.notnull(data_frame)), None)

        # make sure there are no duplicate rows...
        assert not any(data_frame.index.duplicated()), f"Error: {index_name} is duplicated in {file_path}."

        # store it for later...
        data_frames.append(data_frame)
    if data_frames:

        if verbose: print("combining all data into a single data_frame...")

        # combine all of the stored data...
        combined_data_frame = reduce(combine_data_frames, data_frames)
        # and put the index back...
        combined_data_frame[index_name] = combined_data_frame.index

        # ordinarily, this bit is done in the save method, but save doesn't get called during bulk operations;
        # I _could_ override bulk_create and loop through (pre-saved) django models,
        # but it's just faster to work directly on the data_frame here
        combined_data_frame["feature_id"] = combined_data_frame["lsoa_code"].apply(lsoa_code_to_int)
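        # (lsoa_code_to_int is assumed to derive the numeric feature id from the LSOA code,
        # e.g. "E01000001" -> an integer, mirroring what ThermcertData.save() would normally do.)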
        if verbose: print("determining which data to INSERT and which data to UPDATE...")

        existing_lsoa_codes = ThermcertData.objects.values_list("lsoa_code", flat=True)
        rows_to_update = combined_data_frame.filter(items=existing_lsoa_codes, axis=0)

        columns_to_update = rows_to_update.drop(index_name, axis=1).columns.to_list()
        columns_to_update_chunk_size = floor(len(columns_to_update) / 2)
        columns_to_update_chunks = [
            list(filter(None, chunk))
            for chunk in grouper(columns_to_update, columns_to_update_chunk_size)
        ]
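        # (grouper is assumed to yield fixed-size tuples padded with None, as in the itertools
        # recipe; the filter(None, ...) above drops that padding, leaving two roughly
        # equal-sized chunks of column names.)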
        models_to_update = (
            ThermcertData(**row, pk=model.pk)
            for model, (i, row) in zip(ThermcertData.objects.filter(lsoa_code__in=rows_to_update.lsoa_code), rows_to_update.iterrows())
        )
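        # NOTE: the zip above assumes the queryset and rows_to_update come back in the same
        # lsoa_code order; if that ordering is not guaranteed, the pks could be mismatched.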
        rows_to_create = combined_data_frame[~combined_data_frame.index.isin(rows_to_update.index)]
        models_to_create = (
            ThermcertData(**row)
            for i, row in rows_to_create.iterrows()
        )
        # ######################
        # # BEGIN TECHNIQUE #1 #
        # ######################
        # n_chunks = 1000
        # i = 0
        # for _, combined_data_frame_chunk in combined_data_frame.groupby(np.arange(len(combined_data_frame)) // n_chunks):
        #     if verbose:
        #         # the above loop on "combined_data_frame.groupby()" is a bit confusing;
        #         # it exists in order to provide feedback here on the progress of this long-running operation
        #         combined_data_frame_chunk_size = len(combined_data_frame_chunk)
        #         print(f"importing objects {i} to {i + combined_data_frame_chunk_size}...")
        #         i += combined_data_frame_chunk_size
        #     for index, row in combined_data_frame_chunk.iterrows():
        #         thermcert_data, created = ThermcertData.objects.update_or_create(
        #             lsoa_code=index,
        #             defaults=row.to_dict()
        #         )
        #         if created:
        #             n_created += 1
        #         else:
        #             n_updated += 1
        # # CREATE: 405.295362873s
        # # UPDATE: 455.6755572450056s
        # ####################
        # # END TECHNIQUE #1 #
        # ####################

        # ######################
        # # BEGIN TECHNIQUE #2 #
        # ######################
        # bulk_update_or_create(
        #     ThermcertData,
        #     (row.to_dict() for i, row in combined_data_frame.iterrows()),
        #     comparator_fn=None,
        # )
        # # CREATE: 239.54632384701108s
        # # UPDATE: fails
        # ####################
        # # END TECHNIQUE #2 #
        # ####################

        # ######################
        # # BEGIN TECHNIQUE #3 #
        # ######################
        # if verbose: print(f"going to perform INSERT on {len(rows_to_create)} objects...")
        # if not batch_size:
        #     create_batch_size = len(rows_to_create)
        # else:
        #     create_batch_size = batch_size
        # batch_import(
        #     models_to_create, create_batch_size,
        #     lambda model_batch: ThermcertData.objects.bulk_create(model_batch, batch_size=create_batch_size),
        #     verbose=verbose
        # )
        # n_created += len(rows_to_create)
        # if verbose: print(f"going to perform UPDATE on {len(rows_to_update)} objects (in {len(columns_to_update_chunks)} chunks)...")
        # if not batch_size:
        #     update_batch_size = len(rows_to_update)
        # else:
        #     update_batch_size = batch_size
        # # have to update columns in chunks or I risk maxing out memory
        # for columns_to_update_chunk in columns_to_update_chunks:
        #     batch_import(
        #         models_to_update, update_batch_size,
        #         lambda model_batch: ThermcertData.objects.bulk_update(model_batch, columns_to_update_chunk, batch_size=update_batch_size),
        #         verbose=verbose
        #     )
        # n_updated += len(rows_to_update)
        # # CREATE (BATCH 0): 119.625746768s
        # # CREATE (BATCH 10000): 299.505621346s
        # # UPDATE (BATCH 0): unacceptably slow
        # # UPDATE (BATCH 10000): unacceptably slow
        # ####################
        # # END TECHNIQUE #3 #
        # ####################

    return (n_created, n_updated)
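
# Example usage (a sketch; assumes a configured Celery broker/worker and S3 credentials):
#
#   # run asynchronously via Celery:
#   import_data.delay(pattern="lsoa_*.csv", batch_size=10000, verbose=True)
#
#   # or synchronously, e.g. from a management shell:
#   n_created, n_updated = import_data(batch_size=10000, verbose=True)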