#!/usr/bin/env python3
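"""Import book records from a BWB monthly CSV dump into Open Library.

The script runs in two phases (see CLI_HELP_TEXT below):

1. ``setup_db`` - load the pipe-delimited dump into a local SQLite table
   (``import_state``), one row per CSV line, marked TO_BE_IMPORTED.
2. ``process``  - read pending rows back out of SQLite, POST them to the
   Open Library import API, and record SUCCESS/ERROR per row so the run
   can be resumed safely.
"""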
import csv
import datetime
import json
import logging
import os.path
import sqlite3
import sys
from enum import Enum
from sqlite3 import Connection, Cursor
from typing import Optional

import numpy as np
import pandas as pd
from olclient import OpenLibrary
from pandarallel import pandarallel
from requests import Response
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

ol = OpenLibrary()
OL_IMPORT_API_URL = "https://www.openlibrary.org/api/import"

adapter = HTTPAdapter(max_retries=Retry(total=5, read=5, connect=5, backoff_factor=0.3))
ol.session.mount("https://", adapter)

pandarallel.initialize()

# fake = Faker()
# Faker.seed(57)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("bwb-import-bot")

CLI_HELP_TEXT = """
usage:
    python3.9 bwb-import-bot.py setup_db 2021-05.csv bwb-import-state.db batch_id_here
    python3.9 bwb-import-bot.py process bwb-import-state.db 1

The last arg is the limit; if not provided, it defaults to 10000.
"""

COL_NUMBER_TO_COL_NAME_MAP = {
    10: "title",
    135: "publisher",
    20: "publication_date",
    19: "copyright",
    124: "isbn",
    36: "pages",
    37: "language",
    54: "issn",
    145: "doi",
    146: "lccn",
    147: "lc_class",
    49: "dewey",
    39: "weight",
    40: "length",
    41: "width",
    42: "height",
}


class ImportStatusEnum(Enum):
    SUCCESS = "SUCCESS"
    ERROR = "ERROR"
    TO_BE_IMPORTED = "TO_BE_IMPORTED"


def pre_setup_db(sqlite_conn: Connection) -> None:
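    """Create the import_state table and its unique (batch_id, line_number) index.

    Both statements are idempotent (IF NOT EXISTS), so this can be called on
    every run against the same SQLite database.
    """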
    # Set up the table and a unique index on (batch_id, line_number) to prevent
    # repeated imports of the same data.
    sqlite_conn.execute(
        """
        CREATE TABLE if not exists "import_state" (
            "line_number" INTEGER,
            "title" TEXT,
            "publisher" TEXT,
            "publication_date" INTEGER,
            "copyright" REAL,
            "isbn" INTEGER,
            "pages" INTEGER,
            "language" TEXT,
            "issn" TEXT,
            "doi" TEXT,
            "lccn" TEXT,
            "lc_class" TEXT,
            "dewey" TEXT,
            "weight" REAL,
            "length" REAL,
            "width" REAL,
            "height" REAL,
            "subjects" TEXT,
            "contributors_dicts" TEXT,
            "status" TEXT,
            "batch_id" TEXT,
            "comment" TEXT
        )
        """
    )
    sqlite_conn.execute(
        """
        create unique index if not exists batch_id_line_no_uniq
        on import_state(batch_id, line_number)
        """
    )


def get_subjects(row: pd.Series) -> list[str]:
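    """Return cleaned-up subject strings from columns 91-99 of a raw dump row.

    Empty/NaN cells are skipped; underscores are replaced, so a value like
    ``FICTION_GENERAL`` would become ``Fiction, general`` (illustrative example
    only; the exact BWB subject encoding is assumed here).
    """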
    subjects_list = row.iloc[91:100]
    return [
        s.capitalize().replace("_", ", ")
        for s in subjects_list
        if s and pd.notnull(s) and isinstance(s, str)
    ]


def make_author(contributor: list[str]) -> dict:
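    """Build an Open Library author dict from a three-element contributor slice.

    ``contributor[0]`` is the display name; a value of ``"X"`` in
    ``contributor[2]`` is treated as a corporate contributor (``entity_type:
    "org"``). Contributor roles such as AU/ED are not differentiated yet
    (see TODO below).
    """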
    author = {"name": contributor[0]}
    if contributor[2] == "X":
        # set corporate contributor
        author["entity_type"] = "org"
    # TODO: sort out contributor types
    # AU = author
    # ED = editor
    return author


def get_contributors(row: pd.Series) -> list[dict]:
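    """Collect up to five contributor triples from fixed column offsets (21-35).

    Each contributor occupies three consecutive columns starting at column 21;
    entries with an empty/NaN name are dropped before being converted with
    make_author().
    """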
    row_as_list = list(row)
    contributors = []
    for i in range(5):
        contributors.append(
            [row_as_list[21 + i * 3], row_as_list[22 + i * 3], row_as_list[23 + i * 3]]
        )

    # form list of author dicts
    return [make_author(c) for c in contributors if c[0] and pd.notnull(c[0])]


def setup_db_from_csv_dump(
    sqlite_connection: Connection, csv_file_path: str, batch_id_str: str
) -> None:
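    """Load one pipe-delimited BWB dump into the import_state table.

    Subjects and contributors are pre-computed (in parallel) and stored as
    JSON strings, duplicate ISBNs are dropped, and every row is inserted with
    status TO_BE_IMPORTED under the given batch id. The DataFrame index (the
    0-based CSV row position) is written out as line_number.
    """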
    # can also import multiple csv files here using glob patterns
    df: pd.DataFrame = pd.read_csv(
        csv_file_path, sep="|", header=None, quoting=csv.QUOTE_NONE
    )
    df["subjects"] = df.parallel_apply(
        lambda row: json.dumps(get_subjects(row)), axis=1
    )
    df["contributors_dicts"] = df.parallel_apply(
        lambda row: json.dumps(get_contributors(row)), axis=1
    )

    required_col_list = list(COL_NUMBER_TO_COL_NAME_MAP.keys())
    required_col_list.extend(["subjects", "contributors_dicts"])
    df = df[required_col_list].rename(columns=COL_NUMBER_TO_COL_NAME_MAP)

    # Dropping duplicate ISBNs
    df = df.drop_duplicates(subset=["isbn"], keep="last")

    df["status"] = ImportStatusEnum.TO_BE_IMPORTED.name
    df["batch_id"] = batch_id_str
    df["comment"] = np.nan

    logger.info(f"Inserting {len(df)} rows into database")

    # Inserting into the sqlite db
    df.to_sql(
        con=sqlite_connection,
        name="import_state",
        chunksize=10000,
        method="multi",
        if_exists="append",
        index_label="line_number",
    )


def update_status_and_comment(
    sqlite_conn: Connection,
    cursor: Cursor,
    batch_id_str: str,
    line_number: int,
    status: str,
    comment: Optional[str],
):
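    """Persist the outcome of one import attempt.

    Updates the status and comment of the (batch_id, line_number) row and
    commits immediately so progress survives an interrupted run.
    """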
    # TODO: Check success here
    cursor.execute(
        """
        UPDATE import_state
        SET comment = ?, status = ?
        WHERE line_number = ? AND batch_id = ?
        """,
        (comment, status, line_number, batch_id_str),
    )
    sqlite_conn.commit()


def ol_import(payload: dict):
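    """POST one record to the Open Library import API.

    Only the fields present in the row are copied into the request payload
    (title, isbn_13, publish_date, publishers, authors, lc_classifications,
    number_of_pages, languages, subjects). Raises on any non-200 response so
    the caller can record the error.
    """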
    # TODO: refactor this, very verbose code!
    import_payload = {}
    title = payload.get("title")
    isbn_13 = payload.get("isbn")
    publish_date = payload.get("publication_date")
    publisher = payload.get("publisher")
    authors = payload.get("contributors_dicts")
    lc_classifications = payload.get("lc_class")
    no_pages = payload.get("pages")
    languages = payload.get("language")
    subjects = payload.get("subjects")
    if title:
        import_payload["title"] = title
    if isbn_13:
        import_payload["isbn_13"] = isbn_13
    if publish_date:
        # Keep only the first four characters (the year, assuming a
        # YYYYMMDD-style publication date)
        import_payload["publish_date"] = str(publish_date)[:4]
    if publisher:
        import_payload["publishers"] = [publisher]
    if authors:
        import_payload["authors"] = authors
    if lc_classifications:
        import_payload["lc_classifications"] = lc_classifications
    if no_pages is not None:
        import_payload["number_of_pages"] = no_pages
    if languages:
        import_payload["languages"] = [languages]
    if subjects:
        import_payload["subjects"] = subjects
    logger.info("hitting import with")
    logger.info(import_payload)
    # Excluding source records here. TODO
    r: Response = ol.session.post(OL_IMPORT_API_URL, data=json.dumps(import_payload))
    if r.status_code != 200:
        logger.error(
            {"status_code": r.status_code, "content": r.content, "level": "error"}
        )
        raise Exception(r.content)


def process_row_and_import_to_ol(
    sqlite_conn: Connection, cursor: Cursor, row_data: pd.Series
):
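    """Import a single pending row and record the result in SQLite.

    On success the row is marked SUCCESS; any exception is caught and stored
    as an ERROR comment instead of aborting the whole batch.
    """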
    bwb_data = dict(row_data)
    batch_id_from_data = bwb_data["batch_id"]
    line_number = bwb_data["line_number"]
    assert bwb_data["status"] == ImportStatusEnum.TO_BE_IMPORTED.name
    try:
        bwb_data["subjects"] = json.loads(bwb_data["subjects"])
        bwb_data["contributors_dicts"] = json.loads(bwb_data["contributors_dicts"])
        ol_import(bwb_data)
        update_status_and_comment(
            sqlite_conn=sqlite_conn,
            cursor=cursor,
            batch_id_str=batch_id_from_data,
            line_number=line_number,
            status=ImportStatusEnum.SUCCESS.name,
            comment=None,
        )
    except Exception as e:
        update_status_and_comment(
            sqlite_conn=sqlite_conn,
            cursor=cursor,
            batch_id_str=batch_id_from_data,
            line_number=line_number,
            status=ImportStatusEnum.ERROR.name,
            comment=str(e),
        )


def process_imports_in_batches(sqlite_conn: Connection, batch_size: int) -> None:
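    """Repeatedly pull TO_BE_IMPORTED rows (batch_size at a time) and import them.

    Because every processed row is flipped to SUCCESS or ERROR, the SELECT
    eventually returns no rows and the loop terminates.
    """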
    while True:
        df = pd.read_sql(
            f"SELECT * FROM import_state WHERE import_state.status = 'TO_BE_IMPORTED' LIMIT {batch_size}",
            sqlite_conn,
        )
        logger.info(f"Processing {len(df)} records")
        if len(df) == 0:
            # All TO_BE_IMPORTED rows are processed!
            return
        cursor = sqlite_conn.cursor()
        df.apply(
            lambda row_data: process_row_and_import_to_ol(
                sqlite_conn=sqlite_conn, cursor=cursor, row_data=row_data
            ),
            axis=1,
        )


if __name__ == "__main__":
    start_time: datetime.datetime = datetime.datetime.now()
    cli_args: list[str] = sys.argv
    logger.info(f"started at {start_time}. with args {cli_args[1:]}")

    LIMIT = 1

    if len(cli_args) > 1 and cli_args[1] == "setup_db":
        csv_path = cli_args[2]
        db_path = cli_args[3]
        batch_id_from_args = cli_args[4]
        batch_id_file_name = os.path.split(csv_path)
        batch_id, ext = os.path.splitext(batch_id_file_name[-1])
        db_connection: Connection = sqlite3.connect(db_path)
        pre_setup_db(db_connection)
        setup_db_from_csv_dump(
            sqlite_connection=db_connection,
            csv_file_path=csv_path,
            batch_id_str=batch_id_from_args,
        )
    elif len(cli_args) > 1 and cli_args[1] == "process":
        db_path = cli_args[2]
        limit = int(cli_args[3]) if len(cli_args) > 3 else 10000
        db_connection: Connection = sqlite3.connect(db_path)
        process_imports_in_batches(db_connection, limit)
    else:
        print(CLI_HELP_TEXT)

    end_time = datetime.datetime.now()
    logger.info(f"Ended on {end_time}. Took {(end_time - start_time).seconds} seconds")