Python chunker for emails and other content, processed line by line
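
A sketch of how to run it (the environment variable names below are the ones the script reads; the values and the script filename are illustrative): put a .env file next to the script, then point --df_path at a load file whose rows include an "Extracted Text Link" column.

    REDIS_URL=redis://localhost:6379
    API_KEY=your-api-key
    API_URL=https://your-instance.example.com/api
    TARGET_SENTENCE_COUNT=5
    MIN_SENTENCE_COUNT=2

    python chunker.py --df_path documents.dat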
import argparse
import io
import json
import os
import re
import string

import pandas as pd
import redis
import requests
import tiktoken
from dotenv import load_dotenv

load_dotenv()

redis_url = os.environ.get('REDIS_URL')
api_key = os.environ.get('API_KEY')
api_url = os.environ.get('API_URL')
target_sentence_count = os.environ.get('TARGET_SENTENCE_COUNT')
min_sentence_count = os.environ.get('MIN_SENTENCE_COUNT')

# Check for presence before converting so a missing variable prints a clean
# error instead of raising a TypeError from int(None).
if not redis_url or not api_key or not api_url or not target_sentence_count or not min_sentence_count:
    print("Missing environment variables.")
    exit(1)

target_sentence_count = int(target_sentence_count)
min_sentence_count = int(min_sentence_count)

redis_client = redis.StrictRedis.from_url(redis_url)
class Card:
    def __init__(self, card_html, tag_set, metadata_dict):
        self.card_html = card_html
        self.tag_set = tag_set
        self.metadata = metadata_dict
        if not self.metadata:
            print("Missing metadata.")
            exit(1)

    def to_json(self):
        def replace_nan_none(obj):
            # NaN is the only float that compares unequal to itself.
            if isinstance(obj, float) and obj != obj:
                return ""
            if obj is None:
                return ""
            if isinstance(obj, dict):
                return {key: replace_nan_none(value) for key, value in obj.items()}
            if isinstance(obj, list):
                return [replace_nan_none(item) for item in obj]
            return obj

        json_dict = {key: replace_nan_none(value) for key, value in self.__dict__.items()}
        return json.dumps(json_dict, sort_keys=True, default=str)

    def send_post_request(self):
        # Skip chunks too short to be worth indexing.
        if count_sentences(self.card_html) < min_sentence_count:
            return
        url = f"{api_url}/card"
        payload = self.to_json()
        headers = {"Content-Type": "application/json", "Authorization": api_key}
        req_result = requests.post(url, data=payload, headers=headers)
        if req_result.status_code != 200:
            print(req_result.text)
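
# A rough example of what a card serializes to (hypothetical values; NaN and
# None metadata are scrubbed to empty strings before the POST to f"{api_url}/card"):
#
#   card = Card("Hello. World.", "email", {"From": "a@b.com", "Score": float("nan")})
#   card.to_json()
#   # -> '{"card_html": "Hello. World.", "metadata": {"From": "a@b.com", "Score": ""}, "tag_set": "email"}'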
def count_sentences(input_string):
    sentences = re.split(r'[.!?]', input_string)
    sentences = [sentence.strip() for sentence in sentences if sentence.strip()]
    sentence_count = len(sentences)
    word_count = count_words(input_string)
    # Punctuation-heavy text (abbreviations, ellipses) inflates the split count,
    # so cap the estimate at roughly one sentence per ten words.
    if sentence_count * 10 > word_count:
        return word_count // 10
    return sentence_count
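
# Example: count_sentences("Hi. Thanks.") splits into 2 pieces but has only
# 2 words, so the cap kicks in and returns 2 // 10 = 0; ordinary prose with
# ten-plus words per sentence passes through with its split count unchanged.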
def count_words(input_string):
    words = input_string.split()
    return len(words)

def has_only_spaces_special_chars_numbers(input_string):
    stripped_string = input_string.replace(" ", "")
    special_chars = string.punctuation
    return all(char.isdigit() or char in special_chars for char in stripped_string)

def get_total_token_count(input_strings):
    total_token_count = 0
    for input_string in input_strings:
        total_token_count += len(encoding.encode(input_string))
    return total_token_count

def remove_hex_unicodes(input_string):
    # Drop any characters that do not survive an ASCII round-trip.
    return input_string.encode('ascii', 'ignore').decode('ascii')

encoding = tiktoken.get_encoding("cl100k_base")
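
# Example: get_total_token_count([cur_card_content, line]) sums the cl100k_base
# token counts of both strings; the chunk-size caps below (460 and 8191 tokens)
# are enforced against this total.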
# Note: these triggers are matched against line.lower(), so every entry must be
# lowercase to ever match.
words_to_trigger_line_ignore = [
    'to:', 'cc:', 'from:', 'date:', 'sent:', 'forwarded message', '@', '───',
    'meeting id:', 'password:', 'all rights reserved', 'ext.', 'has invited you',
    'google llc', 'shared with you', '...', 'confidentiality notice',
    'information that is protected from disclosure',
    'you are not the intended recipient', 'without disclosing or using',
    'wrote:', 'tel:', 'email:', ', ca', 'road,', 'ct,', 'external email',
    'facsimile/email', 'confidential or privileged', 'distribution of this fax',
    'exclusive use of', 'violation of federal', 'under hipaa', 'for reproduction',
    'further distribution', 'message are private', 'unauthorized use',
    'notify the sender', 'unauthorized interception', 'outside of the icr',
    "recognize the sender's email", 'docs.google.com', 'external sender',
    'trust this email', 'content is safe', 'proof of sender', 'do not click',
    'mentioned in this thread', 'google docs sends', 'www.google.com', 'zoom.us',
    'disclosure under hipaa', 'solely for use']
words_trigger_email_start = [
    'to:', 'cc:', 'from:', 'date:', 'sent:', 'subject:', 're:', 'fw:', 'fwd:',
    'attachments:', 'attached:', 'wrote:']
words_trigger_email_end = [
    'forwarded message', 'has invited you', 'open in', 'google llc',
    'original message', 'original message follows', '───', '--', '***', '===',
    'regards,', 'from,', 'sincerely,', 'yours,', 'gratitude,', 'appreciation,',
    'care,', 'cheers,', 'cordially,', 'respectfully,', 'warmly,', 'best,',
    'wishes,', 'humbly,', 'thanks,']
def contains_email_start(line):
    for word in words_trigger_email_start:
        if word in line.lower():
            return True
    return False

def contains_email_end(line):
    for word in words_trigger_email_end:
        if word in line.lower():
            return True
    return False

def ignore_line(line):
    if has_only_spaces_special_chars_numbers(line):
        return True
    for word in words_to_trigger_line_ignore:
        if word in line.lower():
            return True
    phone_pattern = r'\b\d{3}[-. ]?\d{3}[-. ]?\d{4}\b'
    phone_numbers = re.findall(phone_pattern, line)
    if len(phone_numbers) > 0:
        return True
    return False
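
# Example: ignore_line("Tel: 555-123-4567") is True twice over (the 'tel:'
# trigger and the phone-number pattern both match), while a plain prose line
# like "We reviewed the draft yesterday" is kept.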
# Returns True to cut off the current chunk, False to keep accumulating.
def default_cutoff_or_continue(cur_card_text, line):
    cur_sentence_count = count_sentences(cur_card_text)
    new_line_sentence_count = count_sentences(line)
    if cur_sentence_count + new_line_sentence_count > target_sentence_count and new_line_sentence_count >= 1:
        return True
    # Hard cap on tokens so chunks stay well under typical embedding input limits.
    if get_total_token_count([cur_card_text, line]) >= 460:
        return True
    return False
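
# Example with TARGET_SENTENCE_COUNT=5: a chunk already holding 4 sentences is
# cut off when a 2-sentence line arrives (4 + 2 > 5); the new line then seeds
# the next chunk.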
# ---
def process_file(full_path, metadata_dict):
    email_dict = {
        'email_to': "",
        'email_cc': "",
        'email_from': "",
        'email_sent': "",
        'email_subject': "",
        'email_re': "",
        'email_fw': "",
        'email_attachments': ""
    }

    def clear_email_vars():
        nonlocal email_dict
        for key in email_dict:
            email_dict[key] = ""

    # Maps header prefixes found in a line to the email_dict field they fill.
    key_translation = {
        "to:": "email_to",
        "cc:": "email_cc",
        "from:": "email_from",
        "sent:": "email_sent",
        "date:": "email_sent",
        "subject:": "email_subject",
        "re:": "email_re",
        "fw:": "email_fw",
        "fwd:": "email_fw",
        "attachments:": "email_attachments",
        "attached:": "email_attachments"
    }

    def set_email_vars(line):
        nonlocal email_dict
        line = line.replace("\n", "")
        lowercase_line = line.lower()
        for key in key_translation:
            split_line = lowercase_line.split(key)
            if len(split_line) == 1:
                continue
            # Recover the original-case value by slicing the same number of
            # characters from the end of the unlowered line.
            value_len = len(split_line[1])
            if value_len == 0:
                continue
            value = line[-value_len:].strip()
            if not value:
                continue
            # Accumulate distinct values for the same header, pipe-separated.
            if value != email_dict[key_translation[key]]:
                if email_dict[key_translation[key]]:
                    email_dict[key_translation[key]] += "|" + value
                else:
                    email_dict[key_translation[key]] = value
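
    # Example: set_email_vars("From: Alice <alice@example.com>") stores
    # "Alice <alice@example.com>" in email_dict['email_from']; a later,
    # different From: value is appended pipe-separated instead of overwriting.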
    # Redis acts as a checkpoint so files already processed are skipped on re-runs.
    redis_value = redis_client.get(full_path)
    if redis_value == b'done':
        print(f"Skipping {full_path}")
        return
    redis_client.set(full_path, "done")
    with open(full_path, 'r', encoding="latin-1") as file:
        cur_card_content = ""
        next_nonsentence = ""
        is_email = False
        found_email_start = False
        for line in file:
            line = remove_hex_unicodes(line)
            if contains_email_start(line):
                # A header line begins a new email: flush the current chunk if
                # it is long enough, then start collecting the new header values.
                if count_sentences(cur_card_content) >= 4:
                    metadata_dict.update(email_dict)
                    card = Card(cur_card_content, "email" if is_email else "", metadata_dict)
                    card.send_post_request()
                    clear_email_vars()
                set_email_vars(line)
                is_email = True
                found_email_start = True
                cur_card_content = ""
                next_nonsentence = ""
                continue
            line = line.replace("\n", " ")
            # Split the line into sentences.
            sentences = re.split(r'[.!?]', line)
            if len(sentences) == 1:
                # No sentence boundary yet: buffer the fragment for later lines.
                if line != "\n" and not ignore_line(line):
                    next_nonsentence += line
                continue
            # Hold back the trailing fragment after the last sentence boundary
            # and trim it off, prepending any fragment buffered earlier.
            temp_next_nonsentence = sentences[-1]
            if temp_next_nonsentence:
                line = line[:-len(temp_next_nonsentence)]
            line = next_nonsentence + line
            if ignore_line(line):
                set_email_vars(line)
                continue
            # Email cases.
            if is_email and not found_email_start:
                # Between an email's end marker and the next header: discard.
                cur_card_content = ""
                next_nonsentence = ""
                continue
            if is_email and contains_email_end(line):
                next_nonsentence = ""
                found_email_start = False
                if count_sentences(cur_card_content) < 4:
                    cur_card_content = ""
                    continue
                metadata_dict.update(email_dict)
                card = Card(cur_card_content, "email" if is_email else "", metadata_dict)
                card.send_post_request()
                cur_card_content = ""
                clear_email_vars()
                continue
            if is_email:
                next_nonsentence = temp_next_nonsentence
                # Flush oversized email chunks before they exceed model limits.
                if len(cur_card_content) > 3000 or get_total_token_count([cur_card_content, line]) >= 8191:
                    metadata_dict.update(email_dict)
                    card = Card(cur_card_content, "email" if is_email else "", metadata_dict)
                    card.send_post_request()
                    cur_card_content = line
                    continue
                cur_card_content += line
                continue
            # Non-email cases.
            next_nonsentence = temp_next_nonsentence
            if default_cutoff_or_continue(cur_card_content, line):
                metadata_dict.update(email_dict)
                card = Card(cur_card_content, "email" if is_email else "", metadata_dict)
                card.send_post_request()
                cur_card_content = line
            else:
                cur_card_content += line

    # Flush whatever is left over at end of file.
    metadata_dict.update(email_dict)
    card = Card(cur_card_content, "email" if is_email else "", metadata_dict)
    card.send_post_request()
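
# Example call (hypothetical path and columns): process_file("texts/DOC-001.txt",
# {"Extracted Text Link": "texts/DOC-001.txt", "Custodian": "Smith"}) chunks the
# file line by line and POSTs each chunk, tagging email-derived chunks "email".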
def main():
    parser = argparse.ArgumentParser(description='Process files with an option for full path.')
    parser.add_argument('--df_path', help='File path to a dataframe')
    args = parser.parse_args()
    if args.df_path is None:
        print("Missing dataframe path.")
        exit(1)
    df_path = args.df_path
    with open(df_path, 'r', encoding='utf-8') as file:
        # Strip the 'þ' text-qualifier characters before parsing.
        file_content = file.read().replace('þ', '')
    text_buffer = io.StringIO(file_content)
    # The delimiter character appears to have been lost here; set it to the load
    # file's field separator (e.g. '\x14' in Concordance-style DAT files, which
    # also use the 'þ' qualifiers stripped above).
    delimiter = r''
    df = pd.read_csv(text_buffer, delimiter=delimiter, engine="python")
    for _, row in df.iterrows():
        metadata_dict = row.to_dict()
        file_path = metadata_dict["Extracted Text Link"].replace("\\", "/")
        process_file(file_path, metadata_dict)

if __name__ == "__main__":
    main()