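"""Migrate a v0 metadatastore MongoDB database to the v1 schema.

Run-start, run-stop, and descriptor documents are copied serially on the
client; the (much larger) per-descriptor event streams are migrated in
parallel on an ipyparallel cluster via a load-balanced view.
"""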
from tqdm import tqdm
import ipyparallel as ipp


def main():
    target = 'hxn_migration'
    rc = ipp.Client()
    dview = rc[:]
    # sync_imports executes these imports locally *and* on every engine.
    with dview.sync_imports():
        from metadatastore.mds import MDS, MDSRO
        from collections import deque

    old_config = {
        'database': "hxn_dump",
        'host': 'localhost',
        'port': 27017,
        'timezone': 'US/Eastern'}
    new_config = {
        'database': target,
        'host': 'localhost',
        'port': 27017,
        'timezone': 'US/Eastern'}

    old_t = MDSRO(version=0, config=old_config)
    new_t = MDS(version=1, config=new_config)

    def condition_config():
        # Runs on the engines: touch every collection once so each engine
        # opens its own MongoDB connection before real work is dispatched.
        import time
        global new, old
        for md in [new, old]:
            md._runstart_col.find_one()
            md._runstop_col.find_one()
            md._event_col.find_one()
            md._descriptor_col.find_one()
        time.sleep(1)

    def invasive_checks():
        # Runs on the engines: reach into the name-mangled private attribute
        # to check whether each pushed object has a live connection.
        global old, new
        return (old._MDSRO__conn is None,
                new._MDSRO__conn is None)

    dview.push({'old': old_t, 'new': new_t})
    dview.apply(condition_config)
    print(list(dview.apply(invasive_checks)))
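    # Sanity check: the line above prints, per engine, whether the pushed
    # objects' private connection attribute is still unset.  After
    # condition_config has touched every collection, each engine should
    # presumably report (False, False), i.e. both connections are live.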
    new_t._connection.drop_database(target)
    # Drop all indexes on the event collection to speed up insert.
    # They will be rebuilt the next time an MDS(RO) object connects.
    new_t._event_col.drop_indexes()

    new = new_t
    old = old_t

    # Copy run-start documents serially on the client.
    total = old._runstart_col.find().count()
    for start in tqdm(old.find_run_starts(), desc='start docs', total=total):
        new.insert('start', start)

    # Copy run-stop documents; report and keep going on insertion failures.
    total = old._runstop_col.find().count()
    for stop in tqdm(old.find_run_stops(), desc='stop docs', total=total):
        try:
            new.insert('stop', stop)
        except RuntimeError:
            print("error inserting run stop with uid {!r}".format(stop['uid']))

    # Copy descriptors, recording each one's event count so the parallel
    # stage below can size its work and drive the progress bar.
    descs = deque()
    counts = deque()
    total = old._descriptor_col.find().count()
    for desc in tqdm(old.find_descriptors(), unit='descriptors', total=total):
        d_raw = next(old._descriptor_col.find({'uid': desc['uid']}))
        num_events = old._event_col.find(
            {'descriptor_id': d_raw['_id']}).count()
        new.insert('descriptor', desc)
        descs.append(dict(desc))
        counts.append(num_events)

    new.clear_process_cache()
    old.clear_process_cache()
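    # The heavy lifting -- copying the event documents themselves -- is done
    # per-descriptor on the ipyparallel engines.  The function below runs
    # remotely: its `global new, old` refers to the objects pushed into each
    # engine's namespace earlier, not to the local variables of this scope.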
    def migrate_event_stream(desc_in, num_events):
        import pymongo.errors
        import time
        global new, old
        if num_events:
            flag = True
            # Skip empty event streams; retry the whole stream if the bulk
            # insert raises AutoReconnect.
            while flag:
                flag = False
                try:
                    events = old.get_events_generator(descriptor=desc_in,
                                                      convert_arrays=False)
                    events = iter(events)
                    # Pull events in chunks of 5000 so memory stays bounded,
                    # bulk-inserting each chunk as it fills (and the final,
                    # partial chunk on StopIteration).
                    l_cache = deque()
                    while True:
                        try:
                            for j in range(5000):
                                l_cache.append(next(events))
                        except StopIteration:
                            break
                        finally:
                            if l_cache:
                                new.bulk_insert_events(descriptor=desc_in,
                                                       events=l_cache)
                                l_cache.clear()
                except KeyError:
                    pass
                except pymongo.errors.AutoReconnect:
                    flag = True
                    time.sleep(10)
        new.clear_process_cache()
        old.clear_process_cache()
        return num_events
    # Farm the per-descriptor migrations out to the engines.  ordered=False
    # lets results stream back as each engine finishes, so the progress bar
    # tracks actual throughput rather than submission order.
    v = rc.load_balanced_view()
    amr = v.map(migrate_event_stream, list(descs), list(counts),
                ordered=False)
    total = sum(counts)
    with tqdm(total=total, unit='events') as pbar:
        for res in amr:
            pbar.update(res)


if __name__ == '__main__':
    ar = main()
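# This script assumes an ipyparallel cluster is already up.  One way to
# provide the engines (an assumption, not part of the gist itself) is to
# start a local cluster first, e.g.:
#
#     ipcluster start -n 8
#
# and then run this file with plain `python` once the engines register.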
# --- second file of the gist: verification script ---
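"""Verify a metadatastore v0 -> v1 migration.

Walks the original (v0) and migrated (v1) databases in parallel and asserts
that run-start, run-stop, descriptor, and event documents match one-to-one.
"""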
from metadatastore.mds import MDS, MDSRO
import metadatastore.conf
from collections import deque
from tqdm import tqdm


def compare(o, n):
    try:
        assert o['uid'] == n['uid']
        assert o == n
    except AssertionError:
        print(o)
        print(n)
        raise
def main():
    old_config = dict(metadatastore.conf.connection_config)
    new_config = old_config.copy()
    new_config['database'] = 'metadatastore_production_v1'

    old = MDSRO(version=0, config=old_config)
    new = MDS(version=1, config=new_config)

    # Compare run-start documents pairwise.
    total = old._runstart_col.find().count()
    old_starts = tqdm(old.find_run_starts(), unit='start docs', total=total,
                      leave=True)
    new_starts = new.find_run_starts()
    for o, n in zip(old_starts, new_starts):
        compare(o, n)

    # Compare run-stop documents pairwise.
    total = old._runstop_col.find().count()
    old_stops = tqdm(old.find_run_stops(), unit='stop docs', total=total)
    new_stops = new.find_run_stops()
    for o, n in zip(old_stops, new_stops):
        compare(o, n)

    # Compare descriptors, recording each one's event count for the
    # event-by-event comparison below.
    descs = deque()
    counts = deque()
    total = old._descriptor_col.find().count()
    old_descs = tqdm(old.find_descriptors(), unit='descriptors', total=total)
    new_descs = new.find_descriptors()
    for o, n in zip(old_descs, new_descs):
        d_raw = next(old._descriptor_col.find({'uid': o['uid']}))
        num_events = old._event_col.find(
            {'descriptor_id': d_raw['_id']}).count()
        assert o == n
        descs.append(o)
        counts.append(num_events)
    # Compare the event streams themselves, descriptor by descriptor.
    total = sum(counts)
    with tqdm(total=total, unit='events') as pbar:
        for desc, num_events in zip(descs, counts):
            old_events = old.get_events_generator(descriptor=desc,
                                                  convert_arrays=False)
            new_events = new.get_events_generator(descriptor=desc,
                                                  convert_arrays=False)
            for o_ev, n_ev in zip(old_events, new_events):
                assert o_ev == n_ev
            pbar.update(num_events)


if __name__ == '__main__':
    main()
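# Note: zip() silently truncates when one side yields fewer documents than
# the other, so a missing document in the new database would go unnoticed.
# A stricter variant (a sketch, not part of the original) pads with a
# sentinel so any count mismatch fails loudly:
#
#     from itertools import zip_longest
#     for o, n in zip_longest(old_starts, new_starts):
#         assert o is not None and n is not None, 'document count mismatch'
#         compare(o, n)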