Last active
December 30, 2015 07:19
-
-
Save chatcannon/7795048 to your computer and use it in GitHub Desktop.
Parser for the SPEC file format used for x-ray scattering data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""SPEC.py - code for reading files in the SPEC data format | |
Copyright (c) Christopher Kerr 2013 | |
You may use and redistribute this file under the terms of the | |
GNU General Public Licence (GPL), version 3 or, at your option, | |
any later version.""" | |
__all__ = ['SPECfile'] | |
import re | |
import itertools | |
import time | |
import sys | |
from warnings import warn | |
import numpy as np | |
def takewhile_dropwhile(pred, seq):
    """Like takewhile and dropwhile from itertools but returns both sequences

    Reads all the takewhile elements into a list but leaves dropwhile as an
    iterator.

    Parameters:
        pred: one-argument callable applied to each element in turn.
        seq: any iterable (a list, an open file, another iterator, ...).

    Returns:
        A ``(head, rest)`` tuple.  ``head`` is the list of leading elements
        for which ``pred`` was true.  ``rest`` is an iterator that yields the
        first failing element followed by everything after it, or the empty
        list ``[]`` when every element satisfied ``pred``.
    """
    # Walk a single shared iterator so that the remainder resumes exactly
    # where the scan stopped, even when seq is a re-iterable container such
    # as a list (iterating the container again would restart from the top).
    remaining = iter(seq)
    takewhile = []
    for elem in remaining:
        if not pred(elem):
            # use itertools.chain to fake pushing the element back on the
            # front of the remaining items
            return takewhile, itertools.chain([elem], remaining)
        takewhile.append(elem)
    # no elements fail the predicate
    return takewhile, []
def not_s_line(x):
    """Return True unless the line *x* begins with the "#S " marker."""
    return not x.startswith("#S ")
def is_empty_list(x):
    """Return True only when *x* is a list instance with no elements.

    Anything that is not a list (including empty tuples, strings and
    iterators) yields False.
    """
    return isinstance(x, list) and not x
def filter_comments(lines):
    """Separate "#C" comments and "#U" blocks from the other lines.

    Parameters:
        lines: iterable of text lines.

    Returns:
        A ``(filtered, comments, Ublocks)`` tuple:

        * ``filtered`` - list of the lines that are neither "#C" lines nor
          part of a "#U" block, unchanged and in their original order.
        * ``comments`` - list with the text of every line starting with
          "#C", with the two-character marker and surrounding whitespace
          removed.
        * ``Ublocks`` - dict mapping each "#U<name>" block name to the text
          of that block, one entry per run of consecutive lines sharing the
          same name, with the shared leading indentation and trailing
          whitespace removed and the lines joined by newlines.

    Raises:
        ValueError: if a "#U" block name starts a second time, except for a
            "#UO *** END OF DATA ***" line, which is silently skipped.
    """
    # Generic opener: "#U", the block name, then whitespace and the payload
    u_start_re = re.compile(r"#U(\S+)(\s+.*)$")

    filtered = []
    comments = []
    Ublocks = {}
    current_U_name = None
    # Compiled pattern matching further lines of the block currently being
    # read; None while we are outside any "#U" block
    continuation_re = None

    for line in lines:
        if continuation_re is not None:
            continuation = continuation_re.match(line)
            if continuation:
                Ublocks[current_U_name].append(continuation.group(1))
                continue
            current_U_name = None
            continuation_re = None

        ## No 'else' here because we want to catch one U block immediately following another
        U_match = u_start_re.match(line)
        if U_match:
            current_U_name, payload = U_match.groups()
            continuation_re = re.compile(
                r"#U%s(\s+.*)$" % re.escape(current_U_name))
            if current_U_name in Ublocks:
                if (current_U_name == 'O'
                        and payload.strip() == '*** END OF DATA ***'):
                    continue  # Don't throw an error when seeing "END OF DATA"
                raise ValueError(
                    "Duplicate #U block: %s\n"
                    "Current line: '%s'\n"
                    "Previous block: '''%s'''"
                    % (current_U_name, line.strip(),
                       '\n'.join(Ublocks[current_U_name])))
            Ublocks[current_U_name] = [payload]
        elif line.startswith('#C'):
            comments.append(line[2:].strip())
        else:
            filtered.append(line)

    ## Remove shared indentation from blocks
    for name, block in Ublocks.items():
        min_indent = min(len(text) - len(text.lstrip()) for text in block)
        Ublocks[name] = '\n'.join(text[min_indent:].rstrip() for text in block)
    return filtered, comments, Ublocks
def spec_strptime(datestring):
    """Parse a date string such as "Thu Feb 21 10:20:30 2013".

    Runs of whitespace are collapsed first.  On Python 3 the zone names CET
    and CEST are replaced by the numeric offsets +0100 and +0200; on
    Python 2 every all-uppercase word is removed instead.  The string is then
    tried against three layouts: no timezone, timezone after the year, and
    timezone before the year.  If none matches, a warning is issued, the
    timezone token is stripped and the plain layout is tried one last time.

    Parameters:
        datestring: the text to parse.

    Returns:
        The ``time.struct_time`` produced by ``time.strptime``.

    Raises:
        ValueError: if the string cannot be parsed even after removing the
            timezone.
    """
    plain_format = "%a %b %d %H:%M:%S %Y"

    # Normalise spaces
    datestring = re.sub(r'\s+', ' ', datestring.strip())
    if sys.version_info.major >= 3:
        # time.strptime is useless with timezones
        datestring = re.sub(r'\bCET\b', '+0100', datestring)
        datestring = re.sub(r'\bCEST\b', '+0200', datestring)
    else:
        # time.strptime can't handle %z
        datestring = re.sub(r"\s+[A-Z]+\b", '', datestring)

    candidate_formats = (
        plain_format,
        "%a %b %d %H:%M:%S %Y %z",
        "%a %b %d %H:%M:%S %z %Y",
    )
    for fmt in candidate_formats:
        try:
            return time.strptime(datestring, fmt)
        except ValueError:
            continue

    warn("Ignoring timezone from date '%s'" % datestring)
    # Actually drop the timezone (an upper-case abbreviation or a numeric
    # offset); re-parsing the unchanged string could never succeed here
    # because the plain layout has already been rejected above.
    without_zone = re.sub(r"\s+(?:[A-Z]+|[+-]\d{4})\b", '', datestring)
    return time.strptime(without_zone, plain_format)
class SPECblock:
    """A data block from a file in the SPEC format

    Attributes set by the constructor:
        scan_num: integer taken from the "#S" line.
        name: remainder of the "#S" line, stripped of surrounding whitespace.
        comments, Ublocks: the "#C" comments and "#U" blocks found in the
            body, as returned by filter_comments.
        headers: list of column names from the "#L" line.
        data: 2-D numpy array loaded from the lines after the "#L" line.
    """

    def __init__(self, s_line, s_block_body):
        """Parse one scan.

        Parameters:
            s_line: the "#S <number> <name>" line that opens the block.
            s_block_body: the lines following it.  After comments and "#U"
                blocks are removed, the first line must be "#N <count>" and
                the second "#L <column names>".

        Raises:
            ValueError: if the "#S", "#N" or "#L" line is missing or
                malformed, or if the number of headers or data columns
                differs from the count given on the "#N" line.
        """
        s_match = re.match(r"#S +(\d+) +(.+)$", s_line)
        if s_match is None:
            raise ValueError("Malformed #S line: '%s'" % s_line.strip())
        self.scan_num = int(s_match.group(1))
        self.name = s_match.group(2).strip()

        filtered, self.comments, self.Ublocks = filter_comments(s_block_body)
        if len(filtered) < 2:
            raise ValueError("Scan %d has no #N and #L lines" % self.scan_num)

        n_match = re.match(r"#N +(\d+)$", filtered[0])
        if n_match is None:
            raise ValueError("Expected an #N line but found: '%s'"
                             % filtered[0].strip())
        N = int(n_match.group(1))

        l_match = re.match(r"#L (.+)$", filtered[1])
        if l_match is None:
            raise ValueError("Expected an #L line but found: '%s'"
                             % filtered[1].strip())
        self.headers = re.split(' +', l_match.group(1).strip())
        if len(self.headers) != N:
            raise ValueError(
                "Number of #L headers does not equal number from #N\n"
                "Number expected: %d\n"
                "Header: '%s'" % (N, self.headers))

        # ndmin=2 keeps the array two-dimensional even for a single row
        self.data = np.loadtxt(filtered[2:], ndmin=2)
        if self.data.shape[1] != N:
            raise ValueError(
                "Number of data columns does not equal number from #N\n"
                "Number expected: %d\n"
                "Data shape: %s" % (N, tuple(self.data.shape)))

    def __getitem__(self, key):
        """Return a data column by header name, or index the data array.

        A string key selects the column whose header equals the key; any
        other key is passed straight to the underlying array.

        Raises:
            KeyError: if a string key is not one of the headers.
        """
        if not isinstance(key, str):
            return self.data[key]
        if key not in self.headers:
            raise KeyError("%s is not in the headers" % key)
        return self.data[:, self.headers.index(key)]
class SPECfile:
    """A file in the SPEC format

    Attributes set by the constructor:
        name: text of the "#F" line that opens the file.
        date: result of spec_strptime on the "#D" header line, or None when
            there is no such line.
        comments, Ublocks: the "#C" comments and "#U" blocks of the file
            header, as returned by filter_comments.
        blocks: list of SPECblock objects in file order.
        blocks_by_name: dict mapping each block name to its SPECblock.
        blocks_by_number: dict mapping each scan number to the list of
            SPECblock objects carrying that number.
    """

    def __init__(self, file_or_path):
        """Read and parse a whole SPEC file.

        Parameters:
            file_or_path: either a path string, which is opened for reading
                and closed again once parsing has finished, or an already
                open file (or other iterable of lines), which is left for
                the caller to close.

        Raises:
            ValueError: if the "#F" header line is missing or malformed, or
                if two scans share the same name.
        """
        if isinstance(file_or_path, str):
            # We opened it, so we are responsible for closing it
            with open(file_or_path, 'r') as handle:
                self._parse(handle)
        else:
            self._parse(file_or_path)

    def _parse(self, lines):
        """Populate this object from an iterable of lines."""
        # iter() guarantees a one-pass iterator, so the header and every
        # scan are each consumed exactly once
        fblock, rest = takewhile_dropwhile(not_s_line, iter(lines))
        if not fblock:
            raise ValueError("No #F header line found before the first scan")
        namematch = re.match(r"#F +(.+)$", fblock[0])
        if namematch is None:
            raise ValueError("First line is not an #F header: '%s'"
                             % fblock[0].strip())
        self.name = namematch.group(1).strip()

        hdrs, self.comments, self.Ublocks = filter_comments(fblock[1:])
        self.date = None
        for hdr in hdrs:
            if hdr.startswith('#D '):
                self.date = spec_strptime(hdr[3:])
            elif hdr.strip() in ('', '#'):
                pass  # Ignore blank lines
            else:
                warn("Unrecognised header: '%s'" % hdr)

        self.blocks = []
        self.blocks_by_name = {}
        self.blocks_by_number = {}
        # takewhile_dropwhile hands back [] once the input is exhausted
        while not is_empty_list(rest):
            s_line = next(rest)
            sblock_body, rest = takewhile_dropwhile(not_s_line, rest)
            sblock = SPECblock(s_line, sblock_body)
            self.blocks.append(sblock)
            if sblock.name in self.blocks_by_name:
                raise ValueError("Duplicate S block name: %s" % sblock.name)
            self.blocks_by_name[sblock.name] = sblock
            self.blocks_by_number.setdefault(sblock.scan_num, []).append(sblock)

    def __getitem__(self, key):
        """Look up scans by name (str key) or by scan number (int key).

        A string key returns the single block with that name; an integer
        key returns the list of blocks with that scan number.

        Raises:
            KeyError: if the key is unknown or is neither a str nor an int.
        """
        if isinstance(key, str):
            return self.blocks_by_name[key]
        if isinstance(key, int):
            return self.blocks_by_number[key]
        raise KeyError("Key must be either an int or a string")

    def __iter__(self):
        """Iterate over the blocks in file order."""
        return iter(self.blocks)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment