Skip to content

Instantly share code, notes, and snippets.

@evansd
Created August 6, 2018 14:17
Show Gist options
  • Save evansd/53d22d6a47db181c4908bb97498c0687 to your computer and use it in GitHub Desktop.
Save evansd/53d22d6a47db181c4908bb97498c0687 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import csv
import itertools
import sys
import lmdb
import numpy
import pyarrow
import scipy.sparse
# Zero-based positions of the fields this script reads from each input CSV
# row. Index 3 has no constant: the visible code never reads that field.
DATE_COL, PRESENTATION_COL, PRACTICE_COL = 0, 1, 2
ITEMS_COL, QUANTITY_COL, ACTUAL_COST_COL, NET_COST_COL = 4, 5, 6, 7
def float_100(v):
    """Convert ``v`` to a float and return it multiplied by 100.

    Used as the parser for the cost columns in ``COL_TYPES``.
    NOTE(review): presumably this turns pounds into pence -- confirm
    against the input data's units.

    Raises:
        ValueError: if ``v`` cannot be parsed by ``float()``.
    """
    number = float(v)
    return number * 100
# Per-column configuration, keyed by the column name passed on the command
# line. Each value is a 3-tuple:
#   (numpy dtype used for the stored matrix,
#    callable that parses the raw CSV string into a Python number,
#    index of the source field within an input row).
#
# numpy.float64 is spelled out explicitly: the old ``numpy.float_`` alias for
# the same type was removed in NumPy 2.0 and raises AttributeError there.
# NOTE(review): 'items' is stored as uint16, so values above 65535 will not
# fit -- confirm that cannot occur in the input.
COL_TYPES = {
    'items': (numpy.uint16, int, ITEMS_COL),
    'quantity': (numpy.float64, float, QUANTITY_COL),
    'cost': (numpy.float64, float_100, ACTUAL_COST_COL),
    'net_cost': (numpy.float64, float_100, NET_COST_COL),
}
def load_practices():
    """Read 'practice_codes.csv' and map each practice code to a row index.

    Each non-blank line must contain exactly two comma-separated fields; the
    second field is the practice code (the first is ignored). Codes are
    numbered 0, 1, 2, ... in order of first appearance, so the indices are
    always dense in ``range(len(result))``. That matters because the result's
    length is used as the matrix row count in ``create_matrix``.

    For a file with no blank lines and no repeated codes this is exactly the
    line number of each code.

    Returns:
        dict mapping practice code (str) to its integer index.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if a non-blank line does not have exactly two fields.
    """
    practices = {}
    with open('practice_codes.csv') as f:
        for line in f:
            stripped = line.strip()
            if not stripped:
                # Tolerate blank lines (e.g. a stray trailing newline)
                # instead of failing on the two-field unpack below.
                continue
            _ccg_id, code = stripped.split(',')
            # First occurrence wins; using the current size keeps indices
            # contiguous even if a code is repeated.
            practices.setdefault(code, len(practices))
    return practices
# Practice code -> row index lookup, built once at import time. Importing
# this module therefore reads 'practice_codes.csv' from the current working
# directory.
PRACTICES = load_practices()
# Date string -> zero-based month index for the twelve first-of-month dates
# of 2017: '2017-01-01' -> 0 through '2017-12-01' -> 11.
DATES = {
    '2017-{:02d}-01'.format(month): month - 1
    for month in range(1, 13)
}
def rows_from_input(input, column):
    """Lazily parse CSV rows from ``input`` into index/value tuples.

    Args:
        input: iterable of CSV text lines (handed to ``csv.reader``).
        column: key into ``COL_TYPES`` selecting which value field to read
            and how to parse it.

    Yields:
        4-tuples of (presentation code as UTF-8 bytes, practice index from
        ``PRACTICES``, date index from ``DATES``, parsed value).

    Raises:
        KeyError: if ``column`` is not in ``COL_TYPES``, or a row's practice
            code or date is not in the corresponding lookup table.
    """
    _, parse_value, value_col = COL_TYPES[column]
    for record in csv.reader(input):
        presentation = record[PRESENTATION_COL].encode('utf-8')
        practice_index = PRACTICES[record[PRACTICE_COL]]
        date_index = DATES[record[DATE_COL]]
        value = parse_value(record[value_col])
        yield (presentation, practice_index, date_index, value)
def optimise_matrix(matrix):
    """Return ``matrix`` converted to CSC format with sorted indices.

    The conversion produces a new matrix; ``sort_indices()`` is then applied
    to that result in place before it is returned.
    """
    csc = matrix.tocsc()
    csc.sort_indices()
    return csc


