Created
November 18, 2012 16:06
-
-
Save niwinz/4106007 to your computer and use it in GitHub Desktop.
HL7 Prototype
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from __future__ import unicode_literals, print_function | |
import io | |
# Delimiters used when taking a message apart. They are bytes because the
# message is converted to bytes before it is split.
SEGMENT_SEPARATOR = b"\r"  # between the segments of a message
FIELD_SEPARATOR = b"|"  # between the fields of a segment
SECTION_SEPARATOR = b"^"  # between the parts of a composite field
class Field(object):
    """
    Base class for a single field value.

    The value given to the constructor is kept untouched as the raw value,
    while the result of ``parse`` is exposed through ``value``. Subclasses
    customise the parsed form by overriding ``parse``.
    """

    def __init__(self, value, description=None, index=None):
        """
        :param value: raw content of the field.
        :param description: optional human readable label.
        :param index: optional position of the field inside its container.
        """
        # The raw value is stored before ``parse`` runs so that it is
        # already available to overriding implementations.
        self._raw_value = value
        self._value = self.parse(value)
        self._index = index
        self._description = description

    @property
    def value(self):
        """Parsed value, i.e. whatever ``parse`` returned."""
        return self._value

    @property
    def description(self):
        """Label passed to the constructor (``None`` when omitted)."""
        return self._description

    @property
    def index(self):
        """Position passed to the constructor (``None`` when omitted)."""
        return self._index

    def parse(self, value):
        """Return the parsed form of *value*; here it is returned as is."""
        return value

    def __len__(self):
        # Measured on the raw input, not on the parsed value.
        raw = self._raw_value
        return len(raw)

    def __bytes__(self):
        return self.value

    def __str__(self):
        content = self.value
        return content.decode("utf-8")
class STField(Field):
    """
    Plain string value of the hl7 protocol.
    """

    def __str__(self):
        # Text form: the stored value decoded as UTF-8.
        content = self.value
        return content.decode('utf-8')

    def __bytes__(self):
        # Binary form: the stored value itself.
        return self.value
class IDField(Field):
    """
    Placeholder for the ``ID`` field type (presumably the HL7 data type of
    that code -- confirm against the specification).

    Adds nothing to ``Field`` yet.
    """
class ISField(Field):
    """
    Placeholder for the ``IS`` field type (presumably the HL7 data type of
    that code -- confirm against the specification).

    Adds nothing to ``Field`` yet.
    """
class CEField(Field):
    """
    Placeholder for the ``CE`` field type (presumably the HL7 data type of
    that code -- confirm against the specification).

    Adds nothing to ``Field`` yet.
    """
class SIField(Field):
    """
    Placeholder for the ``SI`` field type (presumably the HL7 data type of
    that code -- confirm against the specification).

    Adds nothing to ``Field`` yet.
    """
class CXField(Field):
    """
    Placeholder for the ``CX`` field type (presumably the HL7 data type of
    that code -- confirm against the specification).

    Adds nothing to ``Field`` yet.
    """
class XPNField(Field):
    """
    Placeholder for the ``XPN`` field type (presumably the HL7 data type of
    that code -- confirm against the specification).

    Adds nothing to ``Field`` yet.
    """
class TSField(Field):
    """
    Placeholder for the ``TS`` field type (presumably the HL7 data type of
    that code -- confirm against the specification).

    Adds nothing to ``Field`` yet.
    """
class XADField(Field):
    """
    Address field made of several ``SECTION_SEPARATOR`` delimited parts.

    Every part is wrapped in the field class found at the same position of
    ``_definition``.
    """

    # (position, field class, description) of every part, in order.
    _definition = [
        (0, STField, "Street address"),
        (1, STField, "Other destination"),
        (2, STField, "City"),
        (3, STField, "State or Province"),
        (4, STField, "Zip or postal code"),
        (5, IDField, "Country"),
        (6, IDField, "Address type"),
        (7, STField, "Other geographical designation"),
        (8, ISField, "country/parish code"),
        (9, ISField, "census tract"),
        (10, IDField, "address representation code"),
    ]

    def parse(self, value):
        """
        Split *value* on ``SECTION_SEPARATOR`` and wrap every part.

        Parts and definitions are paired with ``zip``: parts beyond the end
        of ``_definition`` are ignored, and a short value simply yields
        fewer entries.

        :param value: raw bytes of the whole address field.
        :returns: list of field instances, one per paired part.
        """
        sections = value.split(SECTION_SEPARATOR)
        return [
            field_cls(section, description, position)
            for (position, field_cls, description), section
            in zip(self._definition, sections)
        ]

    @property
    def value(self):
        """
        Values of all parsed parts joined with a single space, as bytes.
        """
        # ``part.value`` is read directly instead of calling ``bytes(part)``:
        # on Python 2 ``bytes`` is ``str``, so ``bytes(part)`` would go
        # through ``__str__`` (decode, then an implicit ASCII encode) and
        # fail on non-ASCII content. On Python 3 both spellings return the
        # same bytes for the field classes listed in ``_definition``.
        return b" ".join(part.value for part in self._value)
class Segment(object):
    """
    Base class for one segment of a message.

    The raw segment is split on ``FIELD_SEPARATOR`` and the pieces are
    handed to ``parse``, which concrete segments must implement.
    """

    # Class level defaults, meant to be overridden by concrete segments.
    _identifier = None
    _definition = None
    _fields = None

    def __init__(self, data, message):
        """
        :param data: raw bytes of the segment.
        :param message: object this segment belongs to (only stored).
        """
        self._message = message
        pieces = data.split(FIELD_SEPARATOR)
        # ``_data`` has to be set before ``parse`` is invoked because
        # implementations may read it.
        self._data = pieces
        self._fields = self.parse(pieces)

    @property
    def fields(self):
        """Result of ``parse`` for this segment."""
        return self._fields

    def parse(self, data):
        """Turn the split pieces into fields; must be overridden."""
        raise NotImplementedError
class PIDSegment(Segment):
    """
    ``PID`` segment.

    ``_definition`` describes the fields in order; the list is not complete
    yet, so trailing fields of a real segment are ignored by ``parse``.
    """

    _identifier = "PID"
    _definition = [
        # (seq, len, field, opt, desc)
        (0, 4, SIField, "O", "setid - pid"),
        (1, 20, CXField, "B", "patient id"),
        (2, 20, CXField, "R", "patient identifier list"),
        (3, 20, CXField, "B", "alternate patient id"),
        (4, 48, XPNField, "R", "patient name"),
        (5, 48, XPNField, "O", "mothers maiden name"),
        (6, 26, TSField, "O", "date/time of birth"),
        (7, 1, ISField, "O", "sex"),
        (8, 48, XPNField, "O", "patient alias"),
        (9, 80, CEField, "O", "race"),
        (10, 106, XADField, "O", "patient address"),
        # TODO: incomplete
    ]

    def parse(self, data):
        """
        Build one field object per defined position.

        :param data: the segment already split on the field separator; the
            first item is skipped (it holds the segment name in the sample
            data, e.g. ``PID``).
        :returns: list of ``(seq, field)`` tuples. Pairing is done with
            ``zip``, so it stops at the shorter of ``_definition`` and the
            remaining items of *data*.
        """
        # Work on the argument instead of ``self._data`` so the method
        # parses what it is given; ``Segment.__init__`` passes ``self._data``
        # anyway, so construction behaves as before.
        # TODO: the declared maximum length is not enforced yet.
        return [
            (seq, field_cls(raw, desc, seq))
            for (seq, _max_len, field_cls, _opt, desc), raw
            in zip(self._definition, data[1:])
        ]
