Created
August 6, 2018 14:17
-
-
Save evansd/53d22d6a47db181c4908bb97498c0687 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import csv | |
import itertools | |
import sys | |
import lmdb | |
import numpy | |
import pyarrow | |
import scipy.sparse | |
# Zero-based positions of the fields this script reads from each input CSV
# row.  Position 3 has no constant because nothing here reads it.
DATE_COL, PRESENTATION_COL, PRACTICE_COL = 0, 1, 2
ITEMS_COL, QUANTITY_COL, ACTUAL_COST_COL, NET_COST_COL = 4, 5, 6, 7
def float_100(v):
    """Parse *v* as a float and return it multiplied by 100.

    Used as the parser for the cost columns in ``COL_TYPES``
    (presumably converting pounds to pence -- confirm against the data).
    """
    parsed = float(v)
    return parsed * 100
# Per-column configuration, keyed by the column name given on the command
# line.  Each value is a 3-tuple:
#   (numpy dtype of the stored matrix,
#    callable that parses the CSV text into a Python value,
#    index of the CSV field holding the value)
#
# ``numpy.float64`` is used instead of the ``numpy.float_`` alias, which was
# removed in NumPy 2.0; both name the same dtype.
COL_TYPES = {
    'items': (numpy.uint16, int, ITEMS_COL),
    'quantity': (numpy.float64, float, QUANTITY_COL),
    'cost': (numpy.float64, float_100, ACTUAL_COST_COL),
    'net_cost': (numpy.float64, float_100, NET_COST_COL),
}
def load_practices(path='practice_codes.csv'):
    """Map each practice code to its zero-based position in the practice CSV.

    Every non-blank line must hold exactly two comma-separated fields,
    ``ccg_id,code``; only ``code`` is used.  Blank lines (for example a
    trailing empty line at the end of the file) are skipped and do not
    consume an index.

    Args:
        path: CSV file to read.  Defaults to ``practice_codes.csv`` in the
            current working directory.

    Returns:
        dict mapping practice code -> index.

    Raises:
        ValueError: if a non-blank line does not split into exactly two
            fields.
        OSError: if the file cannot be opened.
    """
    practices = {}
    index = 0
    with open(path, encoding='utf-8') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                # Previously a blank line crashed the two-field unpack below.
                continue
            _ccg_id, code = line.split(',')
            # NOTE: a code that appears more than once keeps its last index.
            practices[code] = index
            index += 1
    return practices
# Practice code -> matrix row index.  Loaded once, when the module is
# executed, from the practice CSV read by load_practices().
PRACTICES = load_practices()

# Date string -> matrix column index for the first day of each month of
# 2017: '2017-01-01' -> 0 through '2017-12-01' -> 11.
DATES = {
    '2017-{:02d}-01'.format(month): month - 1
    for month in range(1, 13)
}
def rows_from_input(input, column):
    """Lazily parse CSV rows into ``(presentation, practice, date, value)``.

    Args:
        input: iterable of CSV text lines (e.g. an open text file).
        column: key into ``COL_TYPES`` selecting which value field to read
            and how to parse it.

    Yields:
        4-tuples of the UTF-8 encoded presentation field, the practice's
        index from ``PRACTICES``, the date's index from ``DATES`` and the
        parsed value.

    Raises:
        KeyError: for an unknown column, practice code or date string.
    """
    spec = COL_TYPES[column]
    convert = spec[1]
    value_field = spec[2]
    for record in csv.reader(input):
        presentation = record[PRESENTATION_COL].encode('utf-8')
        practice_index = PRACTICES[record[PRACTICE_COL]]
        date_index = DATES[record[DATE_COL]]
        yield (
            presentation,
            practice_index,
            date_index,
            convert(record[value_field]),
        )
def optimise_matrix(matrix):
    """Return *matrix* converted to CSC format with sorted indices."""
    csc = matrix.tocsc()
    # sort_indices() works in place on the converted matrix.
    csc.sort_indices()
    return csc
""" | |
def get_matrix(env, key, type_name, dtype, rows): | |
db = env.open_db(type_name) | |
with env.begin(db=db) as txn: | |
serialized_matrix = txn.get(key) | |
if serialized_matrix: | |
return deserialize(serialized_matrix, dtype, rows).tolil() | |
else: | |
return create_matrix(dtype, rows).tolil() | |
""" | |
def write_matrix(txn, key, matrix):
    """Serialize *matrix* and store it under *key* in the transaction.

    The record is written with ``append=True``.
    NOTE(review): the value returned by ``txn.put`` is not checked --
    confirm whether a failed append should be reported.
    """
    txn.put(key, serialize(matrix), append=True)
def create_matrix(dtype):
    """Return an empty LIL sparse matrix of the given dtype.

    The matrix has one row per entry in ``PRACTICES`` and five columns per
    entry in ``DATES``.
    """
    row_count = len(PRACTICES)
    column_count = len(DATES) * 5
    shape = (row_count, column_count)
    return scipy.sparse.lil_matrix(shape, dtype=dtype)
def serialize(sparse_arr):
    """Serialize a compressed sparse matrix into a pyarrow buffer.

    Only the ``data``, ``indices`` and ``indptr`` arrays are stored, in that
    order; the matrix shape and dtype are not part of the payload.

    NOTE(review): ``pyarrow.serialize`` appears to be deprecated in newer
    pyarrow releases -- confirm the pinned version.
    """
    data = sparse_arr.data
    indices = sparse_arr.indices
    indptr = sparse_arr.indptr
    payload = pyarrow.serialize((data, indices, indptr))
    return payload.to_buffer()
""" | |
def deserialize(buf, dtype, rows): | |
contents = pyarrow.deserialize(buf) | |
cols = len(contents[2]) - 1 | |
return scipy.sparse.csc_matrix(contents, dtype=dtype, shape=(rows, cols)) | |
""" | |
def write_to_db(txn, column, input):
    """Build one sparse matrix per presentation and write it to *txn*.

    Rows are grouped by their presentation key with ``itertools.groupby``,
    which only merges consecutive rows, so *input* must already be ordered
    by presentation.  Each value is stored at its (practice, date) cell and
    repeated in four further columns spaced 12 apart (presumably to
    simulate five years of data from one year -- confirm).

    Args:
        txn: write transaction the matrices are stored in.
        column: key into ``COL_TYPES`` selecting the value to load.
        input: iterable of CSV text lines.
    """
    dtype = COL_TYPES[column][0]
    parsed_rows = rows_from_input(input, column)

    def presentation_of(row):
        return row[0]

    grouped = itertools.groupby(parsed_rows, key=presentation_of)
    for count, (bnf_code, entries) in enumerate(grouped, 1):
        matrix = create_matrix(dtype)
        for entry in entries:
            practice_index, date_index, value = entry[1:]
            # Column offsets 0, 12, 24, 36, 48.
            for offset in range(0, 60, 12):
                matrix[practice_index, date_index + offset] = value
        write_matrix(txn, bnf_code, optimise_matrix(matrix))
        print('Done {}, {}'.format(bnf_code, count))
if __name__ == '__main__':
    # Usage: <script> COLUMN < rows.csv
    # COLUMN selects which value field is loaded; it must be a COL_TYPES key.
    # Validate explicitly: ``assert`` is stripped under ``python -O`` and a
    # missing argument would otherwise surface as a bare IndexError.
    if len(sys.argv) < 2 or sys.argv[1] not in COL_TYPES:
        sys.exit('usage: {} {{{}}} < input.csv'.format(
            sys.argv[0], '|'.join(sorted(COL_TYPES))))
    column = sys.argv[1]

    env = lmdb.open(
        'lmdb-test-real-data.mdb',
        map_size=100000000000,  # 100 GB (decimal) maximum map size
        subdir=False,
        map_async=True,
        lock=False,
        max_dbs=4)
    try:
        # One named sub-database per column.
        db = env.open_db(key=column.encode('ascii'))
        with env.begin(db=db, write=True) as txn:
            write_to_db(txn, column, sys.stdin)
        # Force a flush to disk once the transaction block has exited.
        env.sync(True)
    finally:
        # Close the environment even if loading fails part-way through.
        env.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment