Last active
July 15, 2024 12:58
-
-
Save ap-Codkelden/e12084cc1edd11c81465359b21622273 to your computer and use it in GitHub Desktop.
Useful tools for text processing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# 2024-07-15 | |
import math | |
import re | |
from datetime import datetime | |
from hashlib import md5 | |
from os import scandir | |
from typing import Optional, List, Iterable, Union | |
import numpy as np | |
import pandas as pd | |
import requests | |
def chunks(lst, n):
    """Generate consecutive slices of ``lst`` holding at most ``n`` items each.

    Every slice has ``n`` items except possibly the last one, which holds
    whatever remains.
    """
    offsets = range(0, len(lst), n)
    yield from (lst[start:start + n] for start in offsets)
def normalize_np(np_string: str) -> str:
    """Insert a space after a leading "м." or "с." that is glued to a capital letter.

    Strings that do not start with such a prefix immediately followed by an
    uppercase Cyrillic letter are returned unchanged.
    """
    glued_prefix = re.compile(r"^([мс]\.)([А-ЯЄЇІҐ])")
    return glued_prefix.sub(
        lambda found: f"{found.group(1)} {found.group(2)}", np_string
    )
def calculate_levenshtein_distance(string1: str, string2: str) -> int:
    """
    Compute the Levenshtein (edit) distance between two strings.

    Args:
        string1 (str): The first string to compare.
        string2 (str): The second string to compare.

    Returns:
        int: The smallest number of single-character insertions, deletions
        or substitutions that turns one string into the other.
    """
    # Work with the longer string on the outer loop so each row is as short
    # as possible.
    longer, shorter = string1, string2
    if len(longer) < len(shorter):
        longer, shorter = shorter, longer

    if not shorter:
        return len(longer)

    # Distances from the empty prefix of `longer` to every prefix of `shorter`.
    prev_row = list(range(len(shorter) + 1))
    for row_no, ch_long in enumerate(longer, start=1):
        cur_row = [row_no]
        for col, ch_short in enumerate(shorter):
            via_insert = prev_row[col + 1] + 1
            via_delete = cur_row[col] + 1
            via_substitute = prev_row[col] + (ch_long != ch_short)
            cur_row.append(min(via_insert, via_delete, via_substitute))
        prev_row = cur_row

    # The last cell holds the distance between the two full strings.
    return prev_row[-1]
def csv_compatible(string: str, typostrofe=False, translit=False, uk2en=False) -> Optional[str]:
    """Normalise a text value so it can be stored safely in a CSV cell.

    Steps, in order: non-breaking spaces become plain spaces, straight double
    quotes become ``”``, backticks become ``'``, optionally ``'`` becomes the
    typographic ``’``, invisible formatting characters are dropped, runs of
    whitespace collapse to a single space, look-alike letters are optionally
    transliterated, and the result is stripped.

    :param string: value to clean; null values (per ``pd.isnull``) give ``None``
    :param typostrofe: replace ``'`` with the typographic apostrophe ``’``
    :param translit: swap visually identical Latin/Cyrillic letters
    :param uk2en: direction of the swap when ``translit`` is set; by default
        Latin look-alikes become Cyrillic, with ``uk2en=True`` the reverse
    :return: the cleaned string, or ``None`` for null input
    """
    if pd.isnull(string):
        return None

    text = (
        string.replace('\xa0', ' ')
        .replace('"', '”')
        .replace('`', "'")
    )
    if typostrofe:
        text = text.replace("'", '’')

    # NOTE(review): the previous pattern here read as a bare optional space,
    # which deletes EVERY space and defeats the whitespace collapse below.
    # It presumably contained an invisible character that was lost; the
    # zero-width / bidi control characters are now spelled out explicitly.
    # Confirm this is the intended set.
    text = re.sub(r"[\u200b-\u200f\u202a-\u202e\u2066-\u2069\ufeff]", "", text)
    text = re.sub(r"\s+", " ", text)

    if translit:
        # Letters that look the same in both alphabets, position by position.
        en_letters = "aeiopcxBKMHTyAEIOPCX"
        uk_letters = "аеіорсхВКМНТуАЕІОРСХ"
        if uk2en:
            source, target = uk_letters, en_letters
        else:
            source, target = en_letters, uk_letters
        text = text.translate(str.maketrans(source, target))

    return text.strip()
def store_csv(df, csv_name: str = None, index=None,
              index_label=None) -> None:
    """
    Write ``df`` to a tab-separated file, quoting every non-numeric field.

    Lines end with ``\\n``; all other ``to_csv`` options are left at their
    pandas defaults.

    :param df: Dataframe to export
    :param csv_name: Path of the resulting file (surrounding whitespace is
        stripped)
    :param index: whether to store the index; ``None`` (the default) is
        treated as ``True``
    :param index_label: Name of index label
    :type index_label: str
    :raises ValueError: if ``csv_name`` is not given
    :return: None
    """
    import csv  # local import: only needed for the quoting constant

    if csv_name is None:
        raise ValueError("csv_name is required")
    if index is None:
        index = True
    # `lineterminator` replaces `line_terminator`, which pandas deprecated
    # in 1.5 and removed in 2.0.
    df.to_csv(csv_name.strip(), sep='\t', index=index,
              index_label=index_label, quoting=csv.QUOTE_NONNUMERIC,
              lineterminator='\n')
def row_join(seq: Iterable):
    """Join the distinct non-null items of ``seq`` with ``"; "``.

    Items keep the order of their first appearance, so the result is
    reproducible between runs (joining a ``set`` is not).

    :param seq: iterable of strings, possibly containing nulls
    :return: the joined string, or ``np.nan`` when nothing non-null remains
    """
    # dict.fromkeys de-duplicates while preserving insertion order.
    unique = dict.fromkeys(x for x in seq if pd.notnull(x))
    if not unique:
        return np.nan
    return "; ".join(unique)
def get_cvk_page(url: str, headers: dict) -> Optional[str]:
    """Return the text of the page at ``url``, or ``None`` on a non-200 reply.

    The response body is always decoded as windows-1251, whatever encoding
    the server declares.

    :param url: address to fetch
    :param headers: HTTP headers sent with the request
    :return: decoded page text, or ``None`` when the status code is not 200
    """
    # The session must be instantiated; calling ``get`` on the class itself
    # fails because ``url`` is bound to ``self``.
    with requests.Session() as session:
        # NOTE(review): verify=False disables TLS certificate verification;
        # kept because callers may rely on it, but confirm it is still needed.
        res = session.get(url, headers=headers, verify=False)
        res.encoding = "windows-1251"
        if res.status_code != 200:
            print(f"Error => {url}")
            return None
        return res.text
def get_tag_value(cont: List) -> Optional[str]:
    """Return the first item of ``cont`` cleaned with ``csv_compatible``.

    Only the first item is normalised; the remaining items are never used,
    so there is no need to process them.

    :param cont: list of raw values
    :return: the cleaned first value, or ``None`` when ``cont`` is empty
    """
    if not cont:
        return None
    return csv_compatible(cont[0])
def raj_split(text: str) -> pd.Series:
    """Split ``text`` at its first comma into a two-item Series.

    With a comma present, both halves are stripped of surrounding
    whitespace; any further commas stay in the second half. Without a comma
    the Series holds the untouched ``text`` and ``None``.

    :param text: value to split
    :return: ``pd.Series`` of exactly two items
    """
    # partition() can only ever give two halves, so the former
    # "more than two parts" error branch was unreachable and is dropped.
    head, comma, tail = text.partition(",")
    if not comma:
        return pd.Series([text, None])
    return pd.Series([head.strip(), tail.strip()])
def date2iso(data_val: str):
    """Convert a ``DD.MM.YYYY`` date string to ISO ``YYYY-MM-DD``.

    Null or empty input gives ``None``; a string in any other format makes
    ``datetime.strptime`` raise ``ValueError``.
    """
    if pd.isnull(data_val):
        return None
    if not data_val:
        return None
    parsed = datetime.strptime(data_val, "%d.%m.%Y")
    return parsed.date().isoformat()
def split_pib(full_pib: str) -> pd.Series:
    """Split a full name into three parts: the first two words and the rest.

    The name is split on any run of whitespace, so doubled, leading or
    trailing spaces no longer produce empty name parts. Everything after the
    second word is joined back with single spaces into the third part;
    missing parts are ``None``.

    :param full_pib: full name as one string
    :return: ``pd.Series`` indexed ``surnamey``, ``firstnameq``,
        ``middlenamej``
    """
    parts: List[Union[str, None]] = full_pib.split()
    if len(parts) > 3:
        parts = parts[:2] + [" ".join(parts[2:])]
    # Pad short names so the Series always has three items.
    parts += [None] * (3 - len(parts))
    # NOTE(review): the index labels look like they carry stray trailing
    # characters; kept byte-identical because callers may index by them.
    return pd.Series(parts, index=["surnamey", 'firstnameq',
                                   "middlenamej"])
def sex(patronym: Optional[str]) -> Optional[str]:
    """Map a patronymic to a code based on its ending.

    Returns ``"_02"`` when the value ends like "-івна/-ївна/-овна" or
    "кизи", ``"_01"`` when it ends like "-ович/-евич/-іч/-ич" or "огли"
    (presumably the female and male codes respectively; confirm with the
    consumer of these values). Matching ignores case. Null input gives
    ``None``; anything unrecognised is returned unchanged.
    """
    if pd.isnull(patronym):
        return None

    feminine = re.search("([іїо]вна|кизи)$", patronym, re.I)
    if feminine is not None:
        return "_02"

    masculine = re.search("(й?[ео]в[іи]ч|[іи]ч|огли)$", patronym, re.I)
    return "_01" if masculine is not None else patronym
def create_index_string(*args):
    """Build one whitespace-free, upper-cased key string from the arguments.

    Null arguments are skipped. Every other argument is upper-cased, cleaned
    with ``csv_compatible``, stripped of all whitespace, and the pieces are
    concatenated in argument order.
    """
    pieces = []
    for value in args:
        if not pd.notnull(value):
            continue
        cleaned = csv_compatible(value.upper())
        pieces.append(re.sub(r"\s", "", cleaned))
    return "".join(pieces)
def create_hash(*args):
    """Return the MD5 hex digest of the cleaned, space-free arguments.

    Each argument is cleaned with ``csv_compatible``, its spaces are removed,
    and the pieces are concatenated in argument order before hashing as
    UTF-8. Null arguments are skipped, in line with ``create_index_string``;
    previously they crashed because ``csv_compatible`` returns ``None`` for
    them.

    :return: 32-character hexadecimal digest
    """
    joined = "".join(
        csv_compatible(x).replace(" ", "") for x in args if pd.notnull(x)
    )
    return md5(joined.encode('utf-8')).hexdigest()
def scantree(path="."):
    """Recursively yield ``DirEntry`` objects for every non-directory under ``path``.

    Directories are descended into (symlinked directories are not followed
    and are yielded as plain entries); directories themselves are never
    yielded.

    :param path: directory to start from
    """
    # The context manager closes the scandir handle even when the caller
    # stops iterating early.
    with scandir(path) as entries:
        for entry in entries:
            if entry.is_dir(follow_symlinks=False):
                yield from scantree(entry.path)
            else:
                yield entry
def col_diff(df_a, df_b, a_name="df_A", b_name="df_B"):
    """Print a side-by-side comparison of the column names of two dataframes.

    One row is printed per distinct column: the name appears on the left if
    ``df_a`` has it and on the right if ``df_b`` has it. Rows follow the
    column order of ``df_a``, then the columns only ``df_b`` has, so the
    output is stable between runs. Two summary lines then list the columns
    each frame would need to gain to match the other.

    :param df_a: first dataframe
    :param df_b: second dataframe
    :param a_name: heading used for ``df_a``
    :param b_name: heading used for ``df_b``
    """
    col_a, col_b = list(df_a.columns), list(df_b.columns)
    in_a, in_b = set(col_a), set(col_b)
    # Ordered, de-duplicated union of both column lists.
    ordered = list(dict.fromkeys(col_a + col_b))
    # str() lets non-string labels be measured and printed; default=0
    # covers two frames without any columns.
    max_len = max((len(str(col)) for col in ordered), default=0)

    print(" | ".join([a_name.center(max_len), b_name.center(max_len)]))
    print("+".join(['-' * (max_len + 1), '-' * (max_len + 1)]))
    for col in ordered:
        left = str(col) if col in in_a else ''
        right = str(col) if col in in_b else ''
        print(" | ".join([left.rjust(max_len), right]))

    to_a = [x for x in col_b if x not in in_a]
    to_b = [x for x in col_a if x not in in_b]
    print(f"To {a_name}: ", to_a)
    print(f"To {b_name}: ", to_b)
def change_width(ax, new_value, vertical=True):
    """Resize every patch of ``ax`` to ``new_value`` while keeping it centred.

    For vertical bars the patch width is changed and its x position shifted
    by half the size difference; for horizontal bars the same is done with
    the height and y position.

    :param ax: object exposing a ``patches`` collection (e.g. a bar-chart axes)
    :param new_value: thickness to give every bar
    :param vertical: ``True`` to adjust width/x, ``False`` for height/y
    """
    for bar in ax.patches:
        if vertical:
            # Half of the removed thickness moves the bar back to its centre.
            shift = (bar.get_width() - new_value) * .5
            bar.set_width(new_value)
            bar.set_x(bar.get_x() + shift)
        else:
            shift = (bar.get_height() - new_value) * .5
            bar.set_height(new_value)
            bar.set_y(bar.get_y() + shift)
def number_format(n: int) -> str:
    """Abbreviate a number with a ``K`` or ``M`` suffix.

    Values below 1000 are printed as plain integers, thousands as ``<n>K``
    and millions as ``<n>M``; the scaled value is truncated, not rounded
    (``1999 -> "1K"``). Values of one billion and above have no suffix and
    are printed in full.

    :param n: number to format
    :return: the abbreviated text
    """
    if n < 1000:
        return str(int(n))
    # Compare against the thresholds directly instead of going through a
    # floating-point log10.
    scales = (
        (10 ** 6, 10 ** 3, "K"),
        (10 ** 9, 10 ** 6, "M"),
    )
    for upper_bound, divider, suff in scales:
        if n < upper_bound:
            return f"{int(n / divider)}{suff}"
    return str(int(n))
def hex_to_rgb(h):
    """Convert a hex colour string to an ``(r, g, b)`` tuple of 0-255 ints.

    ``#`` characters are ignored. Three-digit shorthand (``"#abc"``) is
    expanded by doubling each digit; otherwise the first six hex digits are
    used.

    :param h: colour such as ``"#ff8000"``, ``"ff8000"`` or ``"#f80"``
    :return: tuple of three integers
    :raises ValueError: if the digits are not valid hexadecimal
    """
    digits = h.replace("#", "")
    if len(digits) == 3:
        # "abc" -> "aabbcc"
        digits = "".join(ch * 2 for ch in digits)
    return tuple(int(digits[i:i + 2], 16) for i in (0, 2, 4))
def rgb_to_hex(rgb):
    """Format three integer colour channels as a ``#rrggbb`` string.

    :param rgb: tuple, list or other iterable of exactly three integers
    :return: lowercase hex colour string
    :raises TypeError: if ``rgb`` does not hold exactly three integers
    """
    # tuple() lets lists and other iterables through; %-formatting itself
    # only unpacks real tuples.
    return '#%02x%02x%02x' % tuple(rgb)
def check_edrpou_sum(edrpou: str) -> bool:
    """
    Validate the check digit (last digit) of an 8-digit EDRPOU code.

    The first seven digits are multiplied by positional weights and summed;
    the sum modulo 11 must equal the last digit. If that remainder is 10,
    the calculation is repeated with every weight increased by 2, and a
    second remainder of 10 counts as 0.

    NOTE(review): the weights and the 30,000,000-60,000,000 range are taken
    from the previous implementation; confirm against the official rule.

    :param str edrpou: the code as a string of digits (leading zeros kept)
    :return: True if the code has 8 digits and a matching check digit,
        otherwise False
    :raises ValueError: if ``edrpou`` contains a non-digit character
    """
    digits = [int(ch) for ch in edrpou]
    # A code of any other length cannot be checked: the weights would line
    # up with the wrong digits.
    if len(digits) != 8:
        return False

    def remainder(weights) -> int:
        # zip() stops after the seven weights, leaving the check digit out.
        return sum(d * w for d, w in zip(digits, weights)) % 11

    if 30_000_000 < int(edrpou) < 60_000_000:
        weights = (7, 1, 2, 3, 4, 5, 6)
    else:
        weights = (1, 2, 3, 4, 5, 6, 7)

    check = remainder(weights)
    if check == 10:
        check = remainder([w + 2 for w in weights])
        if check == 10:
            check = 0
    return check == digits[-1]
def contrast_color(color):
    """Pick black or white as the contrasting colour for ``color``.

    The first three channels of ``color`` are scaled by 255 and combined
    with the weights .299/.587/.114. Bright inputs give black ``(0, 0, 0)``,
    dark inputs give white ``(255, 255, 255)``.

    :param color: sequence whose first three items are the channels
        (presumably floats in 0..1; confirm with callers)
    :return: tuple of three equal integers, 0 or 255
    """
    red, green, blue = [round(channel * 255, 0) for channel in color[:3]]
    # Despite its name this value grows as the colour gets darker.
    luminance = 1 - (.299 * red + .587 * green + .114 * blue) / 255
    if luminance < .5:
        level = 0
    else:
        level = 255
    return level, level, level
def minutes(s: str) -> Optional[int]:
    """Convert a duration like ``"1h 30m 15s"`` to whole minutes.

    The hours and minutes parts are optional (each followed by one space
    when present); the seconds part is required. Leftover seconds are
    dropped, not rounded.

    :param s: duration text
    :return: total whole minutes, or ``None`` when ``s`` does not match the
        expected format
    """
    found = re.match(
        r"^(?:(?P<h>\d+)h )?(?:(?P<m>\d+)m )?(?P<s>\d+)s$", s
    )
    if found is None:
        return None
    # Parts absent from the text count as zero.
    hours, mins, secs = (int(part or 0) for part in found.group("h", "m", "s"))
    return (hours * 3600 + mins * 60 + secs) // 60
def standardize_phone_number(phone):
    """Normalise a phone number to the ``380XXXXXXXXX`` form.

    The value is converted to ``str``, everything except digits and hyphens
    is removed, and the first phone-like group is looked up.

    :param phone: phone number in any printable form
    :return: ``"380"`` + two-digit operator code + the remaining digits when
        an operator code was recognised; ``None`` when the number matched
        without an operator code; otherwise just the digits of the input
    """
    pattern = r'(?:\+?38)?(?:\s*\(?0?(\d{2})\)?[\s.-]?)?(\d{3})[\s.-]?(\d{2,3})[\s.-]?(\d{2,3})'

    # Keep digits, hyphens and parentheses, then drop the parentheses too.
    cleaned = re.sub(r'[^\d()-]', '', str(phone))
    cleaned = cleaned.replace('(', '').replace(')', '')

    found = re.search(pattern, cleaned)
    if found is None:
        # Nothing phone-like: return whatever digits are there.
        return re.sub(r'\D', '', cleaned)

    carrier_code, part1, part2, part3 = found.groups()
    if not carrier_code:
        return None
    return f'380{carrier_code}{part1}{part2}{part3}'
def cyrillic_cleaner(*args):
    """Concatenate the arguments, keep only Cyrillic characters, upper-case.

    Each argument may be a string or an iterable of strings; all pieces are
    joined in order. Every character outside U+0400-U+04FF is removed.

    :return: upper-cased string made of the Cyrillic characters only
    """
    pieces = []
    for arg in args:
        if isinstance(arg, str):
            pieces.append(arg)
        else:
            # A list/tuple of strings passed as a single argument.
            pieces.extend(arg)
    # No count/flags arguments: a flag passed as the 4th positional
    # argument of re.sub is taken as `count` and caps the replacements.
    return re.sub(r'[^\u0400-\u04FF]', "", "".join(pieces)).upper()
def get_fiscal_quarter(year_month, short_year=True, reverse=False):
    """Return the fiscal-year/quarter label for a ``YYYY-MM`` month.

    The fiscal year starts in October: October-December are Q1 of the NEXT
    calendar year's fiscal year, January-March are Q2, April-June Q3 and
    July-September Q4.

    :param year_month: month as ``"YYYY-MM"``
    :param short_year: use the last two digits of the fiscal year
    :param reverse: ``True`` gives ``"Q1FY25"``, ``False`` gives ``"FY25Q1"``
    :return: the label
    :raises ValueError: if ``year_month`` is not in ``YYYY-MM`` form
    """
    date = datetime.strptime(year_month, "%Y-%m")
    fiscal_year = date.year
    month = date.month
    if month >= 10:
        fiscal_year += 1
        quarter = 1
    else:
        # Months 1-3 -> 2, 4-6 -> 3, 7-9 -> 4 (integer form of
        # ceil(month / 3) + 1).
        quarter = (month + 2) // 3 + 1
    if short_year:
        fiscal_year = fiscal_year % 100
    q_part, y_part = f"Q{quarter}", f"FY{fiscal_year:02d}"
    return q_part + y_part if reverse else y_part + q_part
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment