This is code to consume processed data from XPD back into the bluesky document model
Last active: April 2, 2020 19:37
ingest xpd chi files
sources:
  xpd_auto_202003_mongo:
    driver: bluesky-mongo-normalized-catalog
    args:
      metadatastore_db: mongodb://localhost/xpd_auto_202003_mds
      asset_registry_db: mongodb://localhost/xpd_auto_202003_ar
  xpd_auto_202003_msgpack:
    driver: bluesky-msgpack-catalog
    args:
      paths:
        - "/SOMEPATH/export/*.msgpack"
from collections import defaultdict
from pathlib import Path
import datetime
import zipfile

import numpy as np
import pandas

from databroker import catalog
from event_model import compose_run

root_path = Path("/SOMEPATH/2020-03_xpd_autonomous/")
chi_zip = root_path / "Beamtime Reduction in Q space-20200331T172054Z-001.zip"
excel_path = root_path / "20191125_XPD.xlsx"
def parse_name(inp):
    """Parse a sample name into composition and processing metadata."""
    # special case the empty sample position
    if "empty" in inp:
        return None
    # the sample name can have 3 or 4 _
    try:
        comp, temp, time, batch, coord_number = inp.split("_")
    except ValueError:
        try:
            comp, pristine, batch, coord_number = inp.split("_")
            # if the sample is "Pristine" default to room temperature
            # and 0 cooking time
            if pristine == "Pristine":
                temp = "25C"
                time = "0min"
            else:
                return None
        except ValueError:
            return None
    # TODO check the post fixes are correct
    time = float(time.replace("p", ".")[:-3])
    temp = float(temp[:-1])
    comp = {comp[:2]: int(comp[2:4]), comp[4:6]: int(comp[6:])}
    return {
        **comp,
        "temp": temp,
        "anneal_time": time,
        "batch": tuple(map(int, batch.split("-"))),
        "position": tuple(map(int, coord_number.split("-"))),
    }
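# Worked example (hypothetical sample name, for illustration only):
#   parse_name("Ti50Cu50_450C_2p5min_1-2_3-4")
#   -> {"Ti": 50, "Cu": 50, "temp": 450.0, "anneal_time": 2.5,
#       "batch": (1, 2), "position": (3, 4)}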
df = pandas.read_excel(excel_path)
sample_names = set(df["sample"])

data = defaultdict(
    lambda: {
        "start": None,
        "descriptors": defaultdict(list),
        "event_pages": defaultdict(list),
        "stop": None,
    }
)
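# Each entry of `data` mirrors the bluesky document model for one run:
# a start document, the descriptors grouped by stream name, the event
# pages grouped by descriptor uid, and a stop document.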
with zipfile.ZipFile(chi_zip) as zipin:
    for n in zipin.namelist():
        n = Path(n)
        _, fname = n.parts
        # match the file against the known sample names from the spreadsheet
        for sn in sample_names:
            if fname.startswith(sn):
                break
        else:
            raise ValueError(f"no known sample name matches {fname!r}")
        fname = fname[len(sn) + 1 :]
        date_str, _, fname = fname.partition("_")
        dt = datetime.datetime.strptime(date_str, "%Y%m%d-%H%M%S")
        ts = dt.timestamp()
        uid, _, fname = fname.partition("_")
        num, _, fname = fname.partition("_")
        res, _, ext = fname.partition(".")
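        # Worked example of the partitions above (hypothetical file
        # name, for illustration only):
        #   "<sample>_20200331-172054_ab12cd34_0001_mean_q.chi"
        #   -> date_str="20200331-172054", uid="ab12cd34", num="0001",
        #      res="mean_q", ext="chi"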
        md = {
            "source_uid": uid,
            "iso_time": dt.isoformat(),
            "result_type": res,
            "result_number": int(num),
            "original_path": str(n),
            "sample_name": sn,
            # parse_name returns None for names it cannot parse
            **(parse_name(sn) or {}),
        }
        fin = iter(zipin.read(str(n)).split(b"\n"))
        # grab the first line of the header, which includes an
        # interesting-looking path
        md["chi_header"] = next(fin).decode("ascii")
        # skip 5 lines of comments for humans
        for j in range(5):
            next(fin)
        # the next line carries the bin count after a ":"
        _, _, num = next(fin).partition(b":")
        num = int(num)
        md["num_bins"] = num
        Q = np.zeros(num)
        I = np.zeros(num)
        # skip line of comments
        next(fin)
        for k, ln in enumerate(fin):
            if ln.strip():
                Q[k], I[k] = map(float, ln.split())
        # num data rows plus the empty string from the trailing newline
        assert k == num
        # first, compose the run start and get the rest of the factories
        run_bundle = compose_run(time=ts, metadata=md)
        # stash the start document for later, could also just serialize
        data[uid]["start"] = run_bundle.start_doc
        # create the descriptor and factories
        desc_bundle = run_bundle.compose_descriptor(
            name="primary",
            data_keys={
                "Q": {
                    "dtype": "number",
                    "shape": [],
                    "source": "compute",
                    "units": "angstrom",
                },
                "I": {
                    "dtype": "number",
                    "shape": [],
                    "source": "compute",
                    "units": "arbitrary",
                },
            },
        )
        # stash the descriptor for later use
        desc = desc_bundle.descriptor_doc
        data[uid]["descriptors"][desc["name"]].append(desc)
        # construct an event page
        evnt_page = desc_bundle.compose_event_page(
            data={"Q": Q, "I": I},
            # we might make timestamps optional in the future
            timestamps={"Q": np.ones_like(Q) * ts, "I": np.ones_like(I) * ts},
            # seq_num counts up from 1 in the event model
            seq_num=list(range(1, len(Q) + 1)),
            time=np.ones_like(Q) * ts,
            # there is a PR to fix this in event-model; the problem is
            # that jsonschema is not identifying that numpy arrays are
            # "array" from a validation point of view 🤦
            validate=False,
        )
        data[uid]["event_pages"][desc["uid"]].append(evnt_page)
        data[uid]["stop"] = run_bundle.compose_stop()
# grab a reference to the catalog (backed by msgpack files)
db = catalog['xpd_auto_202003_msgpack']
# short-cut to get the correct suitcase configured to write where
# the catalog reads from
serialize = db.v1.insert

# loop through the dictionary we built above
for v in data.values():
    # insert the start
    serialize('start', v['start'])
    # for each stream insert the descriptors;
    # there can be more than one stream
    for stream, descs in v['descriptors'].items():
        # and each stream can have more than one descriptor
        for d in descs:
            serialize('descriptor', d)
    # loop through and insert all of the event pages
    for k, event_pages in v['event_pages'].items():
        for page in event_pages:
            serialize('event_page', page)
    # insert the stop document
    serialize('stop', v['stop'])
# # get all of the uids
# uids = list(db.search({}))
#
# # pull _all_ of the RunCatalogs
# data = [db[uid] for uid in uids]
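# # e.g. read the first run's primary stream back as an xarray Dataset
# # (a sketch using databroker's v2 API)
# ds = data[0].primary.read()
# ds["Q"], ds["I"]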