# Disabled code: the definition below sits inside a bare string literal, so
# it is evaluated as a no-op expression and ``get_matrix`` is never defined.
# Before re-enabling, note that it calls ``create_matrix(dtype, rows)`` with
# two arguments, while the live ``create_matrix`` takes only ``dtype``, and
# that it depends on ``deserialize``, which is likewise disabled in this file.
"""
def get_matrix(env, key, type_name, dtype, rows):
db = env.open_db(type_name)
with env.begin(db=db) as txn:
serialized_matrix = txn.get(key)
if serialized_matrix:
return deserialize(serialized_matrix, dtype, rows).tolil()
else:
return create_matrix(dtype, rows).tolil()
"""
def write_matrix(txn, key, matrix):
    """Serialize ``matrix`` and append it to the database under ``key``.

    Args:
        txn: write transaction exposing ``put(key, value, append=...)``.
        key: bytes key to store the matrix under.
        matrix: sparse matrix accepted by ``serialize``.

    Raises:
        ValueError: if the store rejects the write. With ``append=True`` the
            key must sort after every key already stored, so this fires when
            the input is not sorted by key or the key is already present.
    """
    data = serialize(matrix)
    # put() signals a rejected append by returning False rather than raising.
    # Ignoring that would silently drop the matrix, so surface it as an error.
    written = txn.put(key, data, append=True)
    if not written:
        raise ValueError(
            'Could not append key {!r}: keys must be written in strictly '
            'increasing order and must not already exist'.format(key)
        )
def create_matrix(dtype):
    """Return an empty LIL sparse matrix with the given ``dtype``.

    The matrix has one row per entry in ``PRACTICES`` and five columns per
    entry in ``DATES``.
    """
    n_rows = len(PRACTICES)
    n_cols = len(DATES) * 5
    return scipy.sparse.lil_matrix((n_rows, n_cols), dtype=dtype)
def serialize(sparse_arr):
    """Serialize a compressed sparse matrix to a pyarrow buffer.

    Only the ``data``, ``indices`` and ``indptr`` arrays are stored, as a
    3-tuple in that order; the matrix shape and dtype are not included.

    NOTE(review): ``pyarrow.serialize`` is believed to be deprecated in newer
    pyarrow releases -- confirm against the installed version.
    """
    data, indices, indptr = (
        sparse_arr.data,
        sparse_arr.indices,
        sparse_arr.indptr,
    )
    serialized = pyarrow.serialize((data, indices, indptr))
    return serialized.to_buffer()


# Disabled code: the definition below sits inside a bare string literal, so
# it is evaluated as a no-op expression and ``deserialize`` is never defined.
# It is the reverse of ``serialize``: it rebuilds a CSC matrix from the
# stored (data, indices, indptr) tuple, taking the column count from the
# length of ``indptr`` and the row count from the caller.
"""
def deserialize(buf, dtype, rows):
contents = pyarrow.deserialize(buf)
cols = len(contents[2]) - 1
return scipy.sparse.csc_matrix(contents, dtype=dtype, shape=(rows, cols))
"""
def write_to_db(txn, column, input):
    """Build one sparse matrix per presentation code and write each to ``txn``.

    Rows parsed from ``input`` are grouped by consecutive runs of the same
    presentation code, so the input must already be ordered by that code for
    each code to produce a single matrix.

    Args:
        txn: write transaction passed through to ``write_matrix``.
        column: key into ``COL_TYPES`` selecting the value field and dtype.
        input: iterable of CSV text lines.
    """
    dtype = COL_TYPES[column][0]
    parsed_rows = rows_from_input(input, column)
    by_presentation = itertools.groupby(
        parsed_rows, key=lambda record: record[0])
    for n, (bnf_code, group) in enumerate(by_presentation, start=1):
        matrix = create_matrix(dtype)
        for _, practice_index, date_index, value in group:
            # The same value is written at five column positions, 12 apart;
            # this matches the ``len(DATES)*5`` width used by create_matrix.
            # NOTE(review): presumably this replicates one year of data
            # across five year-sized blocks -- confirm intent.
            for offset in (0, 12, 24, 36, 48):
                matrix[practice_index, date_index + offset] = value
        write_matrix(txn, bnf_code, optimise_matrix(matrix))
        print('Done {}, {}'.format(bnf_code, n))
if __name__ == '__main__':
    # Usage: <script> COLUMN < input.csv
    # COLUMN selects which value field to import (a key of COL_TYPES); the
    # CSV rows are read from standard input.
    #
    # Validate explicitly instead of using ``assert``: assertions are
    # stripped under ``python -O``, and a missing argument would otherwise
    # surface as a bare IndexError.
    if len(sys.argv) < 2 or sys.argv[1] not in COL_TYPES:
        sys.exit('usage: {} {{{}}} < input.csv'.format(
            sys.argv[0], '|'.join(sorted(COL_TYPES))))
    column = sys.argv[1]

    env = lmdb.open(
        'lmdb-test-real-data.mdb',
        map_size=100000000000,
        subdir=False,
        map_async=True,
        lock=False,
        max_dbs=4)
    try:
        # Each column is stored in its own named sub-database.
        db = env.open_db(key=column.encode('ascii'))
        # Leaving the ``with`` block normally commits the transaction; an
        # exception inside it aborts the transaction instead.
        with env.begin(db=db, write=True) as txn:
            write_to_db(txn, column, sys.stdin)
        # The environment is opened with map_async=True, so request an
        # explicit forced flush once the import has been committed.
        env.sync(True)
    finally:
        # Always release the environment, even if the import failed.
        env.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment