@snopoke
Last active August 2, 2018 10:45
Size comparison for storing case block data in SQL alongside case transactions
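
The gist has four parts: a script that zlib-compresses a JSON-lines snapshot of each case into a new `snapshot` column, a script that dumps cases, transactions, and per-transaction case blocks for a sample of 'icds-cas' cases, a loader that reimports the dump and writes compressed case blocks into a new `case_block` column, and the SQL used to add those columns and break table sizes down into heap, TOAST, and index bytes. The ALTER TABLE statements at the bottom need to run before the Python scripts, since the scripts write to the columns they add.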
# Script 1: serialize each case to a JSON line, zlib-compress it, and store it
# in the new `snapshot` bytea column.
import cStringIO
import json

import psycopg2
from django.db import connections

from corehq.apps.dump_reload.sql.serialization import JsonLinesSerializer
from corehq.form_processor.models import CommCareCaseSQL


class FieldJsonLinesSerializer(JsonLinesSerializer):
    def get_dump_object(self, obj):
        # Store `case_json` as nested JSON rather than a doubly encoded string.
        current = self._current
        current['case_json'] = json.loads(current['case_json'])
        return current


for case in CommCareCaseSQL.objects.using('default').all():
    stream = cStringIO.StringIO()
    value_s = FieldJsonLinesSerializer().serialize(
        [case],
        use_natural_foreign_keys=False,
        use_natural_primary_keys=True,
        stream=stream
    )
    value = value_s.encode('zlib')
    with connections['default'].cursor() as cursor:
        cursor.execute(
            "update form_processor_commcarecasesql set snapshot = %s where case_id = %s",
            [psycopg2.Binary(value), case.case_id]
        )
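
Not part of the gist: a minimal round-trip check for the script above, assuming psycopg2 hands the bytea back as a Python 2 buffer and that at least one snapshot has been written.

import json
import zlib

from django.db import connections

with connections['default'].cursor() as cursor:
    cursor.execute(
        "select snapshot from form_processor_commcarecasesql "
        "where snapshot is not null limit 1"
    )
    row = cursor.fetchone()

# buffer -> str -> zlib.decompress reverses the .encode('zlib') above
doc = json.loads(zlib.decompress(str(row[0])))
print sorted(doc)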
# Script 2: dump cases, their transactions, and the raw case block from each
# transaction's form for a sample of 'icds-cas' cases.
from __future__ import absolute_import
from __future__ import unicode_literals

import json
import os
from datetime import date

from casexml.apps.case.const import CASE_ACTIONS
from casexml.apps.case.xform import extract_case_blocks
from casexml.apps.case.xml.parser import case_update_from_block
from corehq.apps.dump_reload.sql.serialization import JsonLinesSerializer
from corehq.apps.es import CaseES
from corehq.form_processor.models import CommCareCaseSQL, CaseTransaction
from corehq.sql_db.util import split_list_by_db_partition
from dimagi.utils.chunked import chunked


def write_case_blocks(case_blocks):
    # Append each case block as one JSON line.
    with open('case_dump_blocks.json', 'a') as block_stream:
        for b in case_blocks:
            json.dump(b, block_stream)
            block_stream.write(b'\n')


def object_gen(case_ids):
    # Yield each partition's cases followed by their transactions.
    for db_name, ids in split_list_by_db_partition(case_ids):
        cases = CommCareCaseSQL.objects.using(db_name).filter(case_id__in=ids)
        for case in cases:
            yield case
        for case_id_chunk in chunked(ids, 100):
            for ct in transaction_gen(db_name, case_id_chunk):
                yield ct


def transaction_gen(db_name, case_id_chunk):
    transactions = list(CaseTransaction.objects.using(db_name).filter(case_id__in=case_id_chunk))
    case_blocks = []

    def raw_block(case_update):
        # Strip everything except the case action elements from the raw block.
        rb = case_update.raw_block
        for k in list(rb):
            if k not in CASE_ACTIONS:
                del rb[k]
        return rb

    for ct in transactions:
        yield ct
        form = ct.form
        if not form:
            continue
        case_updates = filter(
            lambda cu: cu.id in case_id_chunk,
            [case_update_from_block(cb) for cb in extract_case_blocks(form)]
        )
        case_blocks.extend([
            {'case_id': u.id, 'form_id': form.form_id, 'block': raw_block(u)}
            for u in case_updates
        ])

    write_case_blocks(case_blocks)


# Start with a clean blocks file since write_case_blocks appends.
try:
    os.unlink('case_dump_blocks.json')
except OSError:
    pass

case_ids = (
    CaseES().domain('icds-cas')
    .opened_range(lt=date(2018, 1, 1))
    .active_in_range(gt=date(2018, 6, 1))
    .size(1000).get_ids()
)

with open('case_dump.json', 'w') as output_stream:
    objects = object_gen(case_ids)
    JsonLinesSerializer().serialize(
        objects,
        use_natural_foreign_keys=False,
        use_natural_primary_keys=True,
        stream=output_stream
    )
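
The loader in the next script reads `case_dump.json.gz` and `case_dump_blocks.json.gz`, so both files written above have to be gzipped first. The gist does not show that step; a shell `gzip case_dump.json case_dump_blocks.json` works, or the equivalent in Python:

import gzip
import shutil

# Compress the two dump files so the loader can stream them back.
for name in ('case_dump.json', 'case_dump_blocks.json'):
    with open(name, 'rb') as src, gzip.open(name + '.gz', 'wb') as dst:
        shutil.copyfileobj(src, dst)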
# Script 3: reload the dump into a fresh database, then attach the compressed
# case block to each transaction row.
import cStringIO
import gzip
import json

import psycopg2
from django.db import connections

from corehq.apps.dump_reload.sql.load import SqlDataLoader

loader = SqlDataLoader()
with gzip.open('case_dump.json.gz') as f:
    loader.load_objects(f)


def get_block_val(block):
    block_string = json.dumps(block['block'])
    # gzip variant, kept for size comparison:
    # stream = cStringIO.StringIO()
    # with gzip.GzipFile(fileobj=stream, mode="w") as f:
    #     f.write(block_string)
    # value = stream.getvalue()
    #
    # zlib
    value = block_string.encode('zlib')
    return psycopg2.Binary(value)


with gzip.open('case_dump_blocks.json.gz') as f:
    for line in f:
        block = json.loads(line)
        block_val = get_block_val(block)
        with connections['default'].cursor() as cursor:
            cursor.execute(
                "update form_processor_casetransaction set case_block = %s "
                "where case_id = %s and form_id = %s",
                [block_val, block['case_id'], block['form_id']]
            )
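
The commented-out gzip branch in get_block_val suggests both codecs were tried. A rough sketch, not in the gist, of measuring the two on a sample of blocks; gzip carries roughly 18 bytes of header and checksum per blob, which is noticeable on small case blocks:

import cStringIO
import gzip
import itertools
import json
import zlib

raw = zlib_total = gzip_total = 0
with gzip.open('case_dump_blocks.json.gz') as f:
    for line in itertools.islice(f, 1000):  # sample the first 1000 blocks
        block_string = json.dumps(json.loads(line)['block'])
        raw += len(block_string)
        zlib_total += len(zlib.compress(block_string))
        stream = cStringIO.StringIO()
        with gzip.GzipFile(fileobj=stream, mode='w') as gz:
            gz.write(block_string)
        gzip_total += len(stream.getvalue())

print 'raw=%d zlib=%d gzip=%d' % (raw, zlib_total, gzip_total)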
-- add case block column
ALTER TABLE public.form_processor_casetransaction ADD COLUMN case_block bytea NULL;

-- add snapshot column
ALTER TABLE public.form_processor_commcarecasesql ADD COLUMN snapshot bytea NULL;

-- delete all data prior to import
TRUNCATE form_processor_commcarecasesql CASCADE;

-- vacuum and analyze before getting sizes
VACUUM FULL;
ANALYZE form_processor_commcarecasesql;
ANALYZE form_processor_casetransaction;

-- break table sizes down into heap, TOAST and index bytes
SELECT table_name, row_estimate,
       pg_size_pretty(toast_bytes) AS toast,
       pg_size_pretty(table_bytes) AS TABLE
FROM (
    SELECT *, total_bytes - index_bytes - COALESCE(toast_bytes, 0) AS table_bytes
    FROM (
        SELECT c.oid, nspname AS table_schema, relname AS table_name,
               c.reltuples AS row_estimate,
               pg_total_relation_size(c.oid) AS total_bytes,
               pg_indexes_size(c.oid) AS index_bytes,
               pg_total_relation_size(reltoastrelid) AS toast_bytes
        FROM pg_class c
        LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
        WHERE relkind = 'r' AND nspname = 'public'
    ) a
) a
ORDER BY total_bytes DESC;

-- clear blocks to re-measure the baseline size without them
UPDATE form_processor_casetransaction SET case_block = NULL;