class Header(object):
    """
    Read-only view over the header segment of a message.

    The raw segment is split on ``FIELD_SEPARATOR`` once; every property
    returns the piece found at a fixed position of that split.
    """

    def __init__(self, data, message):
        """
        :param data: raw bytes of the header segment.
        :param message: object the header belongs to (only stored).
        """
        self._data = data.split(FIELD_SEPARATOR)
        self._message = message

    def _component(self, position):
        # Single lookup point shared by all properties; an IndexError is
        # raised when the header is shorter than *position*.
        return self._data[position]

    @property
    def sending_application(self):
        """Piece at position 2."""
        return self._component(2)

    @property
    def sending_facility(self):
        """Piece at position 3."""
        return self._component(3)

    @property
    def receiving_application(self):
        """Piece at position 4."""
        return self._component(4)

    @property
    def receiving_facility(self):
        """Piece at position 5."""
        return self._component(5)

    @property
    def timestamp(self):
        """Piece at position 6, returned unparsed."""
        return self._component(6)

    @property
    def message_type(self):
        """Piece at position 8."""
        return self._component(8)

    @property
    def control_id(self):
        """Piece at position 9."""
        return self._component(9)

    @property
    def processing_id(self):
        """Piece at position 10."""
        return self._component(10)

    @property
    def version(self):
        """Piece at position 11."""
        return self._component(11)
class Message(object):
    """
    A whole message, split into raw segments on ``SEGMENT_SEPARATOR``.

    The first raw segment is treated as the header and wrapped in
    ``header_cls``.
    """

    # Class used to wrap the first segment; subclasses may replace it.
    header_cls = Header
    _header = None
    # Not populated by any code in this class yet.
    _segments = None

    def __init__(self, message):
        """
        :param message: the raw message; anything that is not ``bytes`` is
            encoded as UTF-8 first.
        """
        if not isinstance(message, bytes):
            message = message.encode('utf-8')
        self._message = message.split(SEGMENT_SEPARATOR)
        # Keep the parsed header instead of discarding it, so the ``header``
        # property does not have to parse the first segment a second time.
        self._header = self._parse_header()

    def _parse_header(self):
        """Wrap the first raw segment in ``header_cls``."""
        return self.header_cls(self._message[0], self)

    @property
    def header(self):
        """Header of the message, parsed on demand if not available yet."""
        # Lazy fallback for instances whose ``_header`` is still unset.
        if self._header is None:
            self._header = self._parse_header()
        return self._header
def parse(data=None, filename=None, cls=Message):
    """
    Build a message object from a file or from in-memory data.

    :param data: raw message, used when *filename* is not given.
    :param filename: path of a file whose whole binary content is the
        message; takes precedence over *data*.
    :param cls: class instantiated with the raw message.
    :returns: an instance of *cls*, or ``None`` when neither *data* nor
        *filename* is supplied.
    """
    if filename is not None:
        with io.open(filename, "rb") as f:
            return cls(f.read())
    # ``data`` used to be ignored entirely, so ``parse(data=...)`` silently
    # returned ``None``.
    if data is not None:
        return cls(data)
    return None
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import unittest | |
import pdb | |
import hl7 | |
class TestPIDSegment(unittest.TestCase):
    """Checks that a raw PID segment is split into described fields."""

    def setUp(self):
        # Explicit unicode literals: the sample contains a non-ASCII
        # apostrophe, and on Python 2 calling ``.encode('utf-8')`` on a
        # plain (byte) literal would first attempt an ASCII decode and fail.
        self.data = (u"PID|||56782445^^^UAReg^PI||KLEINSAMPLE^BARRY^Q^JR||19620910|M||2028-9^^HL70005^RA99113^^XYZ"
                     u"|260 GOODWIN CREST DRIVE^^BIRMINGHAM^AL^35 209^^M~NICKELL’S PICKLES^10000 W 100TH AVE^BIRMIN"
                     u"GHAM^AL^35200^^O |||||||0105I30001^^^99DEF^AN")

    def test_parse_01(self):
        segment = hl7.PIDSegment(self.data.encode('utf-8'), None)

        # Sex field
        index, field = segment.fields[7]
        # Each entry carries its own sequence number next to the field.
        self.assertEqual(index, 7)
        self.assertEqual(field.description, "sex")
        self.assertEqual(field.value, b"M")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment