|  | #!/usr/bin/env python3 | 
        
          |  |  | 
        
          |  | import httpx | 
        
          |  | import os | 
        
          |  | import re | 
        
          |  | import sys # needed to be able to "nicely" exit script :) | 
        
          |  |  | 
        
          |  | # Load config from .env file instead of hardcoding config values | 
        
          |  | from dotenv import load_dotenv | 
        
          |  |  | 
        
          |  | ####################################################### | 
        
          |  | # Config | 
        
          |  | # | 
        
          |  | # Ensure your .env file follows the provided example | 
        
# Or, if you would rather hardcode the values, remove the following lines and uncomment the lower section.
        
          |  | ####################################################### | 
        
          |  |  | 
        
          |  | load_dotenv() | 
        
          |  |  | 
        
          |  | # Credentials | 
        
          |  | API_AUTH_TOKEN = os.getenv("API_AUTH_TOKEN") | 
        
          |  |  | 
        
          |  | # Connection info | 
        
          |  | PAPERLESS_URL = os.getenv("PAPERLESS_URL", "http://localhost:8000") | 
        
          |  | SESSION_TIMEOUT = float(os.getenv("SESSION_TIMEOUT", 5.0)) | 
        
          |  |  | 
        
          |  | # Tagging | 
        
          |  | ADD_TAG = bool(os.getenv("ADD_TAG", True)) | 
        
          |  | MODIFIED_TAG = os.getenv("MODIFIED_TAG") | 
        
          |  |  | 
        
          |  | # If not using an .env file, remove the previous lines and uncomment the below | 
        
          |  | # API_AUTH_TOKEN="authtoken" | 
        
          |  | # PAPERLESS_URL = "http://localhost:8000" # use the internal url! | 
        
          |  | # SESSION_TIMEOUT = 5.0 | 
        
          |  | # ADD_TAG = True | 
        
          |  | # MODIFIED_TAG = "PostModified" | 
        
          |  |  | 
        
          |  | ####################################################### | 
        
          |  | # Filename parsing | 
        
          |  | ####################################################### | 
        
          |  |  | 
        
          |  | def parseFileName(filename: str): | 
        
          |  | """ | 
        
          |  | Parses the given string with the Regexp defined inside the function to extract | 
        
          |  | a date string, correspondent string and title string | 
        
          |  |  | 
        
          |  | In case one of the three return values could not be extracted from the input string None will be returned instead. | 
        
          |  |  | 
        
          |  | In my case I have all files already saved in the following format: | 
        
          |  | 2023-01-23 - My Phone Company - Invoice for December 2022 | 
        
          |  |  | 
        
          |  | I want to extract my pattern, like this: | 
        
          |  | - 2023-01-23 -> date | 
        
          |  | - My Phone Company -> correspondent | 
        
          |  | - Invoice for December 2022 -> title | 
        
          |  |  | 
        
          |  | I did extend the regexp to handle the following filename formats as well | 
        
          |  | 2023-01-xx - My Phone Company - Invoice for December 2022 | 
        
          |  | 20230123 - My Phone Company - Invoice for December 2022 | 
        
          |  | 20230123-My Phone Company - Invoice for December 2022 | 
        
          |  | 202301xx - My Phone Company - Invoice for December 2022 | 
        
          |  | 202301xx-My Phone Company - Invoice for December 2022 | 
        
          |  | """ | 
        
          |  |  | 
        
          |  | # initialize the parts we want to find with regexp - if they are not found they are not known thus resulting in | 
        
          |  | # error messages later on | 
        
          |  | date_extracted = None | 
        
          |  | correspondent_extracted = None | 
        
          |  | title_extracted = None | 
        
          |  |  | 
        
          |  | # V3 of my matching regexp | 
        
          |  | pattern = re.compile(r'(\d{4}-?\d{2}-?(\d{2}|xx))(\s?-\s?)(.*?)(\s-\s)(.*)') | 
        
          |  | # 1: (\d{4}-?\d{2}-?(\d{2}|xx)) is the date in ISO-Date order either with or without hyphens (e.g. 2023-01-22 or 20230122. | 
        
          |  | #                               there is also the possibility to write the day of month as xxx e.g. 2023-01-xx if in the | 
        
          |  | #                               document the date was given as "January 2023" | 
        
          |  | # 2: (\d{2}|xx) is either the day of month or xx if the document contains just a date as "January 2023" | 
        
          |  | # 3: (\s?-\s?) is my divider for fields, normally it is " - " but sometimes my wife saves stuff without the spaces, so "-" | 
        
          |  | #              is a positive match as well | 
        
          |  | # 4: (.*?) is my Correspondent, e.g. "My Phone Company" It is used non-greedy as otherwise we get in trouble with the "-" | 
        
          |  | # 5: (\s-\s) is my divider between correspondent and title. Here the spaces must be there, otherwise a title like A-Team | 
        
          |  | #            would lead to big trouble :) | 
        
          |  | # 6: (.*) is the rest of the filename, by definition my title, e.g. "Bill for December 2022" | 
        
          |  | # So we need to take care about matching group 1, 4 and 6 | 
        
          |  |  | 
        
          |  | findings = pattern.match(filename) | 
        
          |  | if findings: | 
        
          |  | date_extracted          = findings.group(1) | 
        
          |  | correspondent_extracted = findings.group(4) | 
        
          |  | title_extracted         = findings.group(6) | 
        
          |  |  | 
        
          |  | # post processing of extracted date only if the regexp was successful | 
        
          |  | if date_extracted != None: | 
        
          |  | # if the date field contains "xx" replace it with "01" as the first day of the month | 
        
          |  | date_extracted = date_extracted.replace("xx", "01") | 
        
          |  |  | 
        
          |  | # if the date did not contain hyphens, add them back to be in ISO format | 
        
          |  | if date_extracted.find("-") < 0: | 
        
          |  | year = date_extracted[0:4] | 
        
          |  | month = date_extracted[4:6] | 
        
          |  | day = date_extracted[6:8] | 
        
          |  |  | 
        
          |  | date_extracted = f"{year}-{month}-{day}" | 
        
          |  |  | 
        
          |  | print("Result of RegExp:") | 
        
          |  |  | 
        
          |  | if date_extracted == None: | 
        
          |  | print("No Date found! Exiting") | 
        
          |  | sys.exit() | 
        
          |  |  | 
        
          |  | if correspondent_extracted == None: | 
        
          |  | print("No Correspondent found! Exiting") | 
        
          |  | sys.exit() | 
        
          |  |  | 
        
          |  | if title_extracted == None: | 
        
          |  | print("No Title found! Exiting") | 
        
          |  | sys.exit() | 
        
          |  |  | 
        
          |  | print(f"Date extracted         : '{date_extracted}'") | 
        
          |  | print(f"Correspondent extracted: '{correspondent_extracted}'") | 
        
          |  | print(f"Doc Title extracted    : '{title_extracted}'") | 
        
          |  |  | 
        
          |  | return date_extracted, correspondent_extracted, title_extracted | 
        
          |  |  | 
        
          |  | ####################################################### | 
        
          |  | # Database querying | 
        
          |  | ####################################################### | 
        
          |  |  | 
        
          |  | def getItemIDByName(item_name: str, route: str, session: httpx.Client, timeout: float): | 
        
          |  | """ | 
        
          |  | Gets an item's ID by looking up its name and API route. | 
        
          |  |  | 
        
          |  | If no item exists, returns None. | 
        
          |  |  | 
        
          |  | This function handles a (potentially impossible?) edge case of multiple items existing under that name; input welcome! | 
        
          |  | """ | 
        
          |  |  | 
        
          |  | # Query DB for data matching name in route | 
        
          |  | response_data = _get_resp_data(f"{route}?name__iexact={item_name}", session, timeout) | 
        
          |  | response_count = response_data["count"] | 
        
          |  |  | 
        
          |  | # If no item exists, return None | 
        
          |  | if response_count == 0: | 
        
          |  | print(f"No existing id found for item '{item_name}'.") | 
        
          |  | return None | 
        
          |  |  | 
        
          |  | # If one item exists, return that | 
        
          |  | elif response_count == 1: | 
        
          |  | new_item_id = response_data["results"][0]['id'] | 
        
          |  | print(f"Found existing id '{str(new_item_id)}' for item: '{item_name}'") | 
        
          |  | return new_item_id | 
        
          |  |  | 
        
          |  | # If multiple items exist, return the first and print a warning | 
        
          |  | elif response_count > 1: | 
        
          |  | print(f"Warning: Unexpected situation – multiple results found for '{item_name}'. Feedback welcome.") | 
        
          |  | new_item_id = response_data["results"][0]['id'] | 
        
          |  | return new_item_id | 
        
          |  |  | 
        
          |  | # This would be strange. | 
        
          |  | else: | 
        
          |  | print("Warning: Unexpected condition in getItemIDByName!") | 
        
          |  | return new_item_id | 
        
          |  |  | 
        
          |  | return new_item_id | 
        
          |  |  | 
        
          |  | def createItemByName(item_name: str, route: str, session: httpx.Client, timeout: float, skip_existing_check: bool = False): | 
        
          |  | """ | 
        
          |  | Creates a new item in the database given its name and API route. | 
        
          |  |  | 
        
          |  | An optional parameter is presented to skip checking for whether the item already exists. | 
        
          |  | """ | 
        
          |  |  | 
        
          |  | new_item_id = None | 
        
          |  |  | 
        
          |  | # Conditionally check whether the item exists | 
        
          |  | if skip_existing_check == False: | 
        
          |  | new_item_id = getItemIDByName(item_name, route, session, timeout) | 
        
          |  |  | 
        
          |  | if new_item_id != None: | 
        
          |  | return new_item_id | 
        
          |  |  | 
        
          |  | # Create item at given route | 
        
          |  | data = { | 
        
          |  | "name": item_name, | 
        
          |  | "matching_algorithm": 6, | 
        
          |  | "is_insensitive": True | 
        
          |  | } | 
        
          |  | response = session.post(route, data=data, timeout=timeout) | 
        
          |  | response.raise_for_status(); | 
        
          |  |  | 
        
          |  | new_item_id = response.json()["id"] | 
        
          |  | print(f"Item '{item_name}' created with id: '{str(new_item_id)}'") | 
        
          |  |  | 
        
          |  | # If no new_item_id has been returned, something went wrong - do not process further | 
        
          |  | if new_item_id == None: | 
        
          |  | print(f"Error: Couldn't create item with name '{item_name}'! Exiting.") | 
        
          |  | sys.exit() | 
        
          |  |  | 
        
          |  | return new_item_id | 
        
          |  |  | 
        
          |  | def getOrCreateItemIDByName(item_name: str, route: str, session: httpx.Client, timeout: float): | 
        
          |  | # Check for existing item ID | 
        
          |  | existing_id = getItemIDByName(item_name, route, session, timeout) | 
        
          |  |  | 
        
          |  | # If no existing ID found, create | 
        
          |  | if existing_id == None: | 
        
          |  | print(f"No item found with name: '{item_name}'; creating...") | 
        
          |  | existing_id = createItemByName(item_name, route, session, timeout, skip_existing_check = True) | 
        
          |  |  | 
        
          |  | return existing_id | 
        
          |  |  | 
        
          |  | def _get_resp_data(route: str, session: httpx.Client, timeout: float): | 
        
          |  | response = session.get(route, timeout = SESSION_TIMEOUT) | 
        
          |  | response.raise_for_status() | 
        
          |  | response_data = response.json() | 
        
          |  |  | 
        
          |  | return response_data | 
        
          |  |  | 
        
          |  | def _set_auth_tokens(paperless_url: str, session: httpx.Client, timeout: float): | 
        
          |  | response = session.get(paperless_url, timeout = timeout, follow_redirects = True) | 
        
          |  | response.raise_for_status() | 
        
          |  |  | 
        
          |  | csrf_token = response.cookies["csrftoken"] | 
        
          |  |  | 
        
          |  | session.headers.update( | 
        
          |  | {"Authorization": f"Token {API_AUTH_TOKEN}", f"X-CSRFToken": csrf_token} | 
        
          |  | ) | 
        
          |  |  | 
        
          |  | ####################################################### | 
        
          |  | # Main | 
        
          |  | ####################################################### | 
        
          |  |  | 
        
          |  | if __name__ == "__main__": | 
        
          |  | # Running inside the Docker container | 
        
          |  | with httpx.Client() as sess: | 
        
          |  | # Set tokens for the appropriate header auth | 
        
          |  | _set_auth_tokens(PAPERLESS_URL, sess, SESSION_TIMEOUT) | 
        
          |  |  | 
        
          |  | # Get the PK as provided via post-consume | 
        
          |  | doc_pk = int(os.environ["DOCUMENT_ID"]) | 
        
          |  |  | 
        
          |  | # Query the API for the document info | 
        
          |  | document_api_route = f"{PAPERLESS_URL}/api/documents/{doc_pk}/" | 
        
          |  | doc_info = _get_resp_data(document_api_route, sess, SESSION_TIMEOUT) | 
        
          |  |  | 
        
          |  | # Extract the currently assigned values | 
        
          |  | doc_title = doc_info["title"] | 
        
          |  | print(f"Post-processing input file: '{doc_title}'...") | 
        
          |  |  | 
        
          |  | # parse file name for date_created, correspondent and title for the document: | 
        
          |  | extracted_date, extracted_correspondent, extracted_title = parseFileName(doc_title) | 
        
          |  |  | 
        
          |  | # Clean up title formatting | 
        
          |  | new_doc_title = extracted_title.replace("_", " ") | 
        
          |  |  | 
        
          |  | # Get correspondent ID | 
        
          |  | correspondent_api_route = f"{PAPERLESS_URL}/api/correspondents/" | 
        
          |  | correspondent_id = getOrCreateItemIDByName(extracted_correspondent, correspondent_api_route, sess, SESSION_TIMEOUT) | 
        
          |  |  | 
        
          |  | data = { | 
        
          |  | "title": new_doc_title, | 
        
          |  | "correspondent": correspondent_id, | 
        
          |  | "created_date": extracted_date | 
        
          |  | } | 
        
          |  |  | 
        
          |  | # Conditionally add a tag to the document. | 
        
          |  | doc_tags = doc_info["tags"] | 
        
          |  | new_doc_tags = doc_tags | 
        
          |  |  | 
        
          |  | if ADD_TAG == True: | 
        
          |  | tags_api_route = f"{PAPERLESS_URL}/api/tags/" | 
        
          |  | tag_id = getOrCreateItemIDByName(MODIFIED_TAG, tags_api_route, sess, SESSION_TIMEOUT) | 
        
          |  |  | 
        
          |  | # Add the new tag to list of current tags | 
        
          |  | new_doc_tags.append(tag_id) | 
        
          |  |  | 
        
          |  | # Set document tags | 
        
          |  | data['tags'] = new_doc_tags | 
        
          |  |  | 
        
          |  | # Print status | 
        
          |  | print("Regexp Matching was successful!") | 
        
          |  | print(f"Date created:  '{extracted_date}'") | 
        
          |  | print(f"Correspondent: '{extracted_correspondent}'") | 
        
          |  | print(f"Title:         '{new_doc_title}'") | 
        
          |  | print(f"Tag IDs:       '{str(new_doc_tags)}'") | 
        
          |  |  | 
        
          |  | # Update the document | 
        
          |  | resp = sess.patch( | 
        
          |  | f"{PAPERLESS_URL}/api/documents/{doc_pk}/", | 
        
          |  | data=data, | 
        
          |  | timeout=SESSION_TIMEOUT, | 
        
          |  | ) | 
        
          |  | resp.raise_for_status() |