Skip to content

Instantly share code, notes, and snippets.

@ap-Codkelden
Last active July 15, 2024 12:58
Show Gist options
  • Save ap-Codkelden/e12084cc1edd11c81465359b21622273 to your computer and use it in GitHub Desktop.
Save ap-Codkelden/e12084cc1edd11c81465359b21622273 to your computer and use it in GitHub Desktop.
Useful tools for text processing
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 2024-07-15
import math
import re
from datetime import datetime
from hashlib import md5
from os import scandir
from typing import Optional, List, Iterable, Union
import numpy as np
import pandas as pd
import requests
def chunks(lst, n):
    """Lazily split ``lst`` into consecutive slices of at most ``n`` items.

    The final slice is shorter when ``len(lst)`` is not a multiple of ``n``.
    """
    starts = range(0, len(lst), n)
    yield from (lst[start:start + n] for start in starts)
def normalize_np(np_string: str) -> str:
    """Insert a space between a leading ``м.`` / ``с.`` prefix and a capital letter.

    Only a prefix at the very start of the string that is immediately
    followed by an uppercase Ukrainian letter is affected; everything else
    is returned unchanged.
    """
    glued_prefix = re.compile(r"^([мс]\.)([А-ЯЄЇІҐ])")
    return glued_prefix.sub(r"\1 \2", np_string)
def calculate_levenshtein_distance(string1: str, string2: str) -> int:
    """
    Compute the Levenshtein (edit) distance between two strings.

    Args:
        string1 (str): The first string to compare.
        string2 (str): The second string to compare.

    Returns:
        int: The smallest number of single-character insertions, deletions
        or substitutions that turns one string into the other.
    """
    # Iterate over the longer string so that each DP row has the length
    # of the shorter one.
    longer, shorter = string1, string2
    if len(longer) < len(shorter):
        longer, shorter = shorter, longer
    if not shorter:
        return len(longer)

    # Row for the empty prefix of ``longer``: distance equals the column index.
    prev = list(range(len(shorter) + 1))
    for row_no, ch_long in enumerate(longer, start=1):
        # First cell: turning a prefix of ``longer`` into the empty string.
        row = [row_no]
        for col, ch_short in enumerate(shorter):
            cost_insert = prev[col + 1] + 1
            cost_delete = row[col] + 1
            cost_replace = prev[col] + (ch_long != ch_short)
            row.append(min(cost_insert, cost_delete, cost_replace))
        prev = row
    return prev[-1]
def csv_compatible(string: str, typostrofe=False, translit=False, uk2en=False) -> Optional[str]:
    """Normalise a text value so it can be stored safely in a CSV cell.

    Steps, in order: non-breaking spaces become plain spaces, straight
    double quotes become ``”``, backticks become ``'``; optionally ``'``
    becomes ``’``; ``&nbsp`` / ``&nbsp;`` entities are dropped; whitespace
    runs collapse to a single space; optionally look-alike letters are
    swapped between alphabets; finally the result is stripped.

    :param string: value to clean; null values yield ``None``
    :param typostrofe: replace ``'`` with the typographic apostrophe ``’``
    :param translit: replace look-alike letters (Latin -> Cyrillic by default)
    :param uk2en: with ``translit``, reverse the direction (Cyrillic -> Latin)
    :return: the cleaned string, or ``None`` for a null input
    """
    if pd.isnull(string):
        return None

    latin = "aeiopcxBKMHTyAEIOPCX"
    cyrillic = "аеіорсхВКМНТуАЕІОРСХ"
    source, target = (cyrillic, latin) if uk2en else (latin, cyrillic)
    lookalike_table = str.maketrans(source, target)

    cleaned = string
    for old, new in (('\xa0', ' '), ('"', '”'), ('`', "'")):
        cleaned = cleaned.replace(old, new)
    if typostrofe:
        cleaned = cleaned.replace("'", '’')
    cleaned = re.sub("&nbsp;?", "", cleaned)
    cleaned = re.sub(r"\s+", " ", cleaned)
    if translit:
        cleaned = cleaned.translate(lookalike_table)
    return cleaned.strip()
def store_csv(df, csv_name: str = None, index=None,
              index_label=None) -> None:
    """
    Write ``df`` to a tab-separated file with every non-numeric field quoted.

    :param df: Dataframe to export as CSV
    :param csv_name: Name of resulting CSV file; surrounding whitespace is
        stripped. Required in practice even though it has a default.
    :param index: store index in CSV file; ``None`` (the default) means True
    :param index_label: Name of index label
    :type index_label: str
    :return: None
    :raises ValueError: if ``csv_name`` is not given
    """
    if csv_name is None:
        # Without a target, DataFrame.to_csv would return the text instead of
        # writing it, which this function would silently discard.
        raise ValueError("csv_name is required")
    if index is None:
        index = True
    df.to_csv(
        csv_name.strip(),
        sep='\t',
        na_rep='',
        header=True,
        index=index,
        index_label=index_label,
        mode='w',
        compression='infer',
        quoting=2,  # csv.QUOTE_NONNUMERIC
        quotechar='"',
        # Spelled ``line_terminator`` before pandas 1.5; that old name was
        # removed in pandas 2.0.
        lineterminator='\n',
        doublequote=True,
        decimal='.',
    )
def row_join(seq: Iterable):
    """Join the distinct non-null items of ``seq`` with ``"; "``.

    Duplicates are dropped and the first-seen order is kept, so the result
    is the same on every run (a plain ``set`` gives an arbitrary order).

    :param seq: iterable of strings, possibly containing null values
    :return: the joined string, or ``numpy.nan`` when no non-null item exists
    """
    unique = list(dict.fromkeys(x for x in seq if pd.notnull(x)))
    if not unique:
        return np.nan
    return "; ".join(unique)
def get_cvk_page(url: str, headers: dict) -> Optional[str]:
    """Fetch ``url`` and return the page text, or ``None`` on failure.

    The response body is decoded as ``windows-1251`` regardless of what the
    server declares.

    :param url: address of the page to download
    :param headers: HTTP headers sent with the request
    :return: decoded page text, or ``None`` when the status code is not 200
    """
    # ``requests.Session`` must be instantiated; calling ``get`` on the class
    # itself binds ``url`` as ``self`` and fails. The context manager also
    # releases the connection pool when done.
    with requests.Session() as session:
        # NOTE(review): verify=False disables TLS certificate validation.
        # Kept to preserve existing behaviour; confirm it is still required.
        res = session.get(url, headers=headers, verify=False)
        if res.status_code != 200:
            print(f"Error => {url}")
            return None
        res.encoding = "windows-1251"
        return res.text
def get_tag_value(cont: List) -> Optional[str]:
    """Return the first item of ``cont`` cleaned with ``csv_compatible``.

    Only the first element is normalised; the remaining ones are never
    returned, so they are not processed.

    :param cont: list of raw values (may be empty or ``None``)
    :return: the cleaned first value, or ``None`` when ``cont`` is empty
    """
    if not cont:
        return None
    return csv_compatible(cont[0])
def raj_split(text: str) -> pd.Series:
    """Split ``text`` at its first comma into a two-element Series.

    :param text: string to split
    :return: ``[before, after]`` with both parts stripped when a comma is
        present; ``[text, None]`` (text left untouched) when it is not
    """
    # partition() splits on the first comma only, so there are never more
    # than two parts and no length check is needed.
    head, comma, tail = text.partition(",")
    if not comma:
        return pd.Series([text, None])
    return pd.Series([head.strip(), tail.strip()])
def date2iso(data_val: str):
    """Convert a ``DD.MM.YYYY`` date string to ISO ``YYYY-MM-DD``.

    :param data_val: date text in day.month.year form
    :return: ISO-formatted date string, or ``None`` for a null or empty input
    """
    missing = pd.isnull(data_val) or not data_val
    if missing:
        return None
    parsed = datetime.strptime(data_val, "%d.%m.%Y")
    return parsed.date().isoformat()
def split_pib(full_pib: str) -> pd.Series:
    """Split a full name into three labelled parts.

    The name is split on any run of whitespace, so repeated, leading or
    trailing spaces do not create empty parts. Missing parts are filled with
    ``None``; anything beyond the second word is joined into the third part.

    :param full_pib: full name as a single string
    :return: Series indexed ``surnamey``, ``firstnameq``, ``middlenamej``
        (labels kept exactly as they were, since callers may rely on them)
    """
    parts: List[Union[str, None]] = full_pib.split()
    if len(parts) > 3:
        parts = parts[:2] + [" ".join(parts[2:])]
    # Pad short names so the Series always has exactly three values.
    parts += [None] * (3 - len(parts))
    return pd.Series(parts, index=["surnamey", 'firstnameq',
                                   "middlenamej"])
def sex(patronym: Optional[str]) -> Optional[str]:
    """Map a patronymic to a code based on its ending.

    Endings are matched case-insensitively at the end of the string.
    NOTE(review): ``_02`` / ``_01`` presumably stand for female / male;
    confirm against the consumer of these codes.

    :param patronym: patronymic to classify
    :return: ``"_02"`` or ``"_01"`` when an ending matches, the unchanged
        input when none does, ``None`` for a null input
    """
    if pd.isnull(patronym):
        return None
    # Checked in order: the first matching ending wins.
    endings = (
        ("([іїо]вна|кизи)$", "_02"),
        ("(й?[ео]в[іи]ч|[іи]ч|огли)$", "_01"),
    )
    for ending, code in endings:
        if re.search(ending, patronym, re.I):
            return code
    return patronym
def create_index_string(*args):
    """Build a compact key from the non-null arguments.

    Each non-null value is upper-cased, cleaned with ``csv_compatible`` and
    stripped of all whitespace; the pieces are then concatenated in order.

    :param args: string values; null values are skipped
    :return: the concatenated key (empty string when nothing is left)
    """
    pieces = []
    for value in args:
        if pd.isnull(value):
            continue
        normalized = csv_compatible(value.upper())
        pieces.append(re.sub(r"\s", "", normalized))
    return "".join(pieces)
def create_hash(*args):
    """Return the MD5 hex digest of the cleaned, space-free arguments.

    Each non-null value is cleaned with ``csv_compatible``, has its spaces
    removed, and the pieces are concatenated before hashing as UTF-8.
    Null values are skipped, matching ``create_index_string``; previously
    they crashed with ``AttributeError``.

    :param args: string values; null values are ignored
    :return: 32-character hexadecimal MD5 digest
    """
    parts = [
        csv_compatible(x).replace(" ", "")
        for x in args
        if pd.notnull(x)
    ]
    return md5("".join(parts).encode('utf-8')).hexdigest()
def scantree(path="."):
    """Recursively yield DirEntry objects for every non-directory under ``path``.

    Directories are descended into (symlinks to directories are not followed
    and are yielded as entries themselves).

    :param path: directory to walk
    """
    # The context manager closes the underlying directory handle even when
    # the caller stops consuming the generator before it is exhausted.
    with scandir(path) as entries:
        for entry in entries:
            if entry.is_dir(follow_symlinks=False):
                yield from scantree(entry.path)
            else:
                yield entry
def col_diff(df_a, df_b, a_name="df_A", b_name="df_B"):
    """Print a side-by-side comparison of the columns of two dataframes.

    One row is printed per distinct column name, in a stable order (columns
    of ``df_a`` first, then those only in ``df_b``). The name appears in the
    left and/or right cell depending on which frame has it. Two summary
    lines list the columns each frame lacks.

    :param df_a: first dataframe
    :param df_b: second dataframe
    :param a_name: header label for ``df_a``
    :param b_name: header label for ``df_b``
    :return: None (output goes to stdout)
    """
    col_a, col_b = df_a.columns, df_b.columns
    # dict.fromkeys de-duplicates while keeping first-seen order, so the
    # table is identical on every run.
    ordered = list(dict.fromkeys(list(col_a) + list(col_b)))
    # default=0 keeps two column-less frames from raising on max().
    max_len = max((len(x) for x in ordered), default=0)
    print(" | ".join([a_name.center(max_len), b_name.center(max_len)]))
    print("+".join(['-' * (max_len + 1), '-' * (max_len + 1)]))
    for name in ordered:
        a = name if name in col_a else ''
        b = name if name in col_b else ''
        print(" | ".join([a.rjust(max_len), b]))
    to_a = [x for x in col_b if x not in col_a]
    to_b = [x for x in col_a if x not in col_b]
    print(f"To {a_name}: ", to_a)
    print(f"To {b_name}: ", to_b)
def change_width(ax, new_value, vertical=True):
    """Resize every bar in ``ax.patches`` to ``new_value`` and re-centre it.

    For vertical bars the width and x position are changed; otherwise the
    height and y position. The position is moved by half of the size
    difference so the bar stays centred where it was.

    :param ax: object exposing a ``patches`` iterable of bar patches
    :param new_value: new bar thickness
    :param vertical: ``True`` to adjust width/x, ``False`` for height/y
    """
    for bar in ax.patches:
        if vertical:
            shift = bar.get_width() - new_value
            bar.set_width(new_value)
            bar.set_x(bar.get_x() + shift * .5)
        else:
            shift = bar.get_height() - new_value
            bar.set_height(new_value)
            bar.set_y(bar.get_y() + shift * .5)
def number_format(n: int) -> str:
    """Abbreviate a number with a K / M / B suffix.

    Values below 1000 are returned as plain integers; larger values are
    divided by the largest fitting power of 1000 and truncated, e.g.
    ``10 -> "10"``, ``1500 -> "1K"``, ``2_500_000 -> "2M"``.

    :param n: number to format
    :return: the abbreviated string
    """
    if n < 1000:
        return str(int(n))
    # Plain comparisons instead of log10 avoid float rounding at the bucket
    # boundaries; billions get a suffix rather than falling through unscaled.
    for divider, suff in ((10 ** 9, "B"), (10 ** 6, "M"), (10 ** 3, "K")):
        if n >= divider:
            return f"{int(n / divider)}{suff}"
    # Only reachable for values that compare false everywhere (e.g. NaN).
    return str(int(n))
def hex_to_rgb(h):
    """Convert a ``#rrggbb`` (or ``rrggbb``) colour string to an ``(r, g, b)`` tuple.

    :param h: six-digit hexadecimal colour; ``#`` characters are ignored
    :return: tuple of three integers in the range 0-255
    """
    digits = h.replace("#", "")
    channels = []
    for start in range(0, 6, 2):
        channels.append(int(digits[start:start + 2], 16))
    return tuple(channels)
def rgb_to_hex(rgb):
    """Convert three integer colour channels to a ``#rrggbb`` string.

    :param rgb: sequence of exactly three integers (tuple, list, ...)
    :return: lowercase hexadecimal colour string with a leading ``#``
    """
    # %-formatting treats only a real tuple as the argument list, so other
    # sequences are converted first.
    return '#%02x%02x%02x' % tuple(rgb)
def check_edrpou_sum(edrpou: str) -> bool:
    """
    Validate the check digit of an EDRPOU code given as a digit string.

    The leading digits are multiplied by positional weights and summed
    modulo 11. If that remainder is 10, the calculation is repeated with
    every weight increased by 2, and a second remainder of 10 counts as 0.
    The result must equal the last digit of the code.

    :param str edrpou: the code to check
    :return: True or False depends on ``edrpou`` validity
    """
    def weighted_remainder(weights) -> int:
        digits = [int(ch) for ch in edrpou]
        # zip stops at the seven weights, so the check digit is not included.
        return sum(d * w for d, w in zip(digits, weights)) % 11

    # Codes strictly between 30,000,000 and 60,000,000 use rotated weights.
    if 30_000_000 < int(edrpou) < 60_000_000:
        weights = (7, 1, 2, 3, 4, 5, 6)
    else:
        weights = (1, 2, 3, 4, 5, 6, 7)

    remainder = weighted_remainder(weights)
    if remainder >= 10:
        remainder = weighted_remainder(tuple(w + 2 for w in weights))
        if remainder == 10:
            remainder = 0
    return remainder == int(edrpou[-1])
def contrast_color(color):
    """Return black or white, whichever contrasts with ``color``.

    :param color: sequence whose first three items are R, G, B as 0-1
        fractions (they are scaled by 255 here); further items are ignored
    :return: ``(0, 0, 0)`` for light colours, ``(255, 255, 255)`` for dark ones
    """
    red, green, blue = [round(channel * 255, 0) for channel in color[:3]]
    weighted = .299 * red + .587 * green + .114 * blue
    # Inverted brightness: 0 for pure white, 1 for pure black.
    luminance = 1 - weighted / 255
    if luminance < .5:
        return 0, 0, 0
    return 255, 255, 255
def minutes(s: str) -> Optional[int]:
    """Convert a duration such as ``"1h 30m 15s"`` to whole minutes.

    The hours and minutes parts are optional, the seconds part is required,
    and parts are separated by single spaces. Leftover seconds are truncated.

    :param s: duration text, e.g. ``"2m 30s"`` or ``"45s"``
    :return: total duration in whole minutes, or ``None`` when ``s`` does not
        match the expected format
    """
    # Raw string: "\d" in a normal literal is an invalid escape sequence.
    found = re.match(r"^(?:(?P<h>\d+)h )?(?:(?P<m>\d+)m )?(?P<s>\d+)s$", s)
    if found is None:
        return None
    # Missing optional groups come back as None and count as zero.
    hours, mins, secs = (int(g) if g else 0 for g in found.groups())
    return (hours * 3600 + mins * 60 + secs) // 60
def standardize_phone_number(phone):
    """Normalise a phone number to the ``380XXXXXXXXX`` form.

    :param phone: phone number in any type; it is converted with ``str()``
    :return: ``"380"`` followed by the two-digit operator code and the
        subscriber digits when both are found; ``None`` when a number is
        found without an operator code; otherwise only the digits of the
        input (possibly an empty string)
    """
    # Optional "+38" prefix, optional operator code (with optional leading 0),
    # then three digit groups of 3, 2-3 and 2-3 digits.
    pattern = r'(?:\+?38)?(?:\s*\(?0?(\d{2})\)?[\s.-]?)?(\d{3})[\s.-]?(\d{2,3})[\s.-]?(\d{2,3})'
    # Keep digits, hyphens and parentheses only, then drop the parentheses.
    cleaned = re.sub(r'[^\d()-]', '', str(phone))
    cleaned = cleaned.replace('(', '').replace(')', '')

    matches = re.findall(pattern, cleaned)
    if not matches:
        # Nothing phone-like was found: hand back the bare digits.
        return re.sub(r'\D', '', cleaned)

    # Only the first match is used.
    carrier_code, part1, part2, part3 = matches[0]
    if not carrier_code:
        return None
    return f'380{carrier_code}{part1}{part2}{part3}'
def cyrillic_cleaner(*args):
    """Concatenate the input and keep only upper-cased Cyrillic characters.

    Every character outside the Unicode range U+0400-U+04FF is removed.

    :param args: either a single string / iterable of strings, or several
        strings passed as separate arguments
    :return: the upper-cased text with all non-Cyrillic characters removed
    """
    non_cyrillic = r'[^\u0400-\u04FF]'
    # A single argument is joined as an iterable (the original calling
    # convention); several arguments are joined with each other.
    parts = args[0] if len(args) == 1 else args
    # flags must be passed by keyword: the fourth positional parameter of
    # re.sub is ``count``, which would cap the number of removals at 2.
    return re.sub(non_cyrillic, "", "".join(parts), flags=re.I).upper()
def get_fiscal_quarter(year_month, short_year=True, reverse=False):
    """Return the fiscal quarter label for a ``YYYY-MM`` month.

    The fiscal year starts in October: October-December are Q1 of the next
    calendar year's fiscal year, January-March are Q2, April-June Q3 and
    July-September Q4.

    :param year_month: month in ``"%Y-%m"`` format, e.g. ``"2024-10"``
    :param short_year: use the last two digits of the fiscal year
    :param reverse: ``True`` gives ``"Q1FY25"``, ``False`` gives ``"FY25Q1"``
    :return: the combined quarter / fiscal-year label
    """
    date = datetime.strptime(year_month, "%Y-%m")
    fiscal_year, month = date.year, date.month
    if month >= 10:
        fiscal_year += 1
        quarter = 1
    else:
        # Integer form of ceil(month / 3) + 1; the bare ``ceil`` name used
        # before was never imported and raised NameError.
        quarter = (month + 2) // 3 + 1
    if short_year:
        fiscal_year %= 100
    q_part, y_part = f"Q{quarter}", f"FY{fiscal_year:02d}"
    return q_part + y_part if reverse else y_part + q_part
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment