Created
April 5, 2018 12:02
-
-
Save ross-spencer/be123a4448da1d94124d4477a1affbc5 to your computer and use it in GitHub Desktop.
METS Reader/Writer Demo
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import argparse | |
from datetime import datetime | |
import lxml | |
import logging | |
import metsrw | |
from metsrw.plugins import premisrw | |
import os | |
import sys | |
import uuid | |
# Format strings handed to logging.basicConfig() below.
LOGFORMAT = '%(asctime)-15s %(levelname)s: %(message)s'
DATEFORMAT = '%m/%d/%Y %H:%M:%S'

# Command line: one positional METS path and an optional logging level.
parser = argparse.ArgumentParser(description='metsrw client for am camp.')
parser.add_argument(
    'mets', metavar='M', type=str, nargs=1,
    help='a mets file to parse')
parser.add_argument(
    '--logging', type=str, nargs="?",
    help='logging level, INFO, DEBUG, WARNING, ERROR')
args = parser.parse_args()

# Only one of the four recognised level names is forwarded to basicConfig();
# a missing or unrecognised value leaves the logging level at its default.
if args.logging in ("INFO", "DEBUG", "WARNING", "ERROR"):
    logging.basicConfig(format=LOGFORMAT, datefmt=DATEFORMAT,
                        level=args.logging)
else:
    logging.basicConfig(format=LOGFORMAT, datefmt=DATEFORMAT)

# Output file written at the end of the script (only if it does not already
# exist); validation is skipped when this same file is given as the input.
DEMO_FILE = "AM_CAMP_METS.xml"

# nargs=1 makes argparse store the positional as a one-element list.
filename = args.mets[0]

# Load the METS document; a parse or I/O failure is logged and ends the
# script with exit status 1.
try:
    mets = metsrw.METSDocument.fromfile(filename)
except lxml.etree.XMLSyntaxError as e:
    logging.error("METS %s", e)
    sys.exit(1)
except IOError as e:
    logging.error("File does not exist %s", e)
    sys.exit(1)
def prettyprint(xml_doc):
    """Serialize an lxml element or tree with pretty-printed indentation.

    :param xml_doc: the lxml element or element tree to serialize.
    :returns: whatever ``lxml.etree.tostring(..., pretty_print=True)``
        returns for ``xml_doc``.
    """
    # A bare ``import lxml`` does not load the ``etree`` submodule, so
    # ``lxml.etree`` only resolves if some other module imported it first.
    # Importing it here removes that dependency on import order.
    from lxml import etree

    return etree.tostring(xml_doc, pretty_print=True)
# mdtype values that getpremis() compares each subsection's contents against.
PREMIS_OBJECT = 'PREMIS:OBJECT'
PREMIS_EVENT = 'PREMIS:EVENT'
PREMIS_AGENT = 'PREMIS:AGENT'


def _premis_text(document, path):
    """Return the text of the first element matching *path*, or None if absent."""
    element = document.find(path)
    if element is None:
        return None
    return element.text


def getpremis(admin_md_secs, objects=False, events=False, agents=False):
    """Print a summary of the PREMIS metadata held in administrative sections.

    Each subsection is handled by the first matching rule:

    * ``objects`` and a PREMIS object: prints the original name, the
      identifier type and value and, when a fixity algorithm is recorded,
      the algorithm and digest.
    * ``events`` and a PREMIS event: prints the upper-cased event type, the
      date, outcome and detail on one line with newlines removed. CREATION,
      INGESTION and REGISTRATION events are only logged at INFO level.
    * ``agents`` and a PREMIS agent: only logged at INFO level.

    An element missing from a PREMIS document is printed as ``None`` rather
    than raising ``AttributeError``.

    :param admin_md_secs: iterable of sections exposing ``subsections``
        (``None`` or an iterable); each subsection exposes
        ``contents.mdtype`` and ``contents.document``.
    :param objects: report PREMIS objects.
    :param events: report PREMIS events.
    :param agents: log (but do not print) PREMIS agents.
    :returns: None; the output goes to stdout and the logging module.
    """
    NS = "{info:lc/xmlns/premis-v2}"
    FNAME = "{0}originalName".format(NS)
    ID_TYPE = "{0}objectIdentifier/{0}objectIdentifierType".format(NS)
    ID = "{0}objectIdentifier/{0}objectIdentifierValue".format(NS)
    FIXITY_TYPE = "{0}objectCharacteristics/{0}fixity/{0}messageDigestAlgorithm".format(NS)
    FIXITY = "{0}objectCharacteristics/{0}fixity/{0}messageDigest".format(NS)
    EVENT_DATE = "{0}eventDateTime".format(NS)
    EVENT_TYPE = "{0}eventType".format(NS)
    EVENT_OUTCOME = "{0}eventOutcomeInformation/{0}eventOutcome".format(NS)
    EVENT_DETAIL = "{0}eventDetail".format(NS)
    IGNORED_EVENTS = ("CREATION", "INGESTION", "REGISTRATION")

    for amd_sec in admin_md_secs:
        if amd_sec.subsections is None:
            continue
        for sub_sec in amd_sec.subsections:
            contents = sub_sec.contents
            # ``mdtype`` and ``document`` are only read once a flag has
            # selected the branch, exactly as lazily as before.
            if objects and contents.mdtype == PREMIS_OBJECT:
                document = contents.document
                print("NAME {0}".format(_premis_text(document, FNAME)))
                print("{0} {1}".format(_premis_text(document, ID_TYPE),
                                       _premis_text(document, ID)))
                algorithm = document.find(FIXITY_TYPE)
                if algorithm is not None:
                    print("FIXITY {0} {1}".format(
                        algorithm.text, _premis_text(document, FIXITY)))
            elif events and contents.mdtype == PREMIS_EVENT:
                document = contents.document
                # Look the type up once; a missing or empty type becomes "".
                event_type = (_premis_text(document, EVENT_TYPE) or "").upper()
                if event_type in IGNORED_EVENTS:
                    logging.info("Ignoring CREATION, INGESTION, REGISTRATION")
                else:
                    # Event details may span several lines; keep one line
                    # of output per event.
                    print("{0} {1} {2} {3}".format(
                        event_type,
                        _premis_text(document, EVENT_DATE),
                        _premis_text(document, EVENT_OUTCOME),
                        _premis_text(document, EVENT_DETAIL)).replace("\n", ""))
            elif agents and contents.mdtype == PREMIS_AGENT:
                logging.info("%s not being used", contents.mdtype)
# Report every file entry that has a path: a header line followed by its
# PREMIS objects and then its PREMIS events.
for file_system_entry in mets.all_files():
    if file_system_entry.path is not None:
        print("---")
        print("FILE INFORMATION {0} {1}".format(
            file_system_entry.path, file_system_entry.file_uuid))
        if file_system_entry.amdsecs is not None:
            getpremis(file_system_entry.amdsecs, objects=True)
            getpremis(file_system_entry.amdsecs, events=True)
print("---")

# NOTE(review): the nesting below is reconstructed from the statements
# themselves, as the source had lost its indentation; confirm it against
# the original script.
# The demo file written by this script is not validated when fed back in.
if filename != DEMO_FILE:
    print("Validation results:")

    # XSD validation.
    try:
        is_valid, report = metsrw.xsd_validate(mets.serialize())
        if is_valid is not True:
            print(is_valid, report)
        else:
            print("Schema validation via XSD is valid.")
    except lxml.etree.XMLSchemaParseError as e:
        # This specific parse error is treated as "probably offline" and is
        # only logged at INFO level; any other schema parse error is logged
        # as an error.
        if ("The QName value '{http://www.w3.org/1999/xlink}simpleLink' "
                "does not resolve to a(n) attribute group definition.") in str(e):
            logging.info("We're likely offline, so ignoring validation")
        else:
            logging.error(e)

    # Schematron validation: every svrl:failed-assert is one failure.
    is_valid, report = metsrw.schematron_validate(mets.serialize())
    if report is not None:
        errors = list(report.findall(
            "{http://purl.oclc.org/dsdl/svrl}failed-assert"))
    else:
        errors = []
    # Announce success for a clean report as well; the message used to be
    # tied to ``report is None`` only, so a clean report announced nothing.
    if report is None or (is_valid and not errors):
        print("Schema validation via TRON is valid.")
    print("")

    err_set = []
    for failure in errors:
        result_message = failure.find("{http://purl.oclc.org/dsdl/svrl}text")
        message_text = result_message.text if result_message is not None else None
        # A failed assertion without message text falls back to its test
        # expression instead of raising AttributeError.
        err_set.append((message_text or failure.get('test') or "").strip())

    if err_set:
        print("{errors} errors found by scematron across the following "
              "categories\n".format(errors=len(errors)))
        # Identical messages are reported once; sorting keeps the output
        # order stable between runs.
        for m in sorted(set(err_set)):
            print("Error: {0}".format(
                " ".join(m.replace("\n", "").replace("'", "").split())))
    print("---")
def generate_event():
    """Build the nested-tuple description of one demo PREMIS event.

    The first item of every tuple is an element name and the remaining
    items are its text value or child tuples. Each call produces a new
    random ``uuid.uuid4()`` identifier value and the current local time in
    ISO format as the event date.

    :returns: a 7-item tuple starting with ``'event'``.
    """
    identifier = (
        'event_identifier',
        ('event_identifier_type', "UUID"),
        ('event_identifier_value', uuid.uuid4()),
    )
    timestamp = datetime.now().isoformat()
    outcome_detail = (
        'event_outcome_detail',
        ('event_outcome_detail_note', "dag iedereen!"),
    )
    outcome = (
        'event_outcome_information',
        ('event_outcome', "SUCCESS"),
        outcome_detail,
    )
    linking_agent = (
        'linking_agent_identifier',
        ('linking_agent_identifier_type', "python script"),
        ('linking_agent_identifier_value', "1.0"),
    )
    return (
        'event',
        identifier,
        ('event_type', "AM CAMP DEMO"),
        ('event_date_time', timestamp),
        ('event_detail', "Adding new PREMIS EVENTS"),
        outcome,
        linking_agent,
    )
# Attach one newly generated demo PREMIS event to every entry that has a path.
for file_system_entry in mets.all_files():
    if file_system_entry.path is None:
        continue
    new_event = premisrw.data_to_premis(generate_event())
    added = file_system_entry.add_premis_event(new_event)

# Write the updated METS out, but never overwrite an existing demo file.
if not os.path.exists(DEMO_FILE):
    with open(DEMO_FILE, "w+b") as f:
        f.write(mets.tostring())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment