@ruffsl
Last active July 28, 2018 01:03
Using XML schema to find and enable substitution of missing elements
import xml.etree.ElementTree as ElementTree
from xml.dom import minidom


def tidy_xml(element):
    # Element.getiterator() was removed in Python 3.9; iter() is the replacement.
    for x in element.iter():
        # Only strip text on elements that have children, so leaf values are kept.
        if len(x):
            if x.text:
                x.text = x.text.strip()
        if x.tail:
            x.tail = x.tail.strip()
    return element


def pretty_xml(element):
    xmlstr = ElementTree.tostring(element, encoding='unicode', method='xml')
    # toprettyxml returns bytes when an encoding is given, hence the decode below.
    xmlstr = minidom.parseString(xmlstr).toprettyxml(indent=' ', newl='\n', encoding='utf-8')
    return xmlstr.decode('utf-8')
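The following is a minimal, self-contained sketch of how the two helpers above behave together. The helper bodies are repeated so the snippet runs on its own; the input string and the two-space indent are illustrative assumptions, not part of the gist.

```python
import xml.etree.ElementTree as ElementTree
from xml.dom import minidom


def tidy_xml(element):
    # Strip layout whitespace, but leave the text of leaf elements untouched.
    for x in element.iter():
        if len(x) and x.text:
            x.text = x.text.strip()
        if x.tail:
            x.tail = x.tail.strip()
    return element


def pretty_xml(element):
    xmlstr = ElementTree.tostring(element, encoding='unicode', method='xml')
    pretty = minidom.parseString(xmlstr).toprettyxml(indent='  ', newl='\n', encoding='utf-8')
    return pretty.decode('utf-8')


# Hypothetical input with uneven whitespace, for illustration only.
raw = "<cert>\n      <serial_number> 0 </serial_number>\n</cert>"
root = tidy_xml(ElementTree.fromstring(raw))
pretty = pretty_xml(root)
print(pretty)
```

Tidying first matters because `minidom` would otherwise treat the original indentation as text nodes and add its own indentation on top of it.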
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1.0.1\n"
]
}
],
"source": [
"import xmlschema\n",
"print(xmlschema.__version__)\n",
"\n",
"# For adding missing elements\n",
"from xml.etree import ElementTree\n",
"\n",
"# For catching errors\n",
"from xmlschema import XMLSchemaValidationError\n",
"\n",
"# For loading\n",
"from xmlschema.resources import load_xml_resource\n",
"from xmlschema.etree import is_etree_element\n",
"\n",
"# For Printing\n",
"from utils import tidy_xml, pretty_xml"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The schema for certs"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"xsd_string = \"\"\"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n",
"\n",
"<xs:schema xmlns:xs=\"http://www.w3.org/2001/XMLSchema\"\n",
" elementFormDefault=\"qualified\"\n",
" attributeFormDefault=\"unqualified\">\n",
"\n",
" <xs:element name=\"profile\" type=\"AuthoritiesNode\" />\n",
" <xs:complexType name=\"AuthoritiesNode\">\n",
" <xs:sequence>\n",
" <xs:element name=\"authorities\" type=\"Authorities\" />\n",
" </xs:sequence>\n",
" </xs:complexType>\n",
"\n",
" <xs:complexType name=\"Authorities\">\n",
" <xs:sequence maxOccurs=\"unbounded\">\n",
" <xs:element name=\"authority\" type=\"Authority\" />\n",
" </xs:sequence>\n",
" </xs:complexType>\n",
"\n",
" <xs:complexType name=\"Authority\">\n",
" <xs:sequence minOccurs=\"1\" maxOccurs=\"1\">\n",
" <xs:element name=\"cert\" minOccurs=\"1\" type=\"CertType\" />\n",
" </xs:sequence>\n",
" <xs:attribute name=\"name\" type=\"xs:string\" use=\"required\" />\n",
" </xs:complexType>\n",
"\n",
" <xs:complexType name=\"CertType\">\n",
" <xs:sequence>\n",
" <xs:element name=\"subject_name\" type=\"xs:string\" />\n",
" <xs:element name=\"validity\" type=\"Validity\" />\n",
" <xs:element name=\"serial_number\" type=\"xs:integer\" />\n",
" <xs:element name=\"issuer_name\" type=\"xs:string\" />\n",
" </xs:sequence>\n",
" </xs:complexType>\n",
"\n",
" <xs:complexType name=\"Validity\">\n",
" <xs:sequence>\n",
" <xs:element name=\"not_before\" type=\"xs:dateTime\" />\n",
" <xs:element name=\"not_after\" type=\"xs:dateTime\" />\n",
" </xs:sequence>\n",
" </xs:complexType>\n",
"\n",
"</xs:schema>\n",
"\"\"\"\n",
"xsd_schema = xmlschema.XMLSchema(xsd_string)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A fully valid cert authority"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"xml_string_1 = \"\"\"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n",
"<profile>\n",
" <authorities>\n",
" <authority name=\"root_ca\">\n",
" <cert>\n",
" <subject_name>CN=root_ca</subject_name>\n",
" <validity>\n",
" <not_before>2013-06-01T13:00:00</not_before>\n",
" <not_after>2023-06-01T13:00:00</not_after>\n",
" </validity>\n",
" <serial_number>0</serial_number>\n",
" <issuer_name></issuer_name>\n",
" </cert>\n",
" </authority>\n",
" </authorities>\n",
"</profile>\n",
"\"\"\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A partially valid authority listing. The valid authority is left out to shorten the printout, but multiple valid or partially valid elements with missing children could be scattered throughout the XML tree."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"xml_string_2 = \"\"\"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n",
"<profile>\n",
" <authorities>\n",
" <authority name=\"identity_ca\">\n",
" <cert>\n",
" <serial_number>0</serial_number>\n",
" </cert>\n",
" </authority>\n",
" </authorities>\n",
"</profile>\n",
"\"\"\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A partially valid authority listing, now omitting even the `cert` element."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"xml_string_3 = \"\"\"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n",
"<profile>\n",
" <authorities>\n",
" <authority name=\"identity_ca\">\n",
" </authority>\n",
" </authorities>\n",
"</profile>\n",
"\"\"\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Default tree whose root element contains a flat lookup space from which missing elements are picked up."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"xml_string_default_a = \"\"\"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n",
"<default>\n",
" <serial_number>1</serial_number>\n",
" <issuer_name>root_ca</issuer_name>\n",
" <validity>\n",
" <not_before>2013-06-01T13:00:00</not_before>\n",
" </validity>\n",
" <subject_name>CN={xml_string_default}</subject_name>\n",
" <not_after>2023-06-01T13:00:00</not_after>\n",
"</default>\n",
"\"\"\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This default is slightly different, as the complex element `validity` is entirely omitted. This will show the recursive populating fallback for missing elements."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"xml_string_default_b = \"\"\"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n",
"<default>\n",
" <serial_number>2</serial_number>\n",
" <issuer_name>root_ca</issuer_name>\n",
" <not_before>2013-06-01T13:00:00</not_before>\n",
" <subject_name>CN={xml_string_default}</subject_name>\n",
" <not_after>2023-06-01T13:00:00</not_after>\n",
"</default>\n",
"\"\"\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We see the first XML string is valid (`validate` returns `None` on success)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"None\n"
]
}
],
"source": [
"print(xsd_schema.validate(source=xml_string_1, use_defaults=False))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"But we see the second string is missing some child elements for the complex cert type."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"ename": "XMLSchemaChildrenValidationError",
"evalue": "failed validating <Element 'cert' at 0x7f0c690820e8> with XsdElement(name='subject_name').\n\nReason: The child n.1 of element 'cert' has a unexpected tag 'serial_number'. Tag 'subject_name' expected.\n\nSchema:\n\n <xs:element xmlns:xs=\"http://www.w3.org/2001/XMLSchema\" name=\"subject_name\" type=\"xs:string\" />\n\nInstance:\n\n <cert>\n <serial_number>0</serial_number>\n </cert>\n",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mXMLSchemaChildrenValidationError\u001b[0m Traceback (most recent call last)",
"\u001b[0;32m<ipython-input-9-b1a1d5f58ea6>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m\n\u001b[0;32m----> 1\u001b[0;31m \u001b[0mprint\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mxsd_schema\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mvalidate\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0msource\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mxml_string_2\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0muse_defaults\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mFalse\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
"\u001b[0;32m/usr/local/lib/python3.6/dist-packages/xmlschema/validators/xsdbase.py\u001b[0m in \u001b[0;36mvalidate\u001b[0;34m(self, source, use_defaults)\u001b[0m\n\u001b[1;32m 484\u001b[0m \"\"\"\n\u001b[1;32m 485\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0merror\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0miter_errors\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0msource\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0muse_defaults\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0muse_defaults\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 486\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0merror\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 487\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 488\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0miter_errors\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0msource\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mpath\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mNone\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0muse_defaults\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mTrue\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;31mXMLSchemaChildrenValidationError\u001b[0m: failed validating <Element 'cert' at 0x7f0c690820e8> with XsdElement(name='subject_name').\n\nReason: The child n.1 of element 'cert' has a unexpected tag 'serial_number'. Tag 'subject_name' expected.\n\nSchema:\n\n <xs:element xmlns:xs=\"http://www.w3.org/2001/XMLSchema\" name=\"subject_name\" type=\"xs:string\" />\n\nInstance:\n\n <cert>\n <serial_number>0</serial_number>\n </cert>\n"
]
}
],
"source": [
"print(xsd_schema.validate(source=xml_string_2, use_defaults=False))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Functions to print and load xml"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"def print_xml(element):\n",
"    tidy_element = tidy_xml(element)\n",
"    pretty_element = pretty_xml(tidy_element)\n",
"    print('')\n",
"    print(pretty_element)\n",
"\n",
"def load_xml(xml_document):\n",
"    # Accept an ElementTree, an Element, or anything load_xml_resource can read\n",
"    try:\n",
"        xml_root = xml_document.getroot()\n",
"    except (AttributeError, TypeError):\n",
"        if is_etree_element(xml_document):\n",
"            xml_root = xml_document\n",
"        else:\n",
"            xml_root = load_xml_resource(xml_document)\n",
"    else:\n",
"        if not is_etree_element(xml_root):\n",
"            # XMLSchemaTypeError was never imported here, so raise the builtin instead\n",
"            raise TypeError(\n",
"                \"wrong type %r for 'xml_document' argument.\" % type(xml_document)\n",
"            )\n",
"    return xml_root"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Small placeholder to collect chunks"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"chunks = []"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Main function and preprocessor that iteratively find missing elements in the XML data and substitute the first matching element found in the default space. Looping until completion also allows elements in the default space to be constructed recursively, i.e. a complex default element can be left partially complete and have its own missing children substituted from elsewhere in the default space."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"def preprocess_default(xsd_schema, data, default_data, path=None, use_defaults=False):\n",
"    data = load_xml(data)\n",
"    default_data = load_xml(default_data)\n",
"\n",
"    while True:\n",
"        missing_errors = []\n",
"\n",
"        iter_errors = default_preprocessor(\n",
"            xsd_schema,\n",
"            data,\n",
"            default_data,\n",
"            path,\n",
"            use_defaults)\n",
"\n",
"        for missing_error in iter_errors:\n",
"            missing_errors.append(missing_error)\n",
"# raise missing_error\n",
"\n",
"        if not missing_errors:\n",
"            break\n",
"\n",
"    return data\n",
"\n",
"def default_preprocessor(xsd_schema, xml_document, xml_document_default, path=None, use_defaults=False):\n",
"    iter_decoder = xsd_schema.iter_decode(\n",
"        source=xml_document,\n",
"        path=path,\n",
"        use_defaults=use_defaults,\n",
"        validation='lax')\n",
"\n",
"    for chunk in iter_decoder:\n",
"        if isinstance(chunk, XMLSchemaValidationError):\n",
"            name = chunk.validator.name\n",
"            if chunk.reason.startswith(\"The child n.\") or chunk.reason.startswith(\"The content of element '\"):\n",
"                expecteds = chunk.expected\n",
"                if not isinstance(expecteds, (list, tuple)):\n",
"                    expecteds = [expecteds]\n",
"                for i, expected in enumerate(expecteds):\n",
"                    index = chunk.index + i\n",
"                    default_elem = xml_document_default.find(expected)\n",
"                    if default_elem is not None:\n",
"                        print('Default element found!')\n",
"                        print(' ', 'chunk.reason: ', chunk.reason)\n",
"                        print(' ', 'index: ', index)\n",
"                        print(' ', 'expected: ', expected)\n",
"                        chunk.elem.insert(index, default_elem)\n",
"                        chunks.append(chunk)\n",
"                        yield chunk\n",
"                        return\n",
"                    else:\n",
"# missing_elem = ElementTree.Element(expected)\n",
"                        missing_elem = chunk.elem.__class__(expected)\n",
"                        print('No default found!')\n",
"                        print(' ', 'chunk.reason: ', chunk.reason)\n",
"                        print(' ', 'index: ', index)\n",
"                        print(' ', 'expected: ', expected)\n",
"                        chunk.elem.insert(index, missing_elem)\n",
"                        yield chunk\n",
"                        return\n",
"            else:\n",
"                raise chunk"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"No substitutions needed for the first XML string"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"<?xml version=\"1.0\" encoding=\"utf-8\"?>\n",
"<profile>\n",
" <authorities>\n",
" <authority name=\"root_ca\">\n",
" <cert>\n",
" <subject_name>CN=root_ca</subject_name>\n",
" <validity>\n",
" <not_before>2013-06-01T13:00:00</not_before>\n",
" <not_after>2023-06-01T13:00:00</not_after>\n",
" </validity>\n",
" <serial_number>0</serial_number>\n",
" <issuer_name/>\n",
" </cert>\n",
" </authority>\n",
" </authorities>\n",
"</profile>\n",
"\n"
]
}
],
"source": [
"xml_data_1A = preprocess_default(xsd_schema, data=xml_string_1, default_data=xml_string_default_a)\n",
"print_xml(xml_data_1A)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"On the second string, each missing child of `cert` is substituted from the default space; the partial `validity` default is then completed with `not_after`"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Default element found!\n",
" chunk.reason: The child n.1 of element 'cert' has a unexpected tag 'serial_number'. Tag 'subject_name' expected.\n",
" index: 0\n",
" expected: subject_name\n",
"Default element found!\n",
" chunk.reason: The child n.2 of element 'cert' has a unexpected tag 'serial_number'. Tag 'validity' expected.\n",
" index: 1\n",
" expected: validity\n",
"Default element found!\n",
" chunk.reason: The content of element 'validity' is not complete. Tag 'not_after' expected.\n",
" index: 1\n",
" expected: not_after\n",
"Default element found!\n",
" chunk.reason: The content of element 'cert' is not complete. Tag 'issuer_name' expected.\n",
" index: 3\n",
" expected: issuer_name\n",
"\n",
"<?xml version=\"1.0\" encoding=\"utf-8\"?>\n",
"<profile>\n",
" <authorities>\n",
" <authority name=\"identity_ca\">\n",
" <cert>\n",
" <subject_name>CN={xml_string_default}</subject_name>\n",
" <validity>\n",
" <not_before>2013-06-01T13:00:00</not_before>\n",
" <not_after>2023-06-01T13:00:00</not_after>\n",
" </validity>\n",
" <serial_number>0</serial_number>\n",
" <issuer_name>root_ca</issuer_name>\n",
" </cert>\n",
" </authority>\n",
" </authorities>\n",
"</profile>\n",
"\n"
]
}
],
"source": [
"xml_data_2A = preprocess_default(xsd_schema, data=xml_string_2, default_data=xml_string_default_a)\n",
"print_xml(xml_data_2A)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Using the third string and second default string however, we find that even when no default is found, i.e. for `cert` or `validity`, it is created, then populated as the xml schema decoder returns further XMLSchemaChildrenValidationError for the missing child elements."
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"No default found!\n",
" chunk.reason: The content of element 'authority' is not complete. Tag 'cert' expected.\n",
" index: 0\n",
" expected: cert\n",
"Default element found!\n",
" chunk.reason: The content of element 'cert' is not complete. One of ['subject_name', 'validity', 'serial_number', 'issuer_name'] is expected.\n",
" index: 0\n",
" expected: subject_name\n",
"No default found!\n",
" chunk.reason: The content of element 'cert' is not complete. Tag 'validity' expected.\n",
" index: 1\n",
" expected: validity\n",
"Default element found!\n",
" chunk.reason: The content of element 'validity' is not complete. One of ['not_before', 'not_after'] is expected.\n",
" index: 0\n",
" expected: not_before\n",
"Default element found!\n",
" chunk.reason: The content of element 'validity' is not complete. Tag 'not_after' expected.\n",
" index: 1\n",
" expected: not_after\n",
"Default element found!\n",
" chunk.reason: The content of element 'cert' is not complete. Tag 'serial_number' expected.\n",
" index: 2\n",
" expected: serial_number\n",
"Default element found!\n",
" chunk.reason: The content of element 'cert' is not complete. Tag 'issuer_name' expected.\n",
" index: 3\n",
" expected: issuer_name\n",
"\n",
"<?xml version=\"1.0\" encoding=\"utf-8\"?>\n",
"<profile>\n",
" <authorities>\n",
" <authority name=\"identity_ca\">\n",
" <cert>\n",
" <subject_name>CN={xml_string_default}</subject_name>\n",
" <validity>\n",
" <not_before>2013-06-01T13:00:00</not_before>\n",
" <not_after>2023-06-01T13:00:00</not_after>\n",
" </validity>\n",
" <serial_number>2</serial_number>\n",
" <issuer_name>root_ca</issuer_name>\n",
" </cert>\n",
" </authority>\n",
" </authorities>\n",
"</profile>\n",
"\n"
]
}
],
"source": [
"xml_data_3B = preprocess_default(xsd_schema, data=xml_string_3, default_data=xml_string_default_b)\n",
"print_xml(xml_data_3B)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We see the order of chunks found to be missing"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [],
"source": [
"# chunks"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Perhaps from the attributes of the chunk, we could determine the index at which the element was expected to be found, so that it can be correctly inserted in the sequence."
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'index': 0,\n",
" 'expected': 'subject_name',\n",
" 'validator': XsdElement(name='subject_name'),\n",
" 'obj': <Element 'cert' at 0x7f0c68f5b0e8>,\n",
" 'reason': \"The child n.1 of element 'cert' has a unexpected tag 'serial_number'. Tag 'subject_name' expected.\",\n",
" 'schema_elem': <Element '{http://www.w3.org/2001/XMLSchema}element' at 0x7f0c690827c8>,\n",
" 'elem': <Element 'cert' at 0x7f0c68f5b0e8>,\n",
" 'message': None}"
]
},
"execution_count": 19,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"foo = chunks[0]\n",
"vars(foo)"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'/usr/lib/python3.6/xml/etree/ElementTree.py'"
]
},
"execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import inspect\n",
"inspect.getfile(foo.elem.__class__)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
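The substitution idea in the notebook can be sketched without `xmlschema`, using only the standard library. This is not the notebook's method: the `EXPECTED_CHILDREN` table and the `fill_missing` function are hypothetical stand-ins for what the schema validator reports, and the sketch assumes each expected child occurs at most once and that existing children are already in sequence order.

```python
import copy
import xml.etree.ElementTree as ElementTree

# Hypothetical stand-in for the XSD: expected child order per parent tag.
EXPECTED_CHILDREN = {
    'authority': ['cert'],
    'cert': ['subject_name', 'validity', 'serial_number', 'issuer_name'],
    'validity': ['not_before', 'not_after'],
}


def fill_missing(elem, defaults):
    """Insert missing children of elem in sequence order, then recurse."""
    for index, tag in enumerate(EXPECTED_CHILDREN.get(elem.tag, [])):
        if elem.find(tag) is None:
            default = defaults.find(tag)
            if default is not None:
                # Copy so the defaults tree is never shared with the data tree.
                child = copy.deepcopy(default)
            else:
                # No default available: create an empty element to fill later.
                child = ElementTree.Element(tag)
            elem.insert(index, child)
    for child in elem:
        fill_missing(child, defaults)
    return elem


data = ElementTree.fromstring(
    '<profile><authorities><authority name="identity_ca"/></authorities></profile>')
defaults = ElementTree.fromstring(
    '<default>'
    '<serial_number>2</serial_number>'
    '<issuer_name>root_ca</issuer_name>'
    '<not_before>2013-06-01T13:00:00</not_before>'
    '<subject_name>CN={xml_string_default}</subject_name>'
    '<not_after>2023-06-01T13:00:00</not_after>'
    '</default>')

fill_missing(data, defaults)
cert = data.find('authorities/authority/cert')
print([child.tag for child in cert])
```

Copying each default with `copy.deepcopy` is a design choice of this sketch: the notebook inserts the default element object itself, which would leave one object shared between the default tree and every place it is substituted.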