Created
June 12, 2018 15:07
-
-
Save tantale/2ed1592d1dc65fed19726ddf9936d670 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
""" | |
Qronos Attributes encoder/decoder | |
================================= | |
:Created on: 2018-06-12 | |
:Author: Laurent LAPORTE <[email protected]> | |
""" | |
import datetime | |
import enum | |
import re | |
import six | |
from six.moves.urllib.parse import quote_plus | |
from six.moves.urllib.parse import unquote_plus | |
#: Special value which Qronos uses to represent an empty or missing attribute.
NIL = u"nil"

#: Marker prefix used to represent a PHP array.
#: On the Python side it is used for ``list``, ``tuple`` and ``dict`` objects.
ARRAY = u"ARRAY#|#"

#: Standard attributes which do not need to be URL-encoded.
STANDARD_ATTRS = [
    'APP', 'ATTRIBUT',
    'DATE', 'DATEL', 'DE', 'DEBUG', 'DELHISTORY', 'DOC',
    'ETAT',
    'FIC', 'FICHIER',
    'ID', 'IMA',
    'M', 'META', 'MODE',
    'NBJ', 'NBP', 'NOM',
    'OPE', 'OPEGEO',
    'PRI', 'PROCESS',
    'REP', 'REQ',
    'SER', 'SORTIE', 'SYNC',
    'TAILLE', 'TYPE',
    'USER',
    'VERS',
]
class State(enum.Enum):
    """
    Enumeration of the values handled for the ``ETAT`` attribute.

    Each member's value is the text exchanged with Qronos.
    """

    PENDING = u'ATTENTE'            # "ATTENTE"
    IN_PROGRESS = u'EN_COURS'       # "EN_COURS"
    SUBCONTRACTED = u'SOUS_TRAITE'  # "SOUS_TRAITE"
    DELEGATED = u'DELEGUE'          # "DELEGUE"
    FAILURE = u'ECHEC'              # "ECHEC"
    FINISHED = u'FINI'              # "FINI"
def parse_attrs(attrs):
    """
    Split a Qronos response fragment into its ``NAME=value`` attributes.

    A value is either a double-quoted string (which may contain spaces) or
    a run of characters up to the next space. Double quotes found at either
    end of a value are removed. When a name occurs several times, the last
    occurrence wins.

    :type attrs: str
    :param attrs: String holding zero or more ``NAME=value`` attributes,
        or ``None``.

    :rtype: dict[str, str]
    :return: Parsed attributes (an empty dictionary if *attrs* is ``None``).
    """
    if attrs is None:
        return {}
    pattern = re.compile(r'(\w+)=("[^"]*"|[^ ]*)', re.S | re.I)
    return {
        match.group(1): match.group(2).strip('"')
        for match in pattern.finditer(attrs)
    }
class QronosEncoder(object):
    """
    Encoder/decoder between Python values and Qronos attribute strings.

    Generic conversions are done by :meth:`encode` and :meth:`decode`.
    Attributes listed in :attr:`encoders` / :attr:`decoders` use a dedicated
    converter; each dedicated converter falls back on the generic one when
    the value is not of the kind it handles.
    """

    #: Character encoding used to decode byte strings and to URL-quote values.
    encoding = "CP1252"

    def encode(self, obj):
        """
        Encode a Python data structure into a string which Qronos can handle.

        :param obj: ``None``, boolean, text or byte string, integer, float,
            ``list``, ``tuple``, ``dict``, ``datetime.datetime`` or
            ``datetime.date``.

        :return: Text representation of *obj*. ``None`` and empty strings
            are encoded as ``"nil"``.

        :raise TypeError: if the type of *obj* is not supported.
        """
        if obj is None:
            return NIL
        elif isinstance(obj, bool):
            return six.text_type(obj)
        elif isinstance(obj, six.text_type):
            return obj if obj else NIL
        elif isinstance(obj, six.binary_type):
            return obj.decode(self.encoding) if obj else NIL
        elif isinstance(obj, (six.integer_types, float)):
            return six.text_type(obj)
        elif isinstance(obj, (list, tuple)):
            # support of PHP arrays in Igloo
            # NOTE: items are rendered with ``str.format``; they are not
            # encoded recursively.
            return ARRAY + u"#|#".join(u"{i}>##>{v}".format(i=i, v=v) for i, v in enumerate(obj))
        elif isinstance(obj, dict):
            # support of PHP arrays in Igloo
            return ARRAY + u"#|#".join(u"{k}>##>{v}".format(k=k, v=v) for k, v in six.iteritems(obj))
        elif isinstance(obj, datetime.datetime):
            return six.text_type(obj.strftime("%d/%m/%Y %H:%M:%S"))
        elif isinstance(obj, datetime.date):
            # WARNING: ``datetime.datetime`` is a subclass of ``datetime.date``
            # so, we must have this ``elif`` after the one for ``datetime.datetime``
            return six.text_type(obj.strftime("%d/%m/%Y"))
        else:
            raise TypeError("cannot encode object of type: " + repr(obj))

    def encode_compact_date(self, obj):
        """
        Encode a date as ``DDMMYYYY`` or a date-time as ``DDMMYYYYHHMMSS``.

        Any other value is passed to :meth:`encode`.
        """
        # ``datetime.datetime`` must be tested first: it subclasses ``datetime.date``.
        if isinstance(obj, datetime.datetime):
            return six.text_type(obj.strftime("%d%m%Y%H%M%S"))
        elif isinstance(obj, datetime.date):
            return six.text_type(obj.strftime("%d%m%Y"))
        else:
            return self.encode(obj)

    def encode_french_date(self, obj):
        """
        Encode a date as ``DD/MM/YYYY`` or a date-time as ``DD/MM/YYYY HH:MM:SS``.

        Any other value is passed to :meth:`encode`.
        """
        # ``datetime.datetime`` must be tested first: it subclasses ``datetime.date``.
        if isinstance(obj, datetime.datetime):
            return six.text_type(obj.strftime("%d/%m/%Y %H:%M:%S"))
        elif isinstance(obj, datetime.date):
            return six.text_type(obj.strftime("%d/%m/%Y"))
        else:
            return self.encode(obj)

    def encode_oui_non(self, obj):
        """
        Encode a boolean as ``"O"`` (true) or ``"N"`` (false).

        The test is done by equality, so ``1`` and ``0`` are encoded like
        ``True`` and ``False``. Any other value is passed to :meth:`encode`.
        """
        if obj in (True, False):
            return {True: u"O", False: u"N"}[obj]
        else:
            return self.encode(obj)

    def encode_state(self, obj):
        """
        Encode a :class:`State` member as its value.

        Any other value is passed to :meth:`encode`.
        """
        if isinstance(obj, State):
            return obj.value
        else:
            return self.encode(obj)

    def encode_int(self, obj):
        """
        Encode an integer as its decimal text.

        Any other value is passed to :meth:`encode`.
        """
        if isinstance(obj, six.integer_types):
            return six.text_type(obj)
        else:
            return self.encode(obj)

    #: Dedicated encoders, indexed by upper-case attribute name.
    #: Values are plain functions: they are called with ``self`` explicitly.
    encoders = {
        'DATE': encode_compact_date,
        'DATEL': encode_french_date,
        'DELHISTORY': encode_oui_non,
        'ETAT': encode_state,
        'NBJ': encode_int,
        'NBP': encode_int,
        'PRI': encode_int,
        'TAILLE': encode_int,
    }

    def encode_attrs(self, attrs):
        """
        Encode the attributes to send to Qronos.

        Attribute names are converted to upper case. Attributes which have a
        dedicated encoder or which are listed in ``STANDARD_ATTRS`` are left
        unquoted; every other value is URL-quoted using :attr:`encoding`.

        :type attrs: dict[str, object]
        :param attrs: Qronos attributes.

        :return: the dictionary of encoded attributes. Values are text strings.
        :rtype: dict[str, str]
        """
        ret = {}
        encoders = self.encoders
        for key, obj in six.iteritems(attrs):
            key = key.upper()
            if key in encoders:
                ret[key] = encoders[key](self, obj)
            else:
                encoded = self.encode(obj)
                if key in STANDARD_ATTRS:
                    ret[key] = encoded
                else:
                    ret[key] = quote_plus(encoded, encoding=self.encoding)
        return ret

    # noinspection PyMethodMayBeStatic
    def decode(self, text):
        """
        Decode a Qronos attribute value.

        :param text: Text to decode, or ``None``.

        :return: ``None`` for ``None`` or ``"nil"``; a ``list`` for an array
            whose keys are ``0, 1, 2...`` (or which is empty); a ``dict`` for
            any other array; the text itself otherwise.

        :raise TypeError: if *text* is neither ``None`` nor a string.
        """
        if text is None or text == NIL:
            return None
        if not isinstance(text, six.string_types):
            # Reject non-strings up front: calling ``startswith`` on them
            # would raise AttributeError instead of the intended TypeError.
            raise TypeError("cannot decode object of type: " + repr(text))
        if text.startswith(ARRAY):
            payload = text[len(ARRAY):]
            if not payload:
                # Empty array, as produced by ``encode([])`` or ``encode({})``.
                return []
            pairs = [kv.split(u'>##>', 1) for kv in payload.split(u"#|#")]
            keys = [pair[0] for pair in pairs]
            if keys == [six.text_type(i) for i in six.moves.xrange(len(pairs))]:
                # The value is an array with numeric indexes (translated into a Python list)
                return [pair[1] for pair in pairs]
            # The value is an array with string indexes (translated into a Python dict)
            return dict(pairs)
        if isinstance(text, six.text_type):
            return text
        raise TypeError("cannot decode object of type: " + repr(text))

    def decode_date(self, text):
        """
        Decode a date or a date-time.

        Date formats are tried first (``DD/MM/YYYY``, ``YYYY-MM-DD``,
        ``DDMMYYYY``), then date-time formats. If no format matches, *text*
        is passed to :meth:`decode`.

        :return: ``datetime.date``, ``datetime.datetime`` or the result of
            :meth:`decode`.
        """
        date_formats = ["%d/%m/%Y", "%Y-%m-%d", "%d%m%Y"]
        for fmt in date_formats:
            try:
                return datetime.datetime.strptime(text, fmt).date()
            except (TypeError, ValueError):
                # TypeError: *text* is not a string (e.g. ``None``).
                pass
        datetime_formats = ["%d/%m/%Y %H:%M:%S", "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S", "%d%m%Y%H%M%S"]
        for fmt in datetime_formats:
            try:
                return datetime.datetime.strptime(text, fmt)
            except (TypeError, ValueError):
                pass
        return self.decode(text)

    def decode_oui_non(self, text):
        """
        Decode ``"O"`` as ``True`` and ``"N"`` as ``False``.

        Any other value is passed to :meth:`decode`.
        """
        # Exact match: a substring test (``text in u"ON"``) would also accept
        # ``""`` and ``"ON"``, which are not valid flags.
        if text in (u"O", u"N"):
            return {u"O": True, u"N": False}[text]
        else:
            return self.decode(text)

    def decode_state(self, text):
        """
        Decode the value of a :class:`State` member into that member.

        Any other value is passed to :meth:`decode`.
        """
        for state in State:
            if state.value == text:
                return state
        return self.decode(text)

    def decode_int(self, text):
        """
        Decode an integer.

        If *text* cannot be converted by ``int``, it is passed to :meth:`decode`.
        """
        try:
            return int(text)
        except (TypeError, ValueError):
            # TypeError: *text* is not a string or a number (e.g. ``None``).
            return self.decode(text)

    #: Dedicated decoders, indexed by upper-case attribute name.
    #: Values are plain functions: they are called with ``self`` explicitly.
    decoders = {
        'DATE': decode_date,
        'DATEL': decode_date,
        'DELHISTORY': decode_oui_non,
        'ETAT': decode_state,
        'NBJ': decode_int,
        'NBP': decode_int,
        'PRI': decode_int,
        'TAILLE': decode_int,
    }

    def decode_attrs(self, attrs):
        """
        Decode the attributes received from Qronos.

        Attribute names are converted to upper case and every value is
        URL-unquoted using :attr:`encoding`. The ``VAL`` attribute is itself
        parsed as a list of ``NAME=value`` attributes which are merged into
        the result (their names are kept as they are).

        :type attrs: dict[str, bytes]
        :param attrs: attributes parsed from a Qronos response.
            Values may be byte strings or text strings.

        :rtype: dict[str, object]
        :return: Python dictionary containing the decoded attributes.
        """
        ret = {}
        decoders = self.decoders
        for key, value in six.iteritems(attrs):
            key = key.upper()
            if isinstance(value, six.binary_type):
                # Use the Qronos encoding rather than the default codec,
                # which fails on non-ASCII CP1252 bytes.
                value = value.decode(self.encoding)
            decoded = unquote_plus(value, encoding=self.encoding)
            if key in decoders:
                ret[key] = decoders[key](self, decoded)
            elif key == "VAL":
                val_attrs = parse_attrs(decoded)
                ret.update({k: self.decode(t) for k, t in six.iteritems(val_attrs)})
            else:
                ret[key] = self.decode(decoded)
        return ret
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment