mod_jamie
"""Welcome to the tomorrow of yesterday. | |
""" | |
import logging | |
import urllib3 | |
# Disable `InsecureRequestWarning: Unverified HTTPS request is being made to host '...' ...` messages. | |
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) | |
# ^ Alternatively, one can set the environmental variable: | |
# PYTHONWARNINGS="ignore:Unverified HTTPS request" | |
urllib3.disable_warnings() | |
# | |
# Add trace-level logging. | |
# | |
log_level_trace = logging.DEBUG - 5 | |
logging.addLevelName(log_level_trace, 'TRACE', ) | |
def trace(self, message, *args, **kws): | |
"""Trace-level logging method that will be appended to the `logging.Logger()` class.""" | |
# Yes, logger takes its '*args' as 'args'. | |
self._log(log_level_trace, message, args, **kws) | |
logging.Logger.trace = trace | |
logging = logging.getLogger(__name__) | |
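A minimal usage sketch, assuming the snippet above is saved as a module named mod_jamie.py (the filename is only borrowed from the gist title): importing it once patches `logging.Logger`, after which any logger exposes a `.trace()` method.

import logging

import mod_jamie  # noqa: F401  (hypothetical module name; importing it applies the Logger.trace patch)

logging.basicConfig(level=logging.DEBUG - 5)  # TRACE sits 5 below DEBUG
log = logging.getLogger('example')
log.trace('connection pool warmed up')  # emitted at the custom TRACE level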
import logging

logger = logging.getLogger(__name__)


def to_csv(data, header=None):
    """Quick-n-dirty CSV blob generator.

    :param data: List of dictionaries.
    :type data: list
    :param header: (optional) List of header column names to use, in that order, when generating the CSV.
        If omitted, the header is derived from the keys of every dictionary in `data`.
    :type header: list or None
    :return: Concatenated CSV string blob.
    :rtype: str
    """
    csv_lines = []
    headers = []
    # Build the header from the override if one is present, otherwise derive it from the data.
    if header:
        for h in header:
            if h in headers:
                continue
            headers.append(h)
    else:
        # Annoyingly pre-iterate over everything to ensure we have ALL columns from ALL dictionaries in the list.
        for line in data:
            for key in line.keys():
                if key in headers:
                    continue
                headers.append(key)
    # Add the header line to the CSV.
    csv_lines.append('"{}"'.format('","'.join(headers)))
    # Add the actual data to the CSV.
    for line in data:
        # `line.get(i, '')`
        #     If no value is present then use an empty string as a placeholder rather than the string "None",
        #     which can be confusing and causes excess file bloat.
        # `.replace('"', "'")`
        #     Convert quotes inside values so they do not conflict with the CSV-level quotes and the output
        #     can be parsed properly.
        csv_lines.append('"{}"'.format('","'.join([str(line.get(i, '')).replace('"', "'") for i in headers])))
    # Concatenate the resulting list, separated by newlines.
    return '\n'.join(csv_lines)
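A quick usage sketch with made-up rows; missing keys become empty cells and embedded double quotes are swapped for single quotes:

rows = [
    {'name': 'Ada', 'lang': 'Python'},
    {'name': 'Linus', 'note': 'said "hello"'},
]
print(to_csv(rows))
# "name","lang","note"
# "Ada","Python",""
# "Linus","","said 'hello'"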
import logging

logger = logging.getLogger(__name__)


def list_slicer(data, size):
    """List slicer.

    :param data: List of values to be sliced.
    :type data: list
    :param size: How many items each slice should contain.
    :type size: int
    :return: Generator yielding lists of items at, or fewer than, the specified size.
    :rtype: generator
    """
    for i in range(0, len(data), size):
        yield data[i:i + size]


def list_dedupe(data):
    """Deduplicate a list while preserving order, using a dict's hashed keys.

    :param data: List to be deduplicated.
    :type data: list
    :return: List free of duplicates.
    :rtype: list
    """
    return list(dict.fromkeys(data))
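A short usage sketch for both helpers:

chunks = list(list_slicer(list(range(7)), 3))
# -> [[0, 1, 2], [3, 4, 5], [6]]
unique = list_dedupe([3, 1, 3, 2, 1])
# -> [3, 1, 2]  (first occurrence wins, order preserved)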
import logging

logger = logging.getLogger(__name__)


def round_decimal(value, decimal_places):
    """Rounds decimal values to a given number of decimal places.

    :param value: Numeric value to round.
    :type value: float or int
    :param decimal_places: Number of decimal places to round to.
    :type decimal_places: int
    :return: Rounded value.
    :rtype: float
    """
    # String formatting is actually a little faster than using `round()`, that's kinda neat!
    return float(f'{value:.{decimal_places}f}')
    # return round(float(value), decimal_places)
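A couple of illustrative calls; `:.Nf` formatting, like `round()`, works on the float's binary representation, so results follow ordinary floating-point behavior:

round_decimal(3.14159, 2)   # -> 3.14
round_decimal(2, 3)         # -> 2.0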
import datetime
import decimal
import json
import logging
import re

logger = logging.getLogger(__name__)


class CustomJSONEncoder(json.JSONEncoder):
    """Custom JSON encoder that does things that shouldn't need to be done."""

    def default(self, obj):
        """Overrides the default serialization of JSONEncoder, then falls back to the JSONEncoder.default() method.

        :param obj: Object to serialize.
        :type obj: object
        :return: JSON-serializable representation of `obj` (ISO-8601 string, float, int, or list), or whatever
            `json.JSONEncoder.default()` does with it.
        :rtype: object
        """
        try:
            if isinstance(obj, (datetime.datetime, datetime.time, datetime.date)):
                return obj.isoformat()
            if isinstance(obj, decimal.Decimal):
                s = str(obj)
                if '.' in s:
                    return float(s)
                else:
                    return int(s)
            iterable = iter(obj)
        except TypeError:
            pass
        else:
            return list(iterable)
        return json.JSONEncoder.default(self, obj)


def json_string_hook(obj):
    """JSON deserializer helper to ensure values are converted to strings instead of native datatypes,
    to smooth over data inconsistencies.

    Current behavior:
        - Convert all non-iterable values to strings.
        - Exclude values where the key contains the word 'date'.

    :param obj: json.loads() dict
    :type obj: dict
    :return: Updated dictionary
    :rtype: dict
    """
    obj_d = dict(obj)
    # return {k: str(v) if isinstance(v, bool) else v for k, v in obj_d.items()}
    return {k: str(v) if 'date' not in str(k).lower() and not hasattr(v, '__iter__') else v for k, v in obj_d.items()}
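# Example sketch (illustrative values): `json.loads(..., object_hook=json_string_hook)` runs the hook over
# every decoded object, forcing scalars to strings while leaving iterables and date-ish keys alone.
_raw = '{"id": 42, "active": true, "created_date": "2024-02-09", "tags": ["a", "b"]}'
_parsed = json.loads(_raw, object_hook=json_string_hook)
# -> {'id': '42', 'active': 'True', 'created_date': '2024-02-09', 'tags': ['a', 'b']}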
def json_pretty(data, encoder=CustomJSONEncoder):
    """Converts Python dict or list/set/tuple objects to a pretty-printed JSON string.

    :param data: Python iterable object such as a dict, list, set, or tuple.
    :type data: dict, list, set, tuple
    :param encoder: (optional) Custom JSON encoder class that extends `json.JSONEncoder`.
        (default: CustomJSONEncoder)
    :type encoder: json.JSONEncoder
    :return: Pretty-printed JSON string.
    :rtype: str
    """
    return json.dumps(data, sort_keys=True, indent=4, separators=(',', ': '), ensure_ascii=True, cls=encoder)


def json_min(data, encoder=CustomJSONEncoder):
    """Converts Python dict or list/set/tuple objects to a minified JSON string.

    :param data: Python iterable object such as a dict, list, set, or tuple.
    :type data: dict, list, set, tuple
    :param encoder: (optional) Custom JSON encoder class that extends `json.JSONEncoder`.
        (default: CustomJSONEncoder)
    :type encoder: json.JSONEncoder
    :return: Minified JSON string.
    :rtype: str
    """
    return json.dumps(data, separators=(',', ':'), cls=encoder)
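# Example sketch: thanks to CustomJSONEncoder, both helpers can serialize datetimes and Decimals directly.
_sample = {'when': datetime.datetime(2024, 2, 9, 16, 26), 'amount': decimal.Decimal('19.99'), 'qty': decimal.Decimal('3')}
print(json_pretty(_sample))  # indented, key-sorted; the datetime becomes an ISO-8601 string, the Decimals become 19.99 and 3
print(json_min(_sample))     # the same document collapsed onto a single line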
def fix_broken_json(input_str):
    """Fix broken JSON quotes...

    :param input_str: Broken JSON string.
    :type input_str: str
    :return: Fixed JSON string
    :rtype: str
    """
    # <MOVED INSIDE FUNCTION>
    # Move this stuff out of the function because it doesn't need to be assigned and compiled every. single. time.
    # It's just here for the sake of organization and keeping things self-contained.
    # Use tuples `()` instead of lists `[]` as cheap, immutable lookup tables. Single-element tuples need a
    # trailing comma to appease the Python gods.
    expected_chars = {
        "[": (",", "]"),
        "]": ("[", ","),
        "{": (":",),
        "}": (",", "{", "]"),
        ":": (",", "}"),
        ",": (":", "{", "}", "[", "]"),
    }
    double_quote = '"'
    # Backslash needs to be escaped otherwise Python thinks it's escaping the single quote.
    backslash = '\\'
    # Precompile the regular expression (which is why it's better outside of this function).
    regex_nonwhite = re.compile(r'\S')
    # </MOVED INSIDE FUNCTION>
    output_str = ''
    in_string = False
    prev = None
    prev_nonwhite_nonquote = None
    # Iterate over the string, character by character, with the character position.
    for char_pos, char in enumerate(input_str):
        # Compare by value; identity checks (`is`) on one-character strings only work by accident of interning.
        if char == double_quote and prev != backslash:
            if in_string:
                # If we're already inside a quoted string and the next non-whitespace character is an expected
                # one (or there is nothing left), then we have exited the quoted string. Otherwise, escape the quote.
                match = regex_nonwhite.search(input_str, pos=char_pos + 1)
                if not match or match.group() in expected_chars.get(prev_nonwhite_nonquote, ()):
                    in_string = False
                else:
                    output_str += backslash
            else:
                in_string = True
        elif not in_string and char.strip():
            # Previous non-whitespace, non-quote character.
            prev_nonwhite_nonquote = char
        # Add the character to the output string.
        output_str += char
        prev = char
    return output_str
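# Example sketch: unescaped quotes inside a value break json.loads(); fix_broken_json() escapes them.
_broken = '{"note": "she said "hello" and left"}'
print(fix_broken_json(_broken))
# -> {"note": "she said \"hello\" and left"}
print(json.loads(fix_broken_json(_broken))['note'])
# -> she said "hello" and left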
def sub_json_parser(obj):
    """Try to parse JSON values from a dictionary or list of dictionaries.

    NOTE: This does not recursively try to parse nested values; it is currently set up to only handle the root
    level, for things such as JSON stored in a database table.

    :param obj: Dictionary or list of dictionaries.
    :type obj: dict or list
    :return: dict or list
    :rtype: dict or list
    """
    # If it's a list of dictionaries then iterate and pass each dict into this function.
    if isinstance(obj, (list, set, tuple)):
        # Create a new list of results because you can't manipulate a list you're iterating over.
        new_obj = []
        for i in obj:
            # E.T. phone home...
            new_obj.append(
                sub_json_parser(i)
            )
        return new_obj
    # If it's a dict then iterate over the keys and values.
    elif isinstance(obj, dict):
        # Create a new dictionary object because you can't edit dicts or lists while iterating over them without
        # causing state inconsistencies.
        new_obj = {}
        for k, v in obj.items():
            # If the value is not already a string then keep the original value and move on.
            if not isinstance(v, str):
                new_obj[k] = v
                continue
            # If there is a curly brace in there then assume it might be JSON.
            elif '{' in v:
                # Try to parse the JSON as-is.
                try:
                    new_obj[k] = json.loads(v)
                except Exception:
                    # Since the JSON might be broken, try running it through the `fix_broken_json()` function.
                    try:
                        new_obj[k] = json.loads(fix_broken_json(v))
                    except Exception:
                        logger.debug(f'Unable to fix broken json key={k}, value={v}')
                        # If the JSON can't be fixed then keep the original value and move on.
                        new_obj[k] = v
            else:
                # Catch-all.
                new_obj[k] = v
        return new_obj
    # If it's not a list, set, tuple, or dict, then return the object untouched.
    else:
        return obj
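A brief usage sketch with invented rows, showing how broken JSON values get routed through fix_broken_json():

rows = [
    {'id': 1, 'payload': '{"status": "ok"}'},
    {'id': 2, 'payload': '{"note": "said "hi" twice"}'},  # unescaped quotes
]
for row in sub_json_parser(rows):
    print(row['id'], row['payload'])
# 1 {'status': 'ok'}
# 2 {'note': 'said "hi" twice'}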
import logging
import re

from lxml import etree

logger = logging.getLogger(__name__)


def lxml_to_dict_with_arrays(element):
    """Converts a lxml.objectify.ObjectifiedElement object to a dictionary for JSON conversion.

    :param element: LXML objectified element.
    :type element: lxml.objectify.ObjectifiedElement
    :return: Dictionary
    :rtype: dict
    """
    # <CAN BE MOVED OUT OF FUNCTION>
    # For the sake of convenience it's within the function but can be moved out so the regex isn't compiled repeatedly.
    regex_namespace = re.compile('{.*}')
    # </CAN BE MOVED OUT OF FUNCTION>
    # Make sure that we are iterating over the root of a document so we can get the root's children; maybe try .iter() instead?
    if hasattr(element, 'getroot'):
        element = element.getroot()
    result = {}
    if len(element) == 0:
        tag = regex_namespace.sub('', element.tag)
        result[tag] = element.text
    else:
        for elem in element:
            sub_dict = lxml_to_dict_with_arrays(elem)
            tag = regex_namespace.sub('', element.tag)
            sub_tag = regex_namespace.sub('', elem.tag)
            if result.get(tag):
                # If the same child tag appears more than once, convert it to a list.
                if sub_tag in result[tag]:
                    # No need to redundantly have the sub element tag name in there since it's already at the
                    # root of the array.
                    if sub_tag in sub_dict:
                        sub_dict = sub_dict[sub_tag]
                    # Append the child dictionary to the existing array or convert the existing structure to a
                    # list containing the old dictionary AND the new child dictionary.
                    if isinstance(result[tag][sub_tag], list):
                        result[tag][sub_tag].append(sub_dict)
                    else:
                        result[tag][sub_tag] = [result[tag][sub_tag], sub_dict]
                else:
                    # If no child already exists, append the child dictionary to an existing list or update the
                    # existing dictionary accordingly.
                    if isinstance(result[tag], list):
                        result[tag].append(sub_dict)
                    else:
                        result[tag].update(sub_dict)
            else:
                result[tag] = sub_dict
    return result
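# Example sketch: repeated sibling elements are collapsed into a list.
_sample = etree.fromstring('<books><book><title>A</title></book><book><title>B</title></book></books>')
print(lxml_to_dict_with_arrays(_sample))
# -> {'books': {'book': [{'title': 'A'}, {'title': 'B'}]}}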
def xml_pretty(data, indent_level=4):
    """Converts LXML object to a pretty-printed XML string.

    :param data: Object to be prettified.
    :type data: lxml.objectify.ObjectifiedElement
    :param indent_level: (optional) How many spaces elements should be indented by (default: 4)
    :type indent_level: int
    :return: Pretty-printed string, oooooh-aaaaahh.
    :rtype: str
    """
    etree.indent(data, space=' ' * indent_level)
    return etree.tostring(data, pretty_print=True, method='xml', xml_declaration=True, encoding='utf-8').decode('utf-8')


def xml_minify(data):
    """Converts LXML object to a minified XML string.

    :param data: Object to be minified.
    :type data: lxml.objectify.ObjectifiedElement
    :return: Minified string.
    :rtype: str
    """
    return etree.tostring(data, pretty_print=False, method='xml', xml_declaration=True, encoding='utf-8').decode('utf-8')


def xml_remove_namespaces(data):
    """Remove namespaces from elements.

    :param data: Parsed document to be cleaned.
    :type data: lxml.objectify.ObjectifiedElement
    :return: Cleaned document.
    :rtype: lxml.objectify.ObjectifiedElement
    """
    for elem in data.iter():
        # Skip comments and processing instructions because they do not have names.
        if isinstance(elem, (etree._Comment, etree._ProcessingInstruction)):
            continue
        # Remove the namespace URI from the element's name.
        elem.tag = etree.QName(elem).localname
        # Remove namespaces from attribute names too; copy the keys first so the mapping isn't mutated mid-iteration.
        for attr_name in list(elem.attrib):
            local_attr_name = etree.QName(attr_name).localname
            if attr_name != local_attr_name:
                attr_value = elem.attrib[attr_name]
                del elem.attrib[attr_name]
                elem.attrib[local_attr_name] = attr_value
    # Finally, clean up unused namespace declarations.
    etree.cleanup_namespaces(data)
    return data
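A short usage sketch; the namespace URI and element names below are made up, and xml_pretty() relies on etree.indent(), which requires lxml 4.5 or newer:

doc = etree.fromstring(
    '<ns:order xmlns:ns="http://example.com/ns" ns:id="42"><ns:item>widget</ns:item></ns:order>'
)
clean = xml_remove_namespaces(doc)
print(xml_pretty(clean))   # indented XML with the ns: prefixes stripped
print(xml_minify(clean))   # the same document on a single line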