text_normalizer
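Utilities for cleaning and normalizing free-form text: HTML stripping, Unicode normalization, profanity filtering, contraction expansion, emoji handling, and a configurable normalization pipeline.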
cleaning_utils.py
import regex as re
from typing import Optional
import unicodedata
import html
from dateutil.parser import parse as date_parser
from better_profanity import profanity
from bs4 import BeautifulSoup, MarkupResemblesLocatorWarning
import warnings
import emoji
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
from nltk import pos_tag
from langdetect import detect, LangDetectException
from loguru import logger
from verifaix.utils.text_normalizer.text_normalizer_config import TextNormalizerConfig
from verifaix.utils.loguru_setup import setup_logger
from verifaix.utils.regex_patterns.normalize_text_with_replacements import get_replacements
from verifaix.utils.regex_patterns.get_patterns import get_patterns

# Suppress the MarkupResemblesLocatorWarning if it's not relevant
warnings.filterwarnings("ignore", category=MarkupResemblesLocatorWarning)

# Initialize logger
setup_logger()

# Replacements and patterns loaded from the regex_patterns directory
regex_replacements = get_replacements()
regex_patterns = get_patterns()
###
# Cleaning Utils
###

def remove_html_tags(text: str) -> str:
    logger.debug(f"Removing HTML tags. Input: '{text}'")
    # Simple check to detect paths or dates that might resemble a locator
    if re.match(r'\d{1,2}/\d{1,2}/\d{4}', text):
        logger.debug(f"Skipping HTML parsing for date-like or path-like text: {text}")
        return text  # Return as-is if it's a date-like string
    soup = BeautifulSoup(text, 'html.parser')
    result = soup.get_text()
    logger.debug(f"After removing HTML tags: '{result}'")
    return result
def normalize_unicode(text: str) -> str:
    logger.debug("Normalizing Unicode characters.")
    return unicodedata.normalize('NFKC', text)
def filter_profanity(text: str) -> str:
    # Initialize profanity filter
    profanity.load_censor_words()
    logger.debug(f"Filtering profanity. Input: '{text}'")

    # Define a custom censor that handles words and punctuation separately
    def custom_censor(word):
        if word == '[REMOVED]':
            return word
        # Match word with possible punctuation at the end
        match = re.match(r'(\w+)(\W*)', word)
        if match:
            base_word = match.group(1)    # The alphabetic word part
            punctuation = match.group(2)  # Any punctuation following the word
            # If the base word is profane, censor it
            if profanity.contains_profanity(base_word):
                return base_word[0] + '*' * (len(base_word) - 1) + punctuation
        return word

    # Split the text into words and apply the censor
    words = text.split()
    censored_words = [custom_censor(word) for word in words]
    result = ' '.join(censored_words)
    logger.debug(f"After filtering profanity: '{result}'")
    return result
def remove_special_characters_old(text: str, preserve: Optional[str] = None) -> str:
    logger.debug(f"Removing special characters, preserving: {preserve}")
    if preserve is None:
        preserve = "-_'"
    # Raw f-string so \w and \s reach the regex engine intact
    special_chars_pattern = re.compile(rf'[^\w\s{re.escape(preserve)}]', re.UNICODE)
    text = special_chars_pattern.sub('', text)
    logger.debug(f"After removing special characters (except {preserve}): '{text}'")
    return text
def remove_invalid_characters(text: str) -> str:
    logger.debug("Removing non-displayable characters.")
    # This pattern matches control characters that are non-printable or invalid (control ranges).
    # \u0000-\u001F and \u007F-\u009F target control characters while preserving all valid Unicode characters.
    invalid_chars_pattern = re.compile(r'[\u0000-\u001F\u007F-\u009F]+')
    # Remove invalid characters, keeping valid Unicode and ASCII characters intact
    cleaned_text = invalid_chars_pattern.sub('', text)
    logger.debug(f"After removing invalid characters: '{cleaned_text}'")
    return cleaned_text

def normalize_whitespace(text: str) -> str:
    logger.debug("Normalizing whitespace.")
    result = re.sub(r'\s+', ' ', text).rstrip()
    logger.debug(f"After normalizing whitespace: '{result}'")
    return result
def lemmatize(text: str) -> str:
    lemmatizer = WordNetLemmatizer()
    logger.debug(f"Lemmatizing text. Input: '{text}'")
    words = word_tokenize(text)
    tagged_words = pos_tag(words)
    lemmatized_words = []
    for word, tag in tagged_words:
        if word == '[REMOVED]':
            lemmatized_words.append(word)
            continue
        # Map Penn Treebank tags to WordNet parts of speech (noun as the default)
        if tag.startswith('NN'):
            pos = 'n'
        elif tag.startswith('VB'):
            pos = 'v'
        elif tag.startswith('JJ'):
            pos = 'a'
        elif tag.startswith('RB'):
            pos = 'r'
        else:
            pos = 'n'
        lemma = lemmatizer.lemmatize(word, pos=pos)
        lemmatized_words.append(lemma)
    result = ' '.join(lemmatized_words)
    logger.debug(f"After lemmatization: '{result}'")
    return result
def expand_contractions(text: str) -> str:
    logger.debug("Expanding contractions.")
    contractions = regex_replacements.get('contractions', {})

    # Function to preserve the original case of the contraction
    def case_preserving_replace(match):
        contraction = match.group(0)
        expansion = contractions[contraction.lower()]  # Look up the lowercase contraction
        # Check the case of the contraction and apply it to the expansion
        if contraction.isupper():
            return expansion.upper()
        elif contraction[0].isupper():
            return expansion.capitalize()
        else:
            return expansion.lower()

    # Function to replace contractions in a single piece of text
    def replace_contractions(text_part: str) -> str:
        for contraction in contractions:
            # Use case-preserving replacement
            text_part = re.sub(rf'\b{re.escape(contraction)}\b', case_preserving_replace,
                               text_part, flags=re.IGNORECASE)
        return text_part

    # Use a regex split to separate [REMOVED] parts from other text
    segments = re.split(r'(\[REMOVED\])', text)
    # Apply contraction expansion only to non-[REMOVED] segments
    for i in range(len(segments)):
        if segments[i] != '[REMOVED]':
            segments[i] = replace_contractions(segments[i])
    # Rejoin all segments back together
    expanded_text = ''.join(segments)
    logger.debug(f"After expanding contractions: '{expanded_text}'")
    return expanded_text
def remove_duplicate_lines(text: str) -> str:
    logger.debug("Removing duplicate lines.")
    lines = text.split('\n')
    unique_lines = []
    for line in lines:
        stripped_line = line.rstrip()
        if stripped_line and stripped_line not in unique_lines:
            unique_lines.append(stripped_line)
    result = '\n'.join(unique_lines)
    logger.debug(f"After removing duplicate lines: '{result}'")
    return result
def replace_unicode_and_bullet_points(text: str) -> str:
    logger.debug("Replacing unicode characters and bullet points.")
    # Combine unicode and bullet point replacements
    replacements = {**regex_replacements['unicode_replacements'],
                    **regex_replacements['bullet_point_replacements']}
    # Compile a regex pattern to match bullet points or unicode at the start of lines (with optional spaces/tabs)
    pattern = re.compile(r'^[ \t]*(' + '|'.join(map(re.escape, replacements.keys())) + ')', re.MULTILINE)

    # Function to perform the replacement using regex
    def replacement_function(match):
        unicode_char = match.group(1)  # The matched bullet point or unicode char
        replacement = replacements.get(unicode_char, unicode_char)  # Default to the same char if no replacement found
        logger.debug(f"Replacing {repr(unicode_char)} with {repr(replacement)}")
        return match.group(0).replace(unicode_char, replacement)  # Replace only the matched char

    # Perform the regex substitution on the entire text
    result = pattern.sub(replacement_function, text)
    return result
def remove_control_characters(text: str) -> str:
    """Helper function to remove non-visible control characters."""
    return ''.join(ch for ch in text if ch.isprintable())
def standardize_dates(text: str) -> str:
    logger.debug("Standardizing dates.")
    # regex_patterns is already the result of get_patterns(); don't call it again
    return re.sub(regex_patterns['dates'][0],
                  lambda x: date_parser(x.group(0)).strftime('%Y-%m-%d'), text)
def tokenize_urls_emails(text: str) -> str:
    logger.debug(f"Tokenizing URLs and emails. Input: '{text}'")
    patterns = regex_patterns
    # Preserve a single trailing sentence punctuation mark, if any
    trailing_punctuation = ''
    if text and text[-1] in '.!?':
        trailing_punctuation = text[-1]
        text = text[:-1]
    # (*SKIP)(*FAIL) (supported by the `regex` module) makes any [REMOVED]
    # token fail the match so it is never rewritten
    clean_text = re.sub(f"\\[REMOVED\\](*SKIP)(*FAIL)|{patterns['url_email_tokenization'][0]}",
                        lambda x: '[EMAIL]' + x.group(2) if x.group(0) != '[REMOVED]' else x.group(0), text)
    clean_text = re.sub(f"\\[REMOVED\\](*SKIP)(*FAIL)|{patterns['url_email_tokenization'][1]}",
                        lambda x: '[URL]' + x.group(2) if x.group(0) != '[REMOVED]' else x.group(0), clean_text)
    clean_text += trailing_punctuation
    logger.debug(f"After tokenizing URLs and emails: '{clean_text}'")
    return clean_text
def standardize_numbers(text: str) -> str:
    logger.debug("Standardizing numbers.")
    return re.sub(
        f"\\[REMOVED\\](*SKIP)(*FAIL)|{regex_patterns['numbers'][0]}",
        lambda x: f"{x.group(1)}{x.group(2)}" if x.group(0) != '[REMOVED]'
        else x.group(0), text
    )
def handle_emojis(text: str) -> str:
    logger.debug(f"Handling emojis. Input: '{text}'")

    # Note: iteration is per character, so the multi-character '[REMOVED]'
    # sentinel check below can never match a single char
    def replace_emoji(char):
        if char == '[REMOVED]':
            return '[REMOVED]'
        if emoji.is_emoji(char):
            return f" {emoji.demojize(char)} "
        return char

    result = ''.join(replace_emoji(char) for char in text)
    result = re.sub(r'\s+', ' ', result).rstrip()
    return result
def correct_spelling(text: str, spell_checker) -> str:
    logger.debug("Correcting spelling.")
    tokens = text.split()
    # Fall back to the original token when the checker returns None
    corrected_tokens = [spell_checker.correction(token) or token for token in tokens]
    return ' '.join(corrected_tokens)
def standardize_punctuation(text: str) -> str:
    logger.debug("Standardizing punctuation.")
    replacements = regex_replacements.get('standardize_punctuation', {})
    for old, new in replacements.items():
        text = text.replace(old, new)
    return re.sub(r'\s([?.!,"](?:\s|$))', r'\1', text)

def remove_accents_and_diacritics(text: str) -> str:
    logger.debug("Removing accents and diacritics.")
    return ''.join(c for c in unicodedata.normalize('NFD', text)
                   if unicodedata.category(c) != 'Mn')
def normalize_math(text: str) -> str:
    logger.debug(f"Normalizing math formulas. Input: '{text}'")
    replacements = regex_replacements.get('latex_replacements', {})
    for latex, symbol in replacements.items():
        latex_pattern = latex.replace('\\\\', '\\')
        text = re.sub(r'(?<!\\)' + re.escape(latex_pattern), symbol, text)
    text = text.replace('∑', 'sum')
    text = text.replace('∫', 'integral')
    text = re.sub(r'_(\w)', r' subscript \1', text)
    text = re.sub(r'\^(\w)', r' superscript \1', text)
    text = text.replace('$', '')
    logger.debug(f"After normalizing math: '{text}'")
    return text
def strip_leading_spaces(text: str) -> str:
    logger.debug("Stripping leading spaces from the text.")
    lines = text.split('\n')
    stripped_lines = [line.lstrip() for line in lines]
    stripped_text = '\n'.join(stripped_lines)
    return stripped_text

def _handle_text_in_brackets(text: str) -> str:
    # Swap whitespace inside [...] and {...} for a sentinel, then convert the
    # sentinel back to single spaces (net effect: tabs in brackets become spaces)
    def replacer(match):
        return match.group(0).replace(' ', '⚙️').replace('\t', '⚙️')

    text = re.sub(r'\[REMOVED\](*SKIP)(*FAIL)|\[.*?\]|\{.*?\}', replacer, text)
    text = text.replace('⚙️', ' ')
    return text
def detect_language(text: str) -> str:
    logger.debug("Detecting language.")
    try:
        return detect(text)
    except LangDetectException as e:
        logger.warning(f"Language detection failed: {e}")
        return 'en'
def handle_scientific_notation(text: str) -> str:
    logger.debug("Handling scientific notation.")
    return re.sub(
        regex_patterns['scientific_notation'][0],
        regex_patterns.replace_scientific_notation,
        text
    )

def replace_html_entities(text: str) -> str:
    logger.debug("Replacing HTML entities.")
    return html.unescape(text)
def remove_directional_formatting(text: str) -> str:
    """
    Removes all Unicode directional formatting characters from the input text using regex.

    Args:
        text (str): The input string from which directional formatting characters should be removed.

    Returns:
        str: The cleaned string with directional formatting characters removed.
    """
    # Regex pattern for all directional formatting characters
    directional_pattern = r'[\u200E\u200F\u202A\u202B\u202C\u202D\u202E\u2066\u2067\u2068\u2069]'
    # Substitute all matches with an empty string
    return re.sub(directional_pattern, '', text)
###
# Main Cleaning Pipeline
###

def normalize(text: str, config: TextNormalizerConfig) -> str:
    logger.debug(f"Original text: {text}")
    # Define the normalization steps along with their function references
    normalization_steps = [
        ('remove_duplicate_lines', remove_duplicate_lines),
        ('normalize_math', normalize_math),
        ('remove_html_tags', remove_html_tags),
        ('remove_control_characters', remove_control_characters),
        ('handle_brackets', _handle_text_in_brackets),
        ('replace_unicode_and_bullet_points', replace_unicode_and_bullet_points),
        # Note: the 'remove_special_characters' config key runs remove_invalid_characters
        ('remove_special_characters', remove_invalid_characters),
        ('handle_emojis', handle_emojis),
        ('normalize_unicode', normalize_unicode),
        ('normalize_whitespace', normalize_whitespace),
        ('expand_contractions', expand_contractions),
        ('filter_profanity', filter_profanity),
    ]
    # Step 1: Capture leading spaces or tabs so they can be restored when strip_lspace is disabled
    lines = text.split('\n')
    leading_whitespace_map = []
    for line in lines:
        leading_whitespace = re.match(r'^[ \t]*', line).group(0)  # Capture leading spaces/tabs
        leading_whitespace_map.append(leading_whitespace)  # Store for later restoration

    # Remove non-ASCII characters
    def remove_unwanted_unicode(text: str) -> str:
        logger.debug("Removing unwanted Unicode characters")
        return re.sub(r'[^\x00-\x7F]+', '', text)

    # Apply Unicode removal first and re-split so the per-line steps below see
    # the cleaned text. Caution: this strips emojis and bullet characters before
    # handle_emojis / replace_unicode_and_bullet_points can translate them.
    text = remove_unwanted_unicode(text)
    lines = text.split('\n')
    # Step 2: Normalize the text (excluding the leading spaces/tabs)
    normalized_lines = []
    for line in lines:
        normalized_line = line.rstrip()  # Remove trailing whitespace for normalization
        # Apply each step of normalization based on config settings
        for step_name, step_function in normalization_steps:
            if config.settings.get(step_name, False):  # Dynamically apply based on config settings
                logger.debug(f"Applying {step_name}")
                normalized_line = step_function(normalized_line)
        normalized_lines.append(normalized_line)

    # Step 3: Restore or remove leading spaces/tabs based on strip_lspace config
    restored_text = ""
    for i, normalized_line in enumerate(normalized_lines):
        if config.strip_lspace:
            restored_text += normalized_line + '\n'  # Do not add leading spaces
        else:
            restored_text += leading_whitespace_map[i] + normalized_line + '\n'  # Restore leading spaces

    logger.debug(f"Final normalized text: {restored_text}")
    # rstrip('\n') rather than strip(): a full strip would drop the first line's restored leading whitespace
    return restored_text.rstrip('\n')
##
# Usage
##

def advanced_usage():
    from verifaix.utils.text_normalizer.main import normalize_text
    config = TextNormalizerConfig(settings_type="advanced")
    # Other sample inputs to try:
    # text = "Advanced text with 🥳 emojis and bad words like hell."
    # text = (
    #     "There'd be Sample text 🥳 emojis with <b>HTML</b> and profanity like damn "
    #     "and \\alpha."
    # )
    # text = (
    #     "This is a 12/23/2005 'Sample' text fuck with can't \n"
    #     "Some HTML like <b>bold</b> and <i>italic</i> with profanity like damn."
    # )
    text = "● input is tied <b>Ack?!</b> to 0 debug_mode_i"
    normalized_text = normalize_text(text, config)
    print(normalized_text)

if __name__ == "__main__":
    # text = "Ain't that a good thing going"
    # print(expand_contractions(text))
    advanced_usage()
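A quick sanity check of a few of the helpers above; the sample strings are illustrative assumptions, not taken from the gist's own test data:

# Minimal sketch exercising three of the cleaning utils defined above.
print(remove_directional_formatting("\u202Ehello\u202C world"))  # -> "hello world"
print(normalize_whitespace("a   b\t\tc  "))                      # -> "a b c"
print(remove_invalid_characters("ok\u0000\u001Ffine"))           # -> "okfine"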
main.py
from verifaix.utils.text_normalizer.cleaning_utils import normalize
from verifaix.utils.text_normalizer.text_normalizer_config import TextNormalizerConfig
from loguru import logger
from verifaix.utils.loguru_setup import setup_logger

setup_logger()

def normalize_text(text: str, config: TextNormalizerConfig) -> str:
    logger.debug(f"Normalizing text with settings: {config.settings}")
    if isinstance(text, (type(None), list, dict)) or text.strip() == '':
        logger.warning("Received empty or non-string text, returning an empty string.")
        return ""
    # Use the cleaning pipeline by calling `normalize` from cleaning_utils.py
    normalized_text = normalize(text, config)
    return normalized_text
# Example usage functions
def basic_usage():
    text = "Sample text with <b>HTML</b> and profanity like damn."
    config = TextNormalizerConfig(settings_type="basic")
    normalized_text = normalize_text(text, config)
    print("Basic Settings Output:")
    print(normalized_text)
def advanced_usage():
    # Another sample input to try:
    # text = "\u202d Advanced text with 🥳 emojis and \u202d bad words like hell and [Things to do {well}]."
    text = "'● input is tied to 0 debug_mode_i"
    config = TextNormalizerConfig(settings_type="advanced")
    normalized_text = normalize_text(text, config)
    print("Advanced Settings Output:")
    print(normalized_text)
def custom_usage():
    custom_settings = {
        "remove_html_tags": True,
        "filter_profanity": True,
        "remove_special_characters": True,
        "normalize_whitespace": True
    }
    text = (
        "This is a 12/23/2005 'Sample' text fuck with can't \n"
        "Some HTML like <b>bold</b> and <i>italic</i> with profanity like damn."
    )
    config = TextNormalizerConfig(settings_type="custom", custom_settings=custom_settings)
    normalized_text = normalize_text(text, config)
    print("Custom Settings Output:")
    print(normalized_text)

# Run the example usage
if __name__ == "__main__":
    # basic_usage()
    # print("-----------------------")
    advanced_usage()
    # print("-----------------------")
    # custom_usage()
text_normalizer_config.py
from pydantic import BaseModel
from typing import Optional, Dict

class TextNormalizerConfig(BaseModel):
    # Properties to control different normalization options
    lowercase: bool = False
    lemmatization: bool = True
    tokenize_urls_emails: bool = False
    handle_brackets: bool = False
    settings_type: str = "custom"  # Can be "basic", "advanced", or "custom"
    custom_settings: Optional[Dict[str, bool]] = None
    strip_lspace: bool = False  # Option to control whether leading spaces are stripped
    basic_settings: Dict[str, bool] = {
        'remove_html_tags': True,
        'normalize_whitespace': True,
        'remove_special_characters': False,
    }
    advanced_settings: Dict[str, bool] = {
        # 'remove_html_tags': True,
        'replace_unicode_and_bullet_points': True,
        # 'remove_control_characters': True,  # removes all non-printable characters (too aggressive)
        'normalize_unicode': True,
        'filter_profanity': True,
        'remove_special_characters': True,
        'normalize_whitespace': True,
        'lowercase': False,
        'lemmatization': True,
        'tokenize_urls_emails': False,
        'handle_brackets': False,
    }
    @property
    def settings(self) -> Dict[str, bool]:
        if self.settings_type == "basic":
            return self.basic_settings
        elif self.settings_type == "advanced":
            return self.advanced_settings
        elif self.settings_type == "custom" and self.custom_settings:
            # Start with basic and update with custom settings
            custom_settings = self.basic_settings.copy()
            custom_settings.update(self.custom_settings)
            return custom_settings
        else:
            return self.basic_settings
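A minimal sketch of how the `settings` property resolves; the override dict here is illustrative, and the printed result follows from the defaults defined above:

# Custom settings start from basic_settings and overlay the provided overrides.
config = TextNormalizerConfig(
    settings_type="custom",
    custom_settings={"filter_profanity": True},
)
print(config.settings)
# {'remove_html_tags': True, 'normalize_whitespace': True,
#  'remove_special_characters': False, 'filter_profanity': True}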