@grahama1970
Created October 12, 2024 17:58
text_normalizer
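# --- cleaning_utils.py (filename inferred from the `verifaix.utils.text_normalizer.cleaning_utils`
#     import further below; the exact name is an assumption) ---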
import regex as re
from typing import Dict, Optional
import unicodedata
import html
from dateutil.parser import parse as date_parser
from better_profanity import profanity
from bs4 import BeautifulSoup, MarkupResemblesLocatorWarning
import warnings
import emoji
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
from nltk import pos_tag
from langdetect import detect, LangDetectException
from loguru import logger
from verifaix.utils.text_normalizer.text_normalizer_config import TextNormalizerConfig
from verifaix.utils.loguru_setup import setup_logger
from verifaix.utils.regex_patterns.normalize_text_with_replacements import get_replacements
from verifaix.utils.regex_patterns.get_patterns import get_patterns
# Suppress the MarkupResemblesLocatorWarning if it's not relevant
warnings.filterwarnings("ignore", category=MarkupResemblesLocatorWarning)
# Initialize logger
setup_logger()
# Load replacement maps and compiled patterns from the regex_patterns directory
regex_replacements = get_replacements()
regex_patterns = get_patterns()
###
# Cleaning Utils
###
def remove_html_tags(text: str) -> str:
    logger.debug(f"Removing HTML tags. Input: '{text}'")
    # Simple check to detect paths or dates that might resemble a locator
    if re.match(r'\d{1,2}/\d{1,2}/\d{4}', text):
        logger.debug(f"Skipping HTML parsing for date-like or path-like text: {text}")
        return text  # Return as-is if it's a date-like string
    soup = BeautifulSoup(text, 'html.parser')
    result = soup.get_text()
    logger.debug(f"After removing HTML tags: '{result}'")
    return result
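# Illustrative examples (not from the original gist):
#   remove_html_tags("Some <b>bold</b> text")  -> "Some bold text"
#   remove_html_tags("12/23/2005 report")      -> returned unchanged (date guard skips HTML parsing)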
def normalize_unicode(text: str) -> str:
    logger.debug("Normalizing Unicode characters.")
    return unicodedata.normalize('NFKC', text)
def filter_profanity(text: str) -> str:
    # Initialize profanity filter
    profanity.load_censor_words()
    logger.debug(f"Filtering profanity. Input: '{text}'")
    # Define a custom censor that handles words and punctuation separately
    def custom_censor(word):
        if word == '[REMOVED]':
            return word
        # Match word with possible punctuation at the end
        match = re.match(r'(\w+)(\W*)', word)
        if match:
            base_word = match.group(1)    # The alphabetic word part
            punctuation = match.group(2)  # Any punctuation following the word
            # If the base word is profane, censor it
            if profanity.contains_profanity(base_word):
                return base_word[0] + '*' * (len(base_word) - 1) + punctuation
        return word
    # Split the text into words and apply the censor
    words = text.split()
    censored_words = [custom_censor(word) for word in words]
    result = ' '.join(censored_words)
    logger.debug(f"After filtering profanity: '{result}'")
    return result
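# Illustrative example (assumes "damn" is in better_profanity's default censor list):
#   filter_profanity("profanity like damn.")  -> "profanity like d***."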
def remove_special_characters_old(text: str, preserve: Optional[str] = None) -> str:
    logger.debug(f"Removing special characters, preserving: {preserve}")
    if preserve is None:
        preserve = "-_'"
    # Raw f-string so \w and \s reach the regex engine instead of being treated as string escapes
    special_chars_pattern = re.compile(rf'[^\w\s{re.escape(preserve)}]', re.UNICODE)
    text = special_chars_pattern.sub('', text)
    logger.debug(f"After removing special characters (except {preserve}): '{text}'")
    return text
def remove_invalid_characters(text: str) -> str:
    logger.debug("Removing non-displayable characters.")
    # This pattern matches control characters that are non-printable or invalid (control ranges).
    # \u0000-\u001F and \u007F-\u009F target control characters while preserving all valid Unicode characters.
    invalid_chars_pattern = re.compile(r'[\u0000-\u001F\u007F-\u009F]+')
    # Remove invalid characters, keeping valid Unicode and ASCII characters intact
    cleaned_text = invalid_chars_pattern.sub('', text)
    logger.debug(f"After removing invalid characters: '{cleaned_text}'")
    return cleaned_text
def normalize_whitespace(text: str) -> str:
    logger.debug("Normalizing whitespace.")
    result = re.sub(r'\s+', ' ', text).rstrip()
    logger.debug(f"After normalizing whitespace: '{result}'")
    return result
def lemmatize(text: str) -> str:
    lemmatizer = WordNetLemmatizer()
    logger.debug(f"Lemmatizing text. Input: '{text}'")
    words = word_tokenize(text)
    tagged_words = pos_tag(words)
    lemmatized_words = []
    for word, tag in tagged_words:
        if word == '[REMOVED]':
            lemmatized_words.append(word)
            continue
        # Map the Penn Treebank POS tag to a WordNet POS tag (default to noun)
        if tag.startswith('NN'):
            pos = 'n'
        elif tag.startswith('VB'):
            pos = 'v'
        elif tag.startswith('JJ'):
            pos = 'a'
        elif tag.startswith('RB'):
            pos = 'r'
        else:
            pos = 'n'
        lemma = lemmatizer.lemmatize(word, pos=pos)
        lemmatized_words.append(lemma)
    result = ' '.join(lemmatized_words)
    logger.debug(f"After lemmatization: '{result}'")
    return result
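# Illustrative example (POS tags come from nltk.pos_tag, so output may vary slightly):
#   lemmatize("The dogs are running")  -> roughly "The dog be run"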
def expand_contractions(text: str) -> str:
    logger.debug("Expanding contractions.")
    contractions = regex_replacements.get('contractions', {})
    # Function to preserve the original case of the contraction
    def case_preserving_replace(match):
        contraction = match.group(0)
        expansion = contractions[contraction.lower()]  # Look up the expansion by the lowercased contraction
        # Check the case of the contraction and apply it to the expansion
        if contraction.isupper():
            return expansion.upper()
        elif contraction[0].isupper():
            return expansion.capitalize()
        else:
            return expansion.lower()
    # Function to replace contractions in a single piece of text
    def replace_contractions(text_part: str) -> str:
        for contraction in contractions:
            # Use case-preserving replacement
            text_part = re.sub(rf'\b{re.escape(contraction)}\b', case_preserving_replace, text_part, flags=re.IGNORECASE)
        return text_part
    # Use a regex split to separate [REMOVED] parts from other text
    segments = re.split(r'(\[REMOVED\])', text)
    # Apply contraction expansion only to non-[REMOVED] segments
    for i in range(len(segments)):
        if segments[i] != '[REMOVED]':
            segments[i] = replace_contractions(segments[i])
    # Rejoin all segments back together
    expanded_text = ''.join(segments)
    logger.debug(f"After expanding contractions: '{expanded_text}'")
    return expanded_text
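# Illustrative example (assumes the contractions map includes entries like "can't" -> "cannot"
# and "won't" -> "will not"):
#   expand_contractions("Can't stop, won't stop")  -> "Cannot stop, will not stop"  (case preserved)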
def remove_duplicate_lines(text: str) -> str:
    logger.debug("Removing duplicate lines.")
    lines = text.split('\n')
    unique_lines = []
    for line in lines:
        stripped_line = line.rstrip()
        if stripped_line and stripped_line not in unique_lines:
            unique_lines.append(stripped_line)
    result = '\n'.join(unique_lines)
    logger.debug(f"After removing duplicate lines: '{result}'")
    return result
def replace_unicode_and_bullet_points(text: str) -> str:
    logger.debug("Replacing unicode characters and bullet points.")
    # Combine unicode and bullet point replacements
    replacements = {**regex_replacements['unicode_replacements'],
                    **regex_replacements['bullet_point_replacements']}
    # Compile a regex pattern to match bullet points or unicode at the start of lines (with optional spaces/tabs)
    pattern = re.compile(r'^[ \t]*(' + '|'.join(map(re.escape, replacements.keys())) + ')', re.MULTILINE)
    # Function to perform the replacement using regex
    def replacement_function(match):
        unicode_char = match.group(1)  # The matched bullet point or unicode char
        replacement = replacements.get(unicode_char, unicode_char)  # Default to the same char if no replacement found
        logger.debug(f"Replacing {repr(unicode_char)} with {repr(replacement)}")
        return match.group(0).replace(unicode_char, replacement)  # Replace only the matched char
    # Perform the regex substitution on the entire text
    result = pattern.sub(replacement_function, text)
    return result
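# Illustrative example (assumes the bullet-point map contains an entry such as '●' -> '-'):
#   replace_unicode_and_bullet_points("● input is tied to 0 debug_mode_i")
#   -> "- input is tied to 0 debug_mode_i"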
def remove_control_characters(text):
    """Helper function to remove non-visible control characters."""
    return ''.join(ch for ch in text if ch.isprintable())
def standardize_dates(text: str) -> str:
    logger.debug("Standardizing dates.")
    # regex_patterns is already the dict returned by get_patterns() at module load
    return re.sub(regex_patterns['dates'][0], lambda x: date_parser(x.group(0)).strftime('%Y-%m-%d'), text)
def tokenize_urls_emails(text: str) -> str:
    logger.debug(f"Tokenizing URLs and emails. Input: '{text}'")
    patterns = regex_patterns
    trailing_punctuation = ''
    if text and text[-1] in '.!?':
        trailing_punctuation = text[-1]
        text = text[:-1]
    clean_text = re.sub(f"\\[REMOVED\\](*SKIP)(*FAIL)|{patterns['url_email_tokenization'][0]}",
                        lambda x: '[EMAIL]' + x.group(2) if x.group(0) != '[REMOVED]' else x.group(0), text)
    clean_text = re.sub(f"\\[REMOVED\\](*SKIP)(*FAIL)|{patterns['url_email_tokenization'][1]}",
                        lambda x: '[URL]' + x.group(2) if x.group(0) != '[REMOVED]' else x.group(0), clean_text)
    clean_text += trailing_punctuation
    logger.debug(f"After tokenizing URLs and emails: '{clean_text}'")
    return clean_text
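# Note: (*SKIP)(*FAIL) are backtracking verbs supported by the third-party `regex` module
# (imported above as `re`), not by the stdlib `re`. They make a literal [REMOVED] placeholder
# match and then discard itself, so placeholders are never rewritten as [EMAIL]/[URL] tokens.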
def standardize_numbers(text: str) -> str:
    logger.debug("Standardizing numbers.")
    return re.sub(
        f"\\[REMOVED\\](*SKIP)(*FAIL)|{regex_patterns['numbers'][0]}",
        lambda x: f"{x.group(1)}{x.group(2)}" if x.group(0) != '[REMOVED]'
        else x.group(0), text
    )
def handle_emojis(text: str) -> str:
    logger.debug(f"Handling emojis. Input: '{text}'")
    def replace_emoji(char):
        if char == '[REMOVED]':
            return '[REMOVED]'
        if emoji.is_emoji(char):
            return f" {emoji.demojize(char)} "
        return char
    result = ''.join(replace_emoji(char) for char in text)
    result = re.sub(r'\s+', ' ', result).rstrip()
    return result
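# Illustrative example:
#   handle_emojis("Sample 🥳 text")  -> "Sample :partying_face: text"
# Note: replace_emoji is called once per character, so the '[REMOVED]' guard above is
# effectively never triggered here.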
def correct_spelling(text: str, spell_checker) -> str:
    logger.debug("Correcting spelling.")
    tokens = text.split()
    corrected_tokens = [spell_checker.correction(token) for token in tokens]
    return ' '.join(corrected_tokens)
def standardize_punctuation(text: str) -> str:
    logger.debug("Standardizing punctuation.")
    replacements = regex_replacements.get('standardize_punctuation', {})
    for old, new in replacements.items():
        text = text.replace(old, new)
    return re.sub(r'\s([?.!,"](?:\s|$))', r'\1', text)
def remove_accents_and_diacritics(text: str) -> str:
    logger.debug("Removing accents and diacritics.")
    return ''.join(c for c in unicodedata.normalize('NFD', text)
                   if unicodedata.category(c) != 'Mn')
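# Illustrative example:
#   remove_accents_and_diacritics("café déjà vu")  -> "cafe deja vu"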
def normalize_math(text: str) -> str:
    logger.debug(f"Normalizing math formulas. Input: '{text}'")
    replacements = regex_replacements.get('latex_replacements', {})
    for latex, symbol in replacements.items():
        latex_pattern = latex.replace('\\\\', '\\')
        text = re.sub(r'(?<!\\)' + re.escape(latex_pattern), symbol, text)
    text = text.replace('∑', 'sum')
    text = text.replace('∫', 'integral')
    text = re.sub(r'_(\w)', r' subscript \1', text)
    text = re.sub(r'\^(\w)', r' superscript \1', text)
    text = text.replace('$', '')
    logger.debug(f"After normalizing math: '{text}'")
    return text
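# Illustrative example (LaTeX command handling depends on the latex_replacements map):
#   normalize_math("$x^2 + y_1$")  -> "x superscript 2 + y subscript 1"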
def strip_leading_spaces(text: str) -> str:
    logger.debug("Stripping leading spaces from the text.")
    lines = text.split('\n')
    stripped_lines = [line.lstrip() for line in lines]
    stripped_text = '\n'.join(stripped_lines)
    return stripped_text
def _handle_text_in_brackets(text: str) -> str:
    def replacer(match):
        return match.group(0).replace(' ', '⚙️').replace('\t', '⚙️')
    text = re.sub(r'\[REMOVED\](*SKIP)(*FAIL)|\[.*?\]|\{.*?\}', replacer, text)
    text = text.replace('⚙️', ' ')
    return text
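# Note: spaces and tabs inside [...] or {...} are swapped for a sentinel character and then
# swapped straight back, so the net effect of this helper on its own is to turn tabs inside
# brackets into single spaces while leaving [REMOVED] placeholders untouched.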
def detect_language(text: str) -> str:
    logger.debug("Detecting language.")
    try:
        return detect(text)
    except LangDetectException as e:
        logger.warning(f"Language detection failed: {e}")
        return 'en'
def handle_scientific_notation(text: str) -> str:
    logger.debug("Handling scientific notation.")
    return re.sub(
        regex_patterns['scientific_notation'][0],
        regex_patterns.replace_scientific_notation,
        text
    )
def replace_html_entities(text: str) -> str:
    logger.debug("Replacing HTML entities.")
    return html.unescape(text)
def remove_directional_formatting(text: str) -> str:
    """
    Removes all Unicode directional formatting characters from the input text using regex.

    Args:
        text (str): The input string from which directional formatting characters should be removed.

    Returns:
        str: The cleaned string with directional formatting characters removed.
    """
    # Regex pattern for all directional formatting characters
    directional_pattern = r'[\u200E\u200F\u202A\u202B\u202C\u202D\u202E\u2066\u2067\u2068\u2069]'
    # Substitute all matches with an empty string
    return re.sub(directional_pattern, '', text)
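# Illustrative example (U+202D is the LEFT-TO-RIGHT OVERRIDE used in advanced_usage below):
#   remove_directional_formatting("\u202dAdvanced text")  -> "Advanced text"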
###
# Main Cleaning Pipeline
###
def normalize(text: str, config: TextNormalizerConfig) -> str:
    logger.debug(f"Original text: {text}")
    # Define the normalization steps along with their function references
    normalization_steps = [
        ('remove_duplicate_lines', remove_duplicate_lines),
        ('normalize_math', normalize_math),
        ('remove_html_tags', remove_html_tags),
        ('remove_control_characters', remove_control_characters),
        ('handle_brackets', _handle_text_in_brackets),
        ('remove_special_characters', remove_invalid_characters),  # this config key is handled by remove_invalid_characters
        ('replace_unicode_and_bullet_points', replace_unicode_and_bullet_points),
        ('handle_emojis', handle_emojis),
        ('normalize_unicode', normalize_unicode),
        ('normalize_whitespace', normalize_whitespace),
        ('expand_contractions', expand_contractions),
        ('filter_profanity', filter_profanity),
    ]
    # Step 1: Capture leading spaces or tabs so they can be restored if strip_lspace is disabled
    lines = text.split('\n')
    leading_whitespace_map = []
    for line in lines:
        leading_whitespace = re.match(r'^[ \t]*', line).group(0)  # Capture leading spaces/tabs
        leading_whitespace_map.append(leading_whitespace)         # Store for later restoration
    # Remove non-ASCII characters
    def remove_unwanted_unicode(text: str) -> str:
        logger.debug("Removing unwanted Unicode characters")
        return re.sub(r'[^\x00-\x7F]+', '', text)
    # Apply Unicode removal first to ensure all characters are clean
    # (note: `lines` was captured above, so this reassignment does not affect the per-line loop below)
    text = remove_unwanted_unicode(text)
    # Step 2: Normalize the text (excluding the leading spaces/tabs)
    normalized_lines = []
    for i, line in enumerate(lines):
        normalized_line = line.rstrip()  # Remove trailing whitespace for normalization
        # Apply each step of normalization based on config settings
        for step_name, step_function in normalization_steps:
            if config.settings.get(step_name, False):  # Dynamically apply based on config settings
                logger.debug(f"Applying {step_name}")
                normalized_line = step_function(normalized_line)
        normalized_lines.append(normalized_line)
    # Step 3: Restore or remove leading spaces/tabs based on strip_lspace config
    restored_text = ""
    for i, normalized_line in enumerate(normalized_lines):
        if config.strip_lspace:
            restored_text += normalized_line + '\n'  # Do not add leading spaces
        else:
            restored_text += leading_whitespace_map[i] + normalized_line + '\n'  # Restore leading spaces
    logger.debug(f"Final normalized text: {restored_text}")
    return restored_text.strip()  # Ensure no trailing newlines
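# Illustrative usage (not part of the original file; assumes the "basic" settings defined in
# TextNormalizerConfig below, i.e. remove_html_tags and normalize_whitespace enabled):
#   config = TextNormalizerConfig(settings_type="basic")
#   normalize("Some <b>bold</b>   text.", config)  -> "Some bold text."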
##
# Usage
##
def advanced_usage():
    from verifaix.utils.text_normalizer.main import normalize_text
    # text = "Advanced text with 🥳 emojis and bad words like hell."
    config = TextNormalizerConfig(settings_type="advanced")
    text = (
        "There'd be Sample text 🥳 emojis with <b>HTML</b> and profanity like damn "
        "and \\alpha."
    )
    text = (
        "This is a 12/23/2005 'Sample' text fuck with can't \n"
        "Some HTML like <b>bold</b> and <i>italic</i> with profanity like damn."
    )
    text = "● input is tied <b>Ack?!</b> to 0 debug_mode_i"
    print(normalize_text(text, config))
    # replace_unicode_and_bullet_points(text)
    return
    normalized_text = normalize_text(text, config)
    print(normalized_text)
if __name__ == "__main__":
    # text = "Ain't that a good thing going"
    # print(expand_contractions(text))
    advanced_usage()
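# --- The code below appears to be a separate gist file (presumably main.py, inferred from the
#     `verifaix.utils.text_normalizer.main` import above; the filename is an assumption). ---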
from verifaix.utils.text_normalizer.cleaning_utils import normalize
from verifaix.utils.text_normalizer.text_normalizer_config import TextNormalizerConfig
from loguru import logger
from verifaix.utils.loguru_setup import setup_logger
setup_logger()
def normalize_text(text: str, config: TextNormalizerConfig) -> str:
    logger.debug(f"Normalizing text with settings: {config.settings}")
    if isinstance(text, (type(None), list, dict)) or text.strip() == '':
        logger.warning("Received empty or non-string text, returning an empty string.")
        return ""
    # Use the cleaning pipeline by calling `normalize` from cleaning_utils.py
    normalized_text = normalize(text, config)
    return normalized_text
# Example usage functions
def basic_usage():
    text = "Sample text with <b>HTML</b> and profanity like damn."
    config = TextNormalizerConfig(settings_type="basic")
    normalized_text = normalize_text(text, config)
    print("Basic Settings Output:")
    print(normalized_text)
def advanced_usage():
    text = "\u202d Advanced text with 🥳 emojis and \u202d bad words like hell and [Things to do {well}]."
    text = "'● input is tied to 0 debug_mode_i"
    config = TextNormalizerConfig(settings_type="advanced")
    normalized_text = normalize_text(text, config)
    print("Advanced Settings Output:")
    print(normalized_text)
def custom_usage():
    custom_settings = {
        "remove_html_tags": True,
        "filter_profanity": True,
        "remove_special_characters": True,
        "normalize_whitespace": True
    }
    text = (
        "This is a 12/23/2005 'Sample' text fuck with can't \n"
        "Some HTML like <b>bold</b> and <i>italic</i> with profanity like damn."
    )
    config = TextNormalizerConfig(settings_type="custom", custom_settings=custom_settings)
    normalized_text = normalize_text(text, config)
    print("Custom Settings Output:")
    print(normalized_text)
# Run the example usage
if __name__ == "__main__":
    # basic_usage()
    # print("-----------------------")
    advanced_usage()
    # print("-----------------------")
    # custom_usage()
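# --- The code below appears to be a third gist file (presumably text_normalizer_config.py,
#     inferred from the `verifaix.utils.text_normalizer.text_normalizer_config` import above;
#     the filename is an assumption). ---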
from pydantic import BaseModel
from typing import Optional, Dict
class TextNormalizerConfig(BaseModel):
    # Properties to control different normalization options
    lowercase: bool = False
    lemmatization: bool = True
    tokenize_urls_emails: bool = False
    handle_brackets: bool = False
    settings_type: str = "custom"  # Can be "basic", "advanced", or "custom"
    custom_settings: Optional[Dict[str, bool]] = None
    strip_lspace: bool = False  # Option to control whether leading spaces are stripped

    basic_settings: Dict[str, bool] = {
        'remove_html_tags': True,
        'normalize_whitespace': True,
        'remove_special_characters': False,
    }
    advanced_settings: Dict[str, bool] = {
        # 'remove_html_tags': True,
        'replace_unicode_and_bullet_points': True,
        # 'remove_control_characters': True,  # removes all non-printable characters (too )
        'normalize_unicode': True,
        'filter_profanity': True,
        'remove_special_characters': True,
        'normalize_whitespace': True,
        'lowercase': False,
        'lemmatization': True,
        'tokenize_urls_emails': False,
        'handle_brackets': False,
    }

    @property
    def settings(self) -> Dict[str, bool]:
        if self.settings_type == "basic":
            return self.basic_settings
        elif self.settings_type == "advanced":
            return self.advanced_settings
        elif self.settings_type == "custom" and self.custom_settings:
            # Start with basic and update with custom settings
            custom_settings = self.basic_settings.copy()
            custom_settings.update(self.custom_settings)
            return custom_settings
        else:
            return self.basic_settings
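# Illustrative example: with settings_type="custom", custom_settings are merged over basic_settings,
# so the following enables profanity filtering while keeping remove_html_tags from the basic defaults:
#   config = TextNormalizerConfig(settings_type="custom", custom_settings={"filter_profanity": True})
#   config.settings
#   -> {'remove_html_tags': True, 'normalize_whitespace': True,
#       'remove_special_characters': False, 'filter_profanity': True}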