Python chunker for emails and other content that goes line by line.
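"""Line-by-line chunker for emails and other free-form text.

Walks each text file referenced by a dataframe export, groups lines into
"cards" of roughly TARGET_SENTENCE_COUNT sentences, and POSTs each card to
{API_URL}/card. Redis marks files as done so reruns skip already-processed
files.

Environment variables (loaded from .env via python-dotenv):
    REDIS_URL, API_KEY, API_URL, TARGET_SENTENCE_COUNT, MIN_SENTENCE_COUNT
"""
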
import argparse
import io
import json
import os
import re
import string

import pandas as pd
import redis
import requests
import tiktoken
from dotenv import load_dotenv

load_dotenv()

redis_url = os.environ.get('REDIS_URL')
api_key = os.environ.get('API_KEY')
api_url = os.environ.get('API_URL')
# Default to '0' so a missing variable fails the check below instead of
# raising a TypeError inside int()
target_sentence_count = int(os.environ.get('TARGET_SENTENCE_COUNT', '0'))
min_sentence_count = int(os.environ.get('MIN_SENTENCE_COUNT', '0'))

if not redis_url or not api_key or not api_url or not target_sentence_count or not min_sentence_count:
    print("Missing environment variables.")
    exit(1)

redis_client = redis.StrictRedis.from_url(redis_url)


class Card:
    def __init__(self, card_html, tag_set, metadata_dict):
        self.card_html = card_html
        self.tag_set = tag_set
        self.metadata = metadata_dict
        if not self.metadata:
            print("Missing metadata.")
            exit(1)

    def to_json(self):
        def replace_nan_none(obj):
            # NaN is the only float that compares unequal to itself
            if isinstance(obj, float) and obj != obj:
                return ""
            if obj is None:
                return ""
            if isinstance(obj, dict):
                return {key: replace_nan_none(value) for key, value in obj.items()}
            if isinstance(obj, list):
                return [replace_nan_none(item) for item in obj]
            return obj

        json_dict = {key: replace_nan_none(value) for key, value in self.__dict__.items()}
        return json.dumps(json_dict, sort_keys=True, default=str)

    def send_post_request(self):
        # Skip cards shorter than the configured minimum
        if count_sentences(self.card_html) < min_sentence_count:
            return
        url = f"{api_url}/card"
        payload = self.to_json()
        headers = {"Content-Type": "application/json", "Authorization": api_key}
        req_result = requests.post(url, data=payload, headers=headers)
        if req_result.status_code != 200:
            print(req_result.text)

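# Heuristic sentence counter: splits on terminal punctuation, then caps the
# result so each counted sentence averages at least ten words (presumably to
# keep abbreviation- or list-heavy text from inflating the count).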
def count_sentences(input_string):
    sentences = re.split(r'[.!?]', input_string)
    sentences = [sentence.strip() for sentence in sentences if sentence.strip()]
    sentence_count = len(sentences)
    word_count = count_words(input_string)
    if sentence_count * 10 > word_count:
        return word_count // 10
    return sentence_count


def count_words(input_string):
    return len(input_string.split())


def has_only_spaces_special_chars_numbers(input_string):
    stripped_string = input_string.replace(" ", "")
    special_chars = string.punctuation
    return all(char.isdigit() or char in special_chars for char in stripped_string)


def get_total_token_count(input_strings):
    return sum(len(encoding.encode(input_string)) for input_string in input_strings)


def remove_hex_unicodes(input_string):
    # Drop any non-ASCII bytes
    return input_string.encode('ascii', 'ignore').decode('ascii')


encoding = tiktoken.get_encoding("cl100k_base")

# Entries must be lowercase: they are matched against a lowercased line.
words_to_trigger_line_ignore = [
    'to:', 'cc:', 'from:', 'date:', 'sent:', 'forwarded message', '@', '───',
    'meeting id:', 'password:', 'all rights reserved', 'ext.', 'has invited you',
    'google llc', 'shared with you', '...', 'confidentiality notice',
    'information that is protected from disclosure',
    'you are not the intended recipient', 'without disclosing or using',
    'wrote:', 'tel:', 'email:', ', ca', 'road,', 'ct,', 'external email',
    'facsimile/email', 'confidential or privileged', 'distribution of this fax',
    'exclusive use of', 'violation of federal', 'under hipaa', 'for reproduction',
    'further distribution', 'message are private', 'unauthorized use',
    'notify the sender', 'unauthorized interception', 'outside of the icr',
    "recognize the sender's email", 'docs.google.com', 'external sender',
    'trust this email', 'content is safe', 'proof of sender', 'do not click',
    'mentioned in this thread', 'google docs sends', 'www.google.com', 'zoom.us',
    'disclosure under hipaa', 'solely for use',
]

words_trigger_email_start = [
    'to:', 'cc:', 'from:', 'date:', 'sent:', 'subject:', 're:', 'fw:', 'fwd:',
    'attachments:', 'attached:', 'wrote:',
]

words_trigger_email_end = [
    'forwarded message', 'has invited you', 'open in', 'google llc',
    'original message', 'original message follows', '───', '--', '***', '===',
    'regards,', 'from,', 'sincerely,', 'yours,', 'gratitude,', 'appreciation,',
    'care,', 'cheers,', 'cordially,', 'respectfully,', 'warmly,', 'best,',
    'wishes,', 'humbly,', 'thanks,',
]


def contains_email_start(line):
    lowercase_line = line.lower()
    return any(word in lowercase_line for word in words_trigger_email_start)


def contains_email_end(line):
    lowercase_line = line.lower()
    return any(word in lowercase_line for word in words_trigger_email_end)


def ignore_line(line):
    if has_only_spaces_special_chars_numbers(line):
        return True
    lowercase_line = line.lower()
    for word in words_to_trigger_line_ignore:
        if word in lowercase_line:
            return True
    # Ignore lines that contain phone numbers
    phone_pattern = r'\b\d{3}[-. ]?\d{3}[-. ]?\d{4}\b'
    if re.findall(phone_pattern, line):
        return True
    return False


# Returns True to cut off the current card, False to keep appending
def default_cutoff_or_continue(cur_card_text, line):
    cur_sentence_count = count_sentences(cur_card_text)
    new_line_sentence_count = count_sentences(line)
    if cur_sentence_count + new_line_sentence_count > target_sentence_count and new_line_sentence_count >= 1:
        return True
    if get_total_token_count([cur_card_text, line]) >= 460:
        return True
    return False


# ---
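# process_file reads one extracted-text file line by line. Trailing sentence
# fragments are buffered in next_nonsentence and prepended to the following
# line so cards break on sentence boundaries. Lines that look like email
# headers (To:, From:, Subject:, ...) flush the current card (when it has at
# least four sentences), switch the chunker into email mode, and populate
# email_dict, which is merged into each card's metadata before posting.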
def process_file(full_path, metadata_dict):
    email_dict = {
        'email_to': "",
        'email_cc': "",
        'email_from': "",
        'email_sent': "",
        'email_subject': "",
        'email_re': "",
        'email_fw': "",
        'email_attachments': "",
    }

    def clear_email_vars():
        for key in email_dict:
            email_dict[key] = ""

    def set_email_vars(line):
        line = line.replace("\n", "")
        lowercase_line = line.lower()
        key_translation = {
            "to:": "email_to",
            "cc:": "email_cc",
            "from:": "email_from",
            "sent:": "email_sent",
            "date:": "email_sent",
            "subject:": "email_subject",
            "re:": "email_re",
            "fw:": "email_fw",
            "fwd:": "email_fw",
            "attachments:": "email_attachments",
            "attached:": "email_attachments",
        }
        for key, field in key_translation.items():
            # Split on the first occurrence only, so a repeated key cannot
            # truncate the captured value
            split_line = lowercase_line.split(key, 1)
            if len(split_line) == 1:
                continue
            value_len = len(split_line[1])
            if value_len == 0:
                # line[-0:] would return the whole line, not an empty string
                continue
            # Recover the original-case value from the tail of the line
            value = line[-value_len:].strip()
            if not value:
                continue
            if value != email_dict[field]:
                email_dict[field] = value if not email_dict[field] else email_dict[field] + "|" + value

    # Skip files that were already processed on a previous run
    redis_value = redis_client.get(full_path)
    if redis_value == b'done':
        print(f"Skipping {full_path}")
        return
    redis_client.set(full_path, "done")

    with open(full_path, 'r', encoding="latin-1") as file:
        cur_card_content = ""
        next_nonsentence = ""
        is_email = False
        found_email_start = False
        for line in file:
            line = remove_hex_unicodes(line)
            if contains_email_start(line):
                # Flush the current card before switching into email mode
                if count_sentences(cur_card_content) >= 4:
                    metadata_dict.update(email_dict)
                    card = Card(cur_card_content, "email" if is_email else "", metadata_dict)
                    card.send_post_request()
                    clear_email_vars()
                set_email_vars(line)
                is_email = True
                found_email_start = True
                cur_card_content = ""
                next_nonsentence = ""
                continue
            line = line.replace("\n", " ")
            # Split the line into sentences
            sentences = re.split(r'[.!?]', line)
            if len(sentences) == 1:
                # No sentence boundary yet; buffer the fragment
                if line != "\n" and not ignore_line(line):
                    next_nonsentence += line
                continue
            # Carry over the fragment after the last sentence boundary
            temp_next_nonsentence = sentences[-1]
            # Guard against an empty fragment: line[:-0] would erase the line
            if temp_next_nonsentence:
                line = line[:-len(temp_next_nonsentence)]
            line = next_nonsentence + line
            if ignore_line(line):
                set_email_vars(line)
                continue
            # Email cases
            if is_email and not found_email_start:
                cur_card_content = ""
                next_nonsentence = ""
                continue
            if is_email and contains_email_end(line):
                next_nonsentence = ""
                found_email_start = False
                if count_sentences(cur_card_content) < 4:
                    cur_card_content = ""
                    continue
                metadata_dict.update(email_dict)
                card = Card(cur_card_content, "email" if is_email else "", metadata_dict)
                card.send_post_request()
                cur_card_content = ""
                clear_email_vars()
                continue
            if is_email:
                next_nonsentence = temp_next_nonsentence
                if len(cur_card_content) > 3000 or get_total_token_count([cur_card_content, line]) >= 8191:
                    metadata_dict.update(email_dict)
                    card = Card(cur_card_content, "email" if is_email else "", metadata_dict)
                    card.send_post_request()
                    cur_card_content = line
                    continue
                cur_card_content += line
                continue
            # Non-email cases
            if default_cutoff_or_continue(cur_card_content, line):
                metadata_dict.update(email_dict)
                card = Card(cur_card_content, "email" if is_email else "", metadata_dict)
                card.send_post_request()
                next_nonsentence = temp_next_nonsentence
                cur_card_content = line
                continue
            cur_card_content += line
            next_nonsentence = temp_next_nonsentence

        # Flush whatever is left after the last line
        metadata_dict.update(email_dict)
        card = Card(cur_card_content, "email" if is_email else "", metadata_dict)
        card.send_post_request()

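# main() expects --df_path to point at a delimited export whose rows include
# an "Extracted Text Link" column giving the path to each extracted text file.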
def main():
    parser = argparse.ArgumentParser(description='Process files with an option for full path.')
    parser.add_argument('--df_path', help='File path to a dataframe')
    args = parser.parse_args()
    if args.df_path is None:
        print("Missing dataframe path.")
        exit(1)
    df_path = args.df_path

    with open(df_path, 'r', encoding='utf-8') as file:
        # Strip þ text-qualifier characters before parsing
        file_content = file.read().replace('þ', '')
    text_buffer = io.StringIO(file_content)
    # NOTE: the delimiter is a non-printing control character that may not
    # render in this listing
    delimiter = r''
    df = pd.read_csv(text_buffer, delimiter=delimiter, engine="python")
    for _, row in df.iterrows():
        metadata_dict = row.to_dict()
        file_path = metadata_dict["Extracted Text Link"].replace("\\", "/")
        process_file(file_path, metadata_dict)


if __name__ == "__main__":
    main()
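
# Example invocation (all values here are illustrative, not defaults):
#   REDIS_URL=redis://localhost:6379 API_KEY=xxx API_URL=https://example.com/api \
#   TARGET_SENTENCE_COUNT=5 MIN_SENTENCE_COUNT=3 \
#   python chunker.py --df_path ./export.dat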