Created
July 10, 2023 23:08
-
-
Save paul-english/363a54bac04e23d4e44858c3e9cec55d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sqlite3 | |
from xml.etree.ElementTree import iterparse | |
from tqdm import tqdm | |
import json | |
def create_database(dbname: str) -> None:
    """Create the InterPro match schema in the SQLite database at *dbname*.

    Five tables are created, in this order: ``dbinfo``, ``ipr``, ``lcn``,
    ``match`` and ``protein``.  ``ipr`` and ``lcn`` declare a foreign key
    to ``match``; ``match`` declares a foreign key to ``protein``.

    Args:
        dbname: Path passed to ``sqlite3.connect``.

    Raises:
        sqlite3.OperationalError: If one of the tables already exists
            (the statements are plain ``CREATE TABLE``).
    """
    table_definitions = (
        '''
        CREATE TABLE dbinfo (
            dbinfo_id INTEGER PRIMARY KEY,
            dbname TEXT NOT NULL,
            version TEXT,
            entry_count TEXT,
            file_date TEXT
        )
        ''',
        '''
        CREATE TABLE ipr (
            ipr_id INTEGER PRIMARY KEY,
            match_id INTEGER NOT NULL,
            id TEXT NOT NULL,
            name TEXT NOT NULL,
            type TEXT NOT NULL,
            parent_id TEXT,
            FOREIGN KEY (match_id) REFERENCES match (match_id)
        )
        ''',
        '''
        CREATE TABLE lcn (
            lcn_id INTEGER PRIMARY KEY,
            match_id INTEGER NOT NULL,
            start TEXT NOT NULL,
            end TEXT NOT NULL,
            fragments TEXT,
            score TEXT NOT NULL,
            FOREIGN KEY (match_id) REFERENCES match (match_id)
        )
        ''',
        '''
        CREATE TABLE match (
            match_id INTEGER PRIMARY KEY,
            protein_id INTEGER NOT NULL,
            id TEXT NOT NULL,
            name TEXT NOT NULL,
            dbname TEXT NOT NULL,
            status TEXT NOT NULL,
            evd TEXT NOT NULL,
            model TEXT NOT NULL,
            FOREIGN KEY (protein_id) REFERENCES protein (protein_id)
        )
        ''',
        '''
        CREATE TABLE protein (
            protein_id INTEGER PRIMARY KEY,
            id TEXT NOT NULL,
            name TEXT NOT NULL,
            length TEXT NOT NULL,
            crc64 TEXT NOT NULL
        )
        ''',
    )

    conn = sqlite3.connect(dbname)
    try:
        cursor = conn.cursor()
        for ddl in table_definitions:
            cursor.execute(ddl)
        conn.commit()
    finally:
        # Close even when a CREATE TABLE fails so the connection (and the
        # file handle behind it) is not leaked.
        conn.close()
def element_to_dict(element):
    """Recursively convert an XML element into a plain dictionary.

    The element's attributes become top-level keys.  Each child is
    converted the same way and stored under its tag; when a tag is
    already present, the existing value and the new one are collected
    in a list.  Element text is not included.
    """
    converted = dict(element.attrib)

    for node in element:
        nested = element_to_dict(node)
        tag = node.tag

        if tag not in converted:
            converted[tag] = nested
            continue

        existing = converted[tag]
        if isinstance(existing, list):
            existing.append(nested)
        else:
            # Second occurrence of the key: promote the single value to a list.
            converted[tag] = [existing, nested]

    return converted
def process_element(elem, conn, cursor):
    """Store one parsed ``dbinfo`` or ``protein`` element through *cursor*.

    A ``dbinfo`` element yields one row in ``dbinfo``.  A ``protein``
    element yields one row in ``protein``, one ``match`` row per direct
    ``match`` child, and one ``ipr`` / ``lcn`` row per corresponding child
    of each match.  Elements with any other tag are ignored.

    Nothing is committed here, and *conn* is not used by this function.

    Raises:
        KeyError: If a required attribute is missing.  For a ``match``
            the offending attributes and the whole protein are printed
            before the error is re-raised.
    """
    tag = elem.tag

    if tag == 'dbinfo':
        info = elem.attrib
        row = (
            info['dbname'],
            info.get('version'),
            info.get('entry_count'),
            info.get('file_date'),
        )
        cursor.execute('INSERT INTO dbinfo (dbname, version, entry_count, file_date) VALUES (?, ?, ?, ?)', row)
        return

    if tag != 'protein':
        return

    protein = elem.attrib
    cursor.execute(
        'INSERT INTO protein (id, name, length, crc64) VALUES (?, ?, ?, ?)',
        (protein['id'], protein['name'], protein['length'], protein['crc64']),
    )
    protein_id = cursor.lastrowid

    for match_elem in elem:
        if match_elem.tag != 'match':
            continue

        match_attrib = match_elem.attrib
        try:
            cursor.execute(
                'INSERT INTO match (protein_id, id, name, dbname, status, evd, model) VALUES (?, ?, ?, ?, ?, ?, ?)',
                (
                    protein_id,
                    match_attrib['id'],
                    match_attrib['name'],
                    match_attrib['dbname'],
                    match_attrib['status'],
                    match_attrib['evd'],
                    match_attrib['model'],
                ),
            )
        except KeyError as e:
            # Dump the incomplete match and its protein to aid debugging.
            print('--', match_attrib, element_to_dict(elem))
            raise e
        match_id = cursor.lastrowid

        for feature in match_elem:
            attrs = feature.attrib
            if feature.tag == 'ipr':
                cursor.execute(
                    'INSERT INTO ipr (match_id, id, name, type, parent_id) VALUES (?, ?, ?, ?, ?)',
                    (match_id, attrs['id'], attrs['name'], attrs['type'], attrs.get('parent_id')),
                )
            elif feature.tag == 'lcn':
                cursor.execute(
                    'INSERT INTO lcn (match_id, start, end, fragments, score) VALUES (?, ?, ?, ?, ?)',
                    (match_id, attrs['start'], attrs['end'], attrs.get('fragments'), attrs['score']),
                )
def main(filename='./data/interpro/match_complete.xml',
         dbname='./data/interpro/match_complete.sqlite'):
    """Load an InterPro match XML file into a freshly created SQLite database.

    Any existing database file at *dbname* is deleted, the schema is
    created with ``create_database``, and the XML at *filename* is
    streamed with ``iterparse``.  Every completed ``dbinfo`` or
    ``protein`` element is stored via ``process_element`` and committed
    immediately; the progress bar advances once per stored element.

    Args:
        filename: Path of the XML file to read.
        dbname: Path of the SQLite file to (re)create.
    """
    # Start from an empty database.  Remove the file directly instead of
    # shelling out to ``rm -rf`` so the path is never interpreted by a shell.
    try:
        os.remove(dbname)
    except FileNotFoundError:
        pass  # nothing to delete on a first run

    create_database(dbname)
    conn = sqlite3.connect(dbname)
    cursor = conn.cursor()

    try:
        with tqdm() as pbar:
            root = None
            for event, elem in iterparse(filename, events=("start", "end")):
                if event == 'start':
                    # The first start event belongs to the document root; keep
                    # a handle on it so processed children can be released.
                    if root is None:
                        root = elem
                    continue

                if elem.tag not in ('dbinfo', 'protein'):
                    # Leave nested elements (match/ipr/lcn) intact: they are
                    # read when the enclosing protein element completes.
                    continue

                process_element(elem, conn, cursor)
                # NOTE(review): committing per element keeps the original
                # failure semantics; batching commits would likely be faster.
                conn.commit()
                pbar.update()

                # Free the stored subtree, then drop the emptied elements the
                # root would otherwise keep referencing for the whole parse.
                elem.clear()
                root.clear()
    finally:
        conn.close()
# Run the import only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment