# just the lsusb portion of jc from https://github.com/kellyjonbrazil/jc
# from utils.py | |
"""jc - JSON Convert utils""" | |
import sys | |
import re | |
import locale | |
import shutil | |
from itertools import islice | |
from collections import namedtuple | |
from numbers import Number | |
from datetime import datetime, timezone | |
from textwrap import TextWrapper | |
from functools import lru_cache | |
from typing import Any, List, Dict, Iterable, Union, Optional, TextIO | |
TimeStampFormatType = Dict[str, Any]
CLI_QUIET = False | |
def _asciify(string: str) -> str: | |
""" | |
Return a string downgraded from Unicode to ASCII with some simple | |
conversions. | |
""" | |
string = string.replace("©", "(c)") | |
# the ascii() function adds single quotes around the string | |
string = ascii(string)[1:-1] | |
string = string.replace(r"\n", "\n") | |
return string | |
def _safe_print( | |
string: str, | |
sep: str = " ", | |
end: str = "\n", | |
file: TextIO = sys.stdout, | |
flush: bool = False, | |
) -> None: | |
"""Output for both UTF-8 and ASCII encoding systems""" | |
try: | |
print(string, sep=sep, end=end, file=file, flush=flush) | |
except UnicodeEncodeError: | |
print(_asciify(string), sep=sep, end=end, file=file, flush=flush) | |
def _safe_pager(string: str) -> None: | |
"""Pager output for both UTF-8 and ASCII encoding systems""" | |
from pydoc import pager | |
try: | |
pager(string) | |
except UnicodeEncodeError: | |
pager(_asciify(string)) | |
def warning_message(message_lines: List[str]) -> None: | |
""" | |
Prints warning message to `STDERR` for non-fatal issues. The first line | |
is prepended with 'jc: Warning - ' and subsequent lines are indented. | |
Wraps text as needed based on the terminal width. | |
Parameters: | |
        message_lines: (list) list of string lines
Returns: | |
None - just prints output to STDERR | |
""" | |
if CLI_QUIET: | |
return | |
# this is for backwards compatibility with existing custom parsers | |
if isinstance(message_lines, str): | |
message_lines = [message_lines] | |
columns = shutil.get_terminal_size().columns | |
first_wrapper = TextWrapper(width=columns, subsequent_indent=" " * 15) | |
next_wrapper = TextWrapper( | |
width=columns, initial_indent=" " * 15, subsequent_indent=" " * 19 | |
) | |
first_line = message_lines.pop(0) | |
first_str = f"jc: Warning - {first_line}" | |
first_str = first_wrapper.fill(first_str) | |
_safe_print(first_str, file=sys.stderr) | |
for line in message_lines: | |
if line == "": | |
continue | |
message = next_wrapper.fill(line) | |
_safe_print(message, file=sys.stderr) | |
def error_message(message_lines: List[str]) -> None: | |
""" | |
Prints an error message to `STDERR` for fatal issues. The first line is | |
prepended with 'jc: Error - ' and subsequent lines are indented. | |
Wraps text as needed based on the terminal width. | |
Parameters: | |
        message_lines: (list) list of string lines
Returns: | |
None - just prints output to STDERR | |
""" | |
columns = shutil.get_terminal_size().columns | |
first_wrapper = TextWrapper(width=columns, subsequent_indent=" " * 13) | |
next_wrapper = TextWrapper( | |
width=columns, initial_indent=" " * 13, subsequent_indent=" " * 17 | |
) | |
first_line = message_lines.pop(0) | |
first_str = f"jc: Error - {first_line}" | |
first_str = first_wrapper.fill(first_str) | |
_safe_print(first_str, file=sys.stderr) | |
for line in message_lines: | |
if line == "": | |
continue | |
message = next_wrapper.fill(line) | |
_safe_print(message, file=sys.stderr) | |
def is_compatible(compatible: List[str]) -> bool: | |
""" | |
Returns True if the parser is compatible with the running OS platform. | |
""" | |
platform_found = False | |
for platform in compatible: | |
if sys.platform.startswith(platform): | |
platform_found = True | |
break | |
return platform_found | |
def compatibility(mod_name: str, compatible: List[str], quiet: bool = False) -> None: | |
""" | |
Checks for the parser's compatibility with the running OS platform and | |
prints a warning message to `STDERR` if not compatible and | |
`quiet=False.` | |
Parameters: | |
mod_name: (string) __name__ of the calling module | |
compatible: (list) sys.platform name(s) compatible with | |
the parser. compatible options: | |
linux, darwin, cygwin, win32, aix, freebsd | |
quiet: (bool) suppress compatibility message if True | |
Returns: | |
None - just prints output to STDERR | |
""" | |
if not quiet and not is_compatible(compatible): | |
mod = mod_name.split(".")[-1] | |
compat_list = ", ".join(compatible) | |
warning_message( | |
[ | |
f"`{mod}` command output from this OS ({sys.platform}) is not supported.", | |
f"`{mod}` command output from the following platforms is supported: {compat_list}", | |
"Disregard this warning if you are processing output that came from a supported platform. (Use the -q option to suppress this warning)", | |
] | |
) | |
def has_data(data: Union[str, bytes]) -> bool: | |
""" | |
Checks if the string input contains data. If there are any | |
non-whitespace characters then return `True`, else return `False`. | |
For bytes, returns True if there is any data. | |
Parameters: | |
data: (string, bytes) input to check whether it contains data | |
Returns: | |
Boolean True if input string (data) contains non-whitespace | |
characters, otherwise False. For bytes data, returns | |
True if there is any data, otherwise False. | |
""" | |
if isinstance(data, str): | |
return bool(data and not data.isspace()) | |
return bool(data) | |
def remove_quotes(data: str) -> str: | |
""" | |
Remove single or double quotes surrounding a string. If no quotes are | |
found then the string is returned unmodified. | |
Parameters: | |
data: (string) Input value | |
Returns: | |
string | |
""" | |
if data.startswith('"') and data.endswith('"'): | |
data = data[1:-1] | |
elif data.startswith("'") and data.endswith("'"): | |
data = data[1:-1] | |
return data | |
def normalize_key(data: str) -> str: | |
r""" | |
Normalize a key name by shifting to lower-case and converting special | |
characters to underscores. | |
Special characters are defined as `space` and the following: | |
!"#$%&'()*+,-./:;<=>?@[\]^`{|}~ | |
This is a lossy algorithm. Repeating and trailing underscores are | |
removed. | |
Parameters: | |
data: (string) Input value | |
Returns: | |
string | |
""" | |
special = r"""!"#$%&'()*+,-./:;<=>?@[\]^`{|}~ """ | |
initial_underscore = False | |
data = data.strip().lower() | |
for special_char in special: | |
data = data.replace(special_char, "_") | |
if data.startswith("_"): | |
initial_underscore = True | |
# swap back to space so split() will compress multiple consecutive down to one | |
data = data.strip("_").replace("_", " ") | |
data = "_".join(data.split()) | |
if initial_underscore: | |
data = "_" + data | |
return data | |
def convert_to_int(value: object) -> Optional[int]: | |
""" | |
Converts string and float input to int. Strips all non-numeric | |
characters from strings. | |
Parameters: | |
value: (string/float) Input value | |
Returns: | |
integer/None Integer if successful conversion, otherwise None | |
""" | |
if isinstance(value, str): | |
str_val = re.sub(r"[^0-9\-\.]", "", value) | |
try: | |
return int(str_val) | |
except (ValueError, TypeError): | |
try: | |
return int(float(str_val)) | |
except (ValueError, TypeError): | |
return None | |
elif isinstance(value, (int, float)): | |
return int(value) | |
else: | |
return None | |
def convert_to_float(value: object) -> Optional[float]: | |
""" | |
Converts string and int input to float. Strips all non-numeric | |
characters from strings. | |
Parameters: | |
value: (string/integer) Input value | |
Returns: | |
float/None Float if successful conversion, otherwise None | |
""" | |
if isinstance(value, str): | |
try: | |
return float(re.sub(r"[^0-9\-\.]", "", value)) | |
except (ValueError, TypeError): | |
return None | |
elif isinstance(value, (int, float)): | |
return float(value) | |
else: | |
return None | |
def convert_to_bool(value: object) -> bool: | |
""" | |
Converts string, integer, or float input to boolean by checking | |
for 'truthy' values. | |
Parameters: | |
value: (string/integer/float) Input value | |
Returns: | |
True/False False unless a 'truthy' number or string is found | |
('y', 'yes', 'true', '1', 1, -1, etc.) | |
""" | |
# if number, then bool it | |
# if string, try to convert to float | |
# if float converts, then bool the result | |
# if float does not convert then look for truthy string and bool True | |
# else False | |
truthy = ["y", "yes", "true", "*"] | |
if isinstance(value, (int, float)): | |
return bool(value) | |
if isinstance(value, str): | |
try: | |
test_value = convert_to_float(value) | |
if test_value is not None: | |
return bool(test_value) | |
except Exception: | |
pass | |
if value: | |
return value.lower() in truthy | |
return False | |
# convert_size_to_int from https://github.com/xolox/python-humanfriendly | |
# Copyright (c) 2021 Peter Odding | |
# Permission is hereby granted, free of charge, to any person obtaining | |
# a copy of this software and associated documentation files (the | |
# "Software"), to deal in the Software without restriction, including | |
# without limitation the rights to use, copy, modify, merge, publish, | |
# distribute, sublicense, and/or sell copies of the Software, and to | |
# permit persons to whom the Software is furnished to do so, subject to | |
# the following conditions: | |
# The above copyright notice and this permission notice shall be | |
# included in all copies or substantial portions of the Software. | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, | |
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | |
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND | |
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE | |
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION | |
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION | |
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | |
def convert_size_to_int(size: str, binary: bool = False) -> Optional[int]: | |
""" | |
Parse a human readable data size and return the number of bytes. | |
Parameters: | |
size: (string) The human readable file size to parse. | |
binary: (boolean) `True` to use binary multiples of bytes | |
(base-2) for ambiguous unit symbols and names, | |
`False` to use decimal multiples of bytes (base-10). | |
Returns: | |
integer/None Integer if successful conversion, otherwise None | |
This function knows how to parse sizes in bytes, kilobytes, megabytes, | |
gigabytes, terabytes and petabytes. Some examples: | |
>>> convert_size_to_int('42') | |
42 | |
>>> convert_size_to_int('13b') | |
13 | |
>>> convert_size_to_int('5 bytes') | |
5 | |
>>> convert_size_to_int('1 KB') | |
1000 | |
>>> convert_size_to_int('1 kilobyte') | |
1000 | |
>>> convert_size_to_int('1 KiB') | |
1024 | |
>>> convert_size_to_int('1 KB', binary=True) | |
1024 | |
>>> convert_size_to_int('1.5 GB') | |
1500000000 | |
>>> convert_size_to_int('1.5 GB', binary=True) | |
1610612736 | |
""" | |
# normalize input by removing commas | |
size = size.replace(",", "") | |
    def tokenize(text: str) -> List[Union[str, int, float]]:
        tokenized_input: List[Union[str, int, float]] = []
for token in re.split(r"(\d+(?:\.\d+)?)", text): | |
token = token.strip() | |
if re.match(r"\d+\.\d+", token): | |
tokenized_input.append(float(token)) | |
elif token.isdigit(): | |
tokenized_input.append(int(token)) | |
elif token: | |
tokenized_input.append(token) | |
return tokenized_input | |
SizeUnit = namedtuple("SizeUnit", "divider, symbol, name") | |
CombinedUnit = namedtuple("CombinedUnit", "decimal, binary") | |
disk_size_units = ( | |
CombinedUnit( | |
SizeUnit(1000**1, "KB", "kilobyte"), SizeUnit(1024**1, "KiB", "kibibyte") | |
), | |
CombinedUnit( | |
SizeUnit(1000**2, "MB", "megabyte"), SizeUnit(1024**2, "MiB", "mebibyte") | |
), | |
CombinedUnit( | |
SizeUnit(1000**3, "GB", "gigabyte"), SizeUnit(1024**3, "GiB", "gibibyte") | |
), | |
CombinedUnit( | |
SizeUnit(1000**4, "TB", "terabyte"), SizeUnit(1024**4, "TiB", "tebibyte") | |
), | |
CombinedUnit( | |
SizeUnit(1000**5, "PB", "petabyte"), SizeUnit(1024**5, "PiB", "pebibyte") | |
), | |
CombinedUnit( | |
SizeUnit(1000**6, "EB", "exabyte"), SizeUnit(1024**6, "EiB", "exbibyte") | |
), | |
CombinedUnit( | |
SizeUnit(1000**7, "ZB", "zettabyte"), SizeUnit(1024**7, "ZiB", "zebibyte") | |
), | |
CombinedUnit( | |
SizeUnit(1000**8, "YB", "yottabyte"), SizeUnit(1024**8, "YiB", "yobibyte") | |
), | |
) | |
tokens = tokenize(size) | |
if tokens and isinstance(tokens[0], Number): | |
# Get the normalized unit (if any) from the tokenized input. | |
normalized_unit = ( | |
tokens[1].lower() if len(tokens) == 2 and isinstance(tokens[1], str) else "" | |
) | |
# If the input contains only a number, it's assumed to be the number of | |
# bytes. The second token can also explicitly reference the unit bytes. | |
if len(tokens) == 1 or normalized_unit.startswith("b"): | |
return int(tokens[0]) | |
# Otherwise we expect two tokens: A number and a unit. | |
if normalized_unit: | |
# Convert plural units to singular units, for details: | |
# https://github.com/xolox/python-humanfriendly/issues/26 | |
normalized_unit = normalized_unit.rstrip("s") | |
for unit in disk_size_units: | |
# First we check for unambiguous symbols (KiB, MiB, GiB, etc) | |
# and names (kibibyte, mebibyte, gibibyte, etc) because their | |
# handling is always the same. | |
if normalized_unit in ( | |
unit.binary.symbol.lower(), | |
unit.binary.name.lower(), | |
): | |
return int(tokens[0] * unit.binary.divider) | |
# Now we will deal with ambiguous prefixes (K, M, G, etc), | |
# symbols (KB, MB, GB, etc) and names (kilobyte, megabyte, | |
# gigabyte, etc) according to the caller's preference. | |
if normalized_unit in ( | |
unit.decimal.symbol.lower(), | |
unit.decimal.name.lower(), | |
) or normalized_unit.startswith(unit.decimal.symbol[0].lower()): | |
return int( | |
tokens[0] | |
* (unit.binary.divider if binary else unit.decimal.divider) | |
) | |
# We failed to parse the size specification. | |
return None | |
def input_type_check(data: object) -> None: | |
"""Ensure input data is a string. Raises `TypeError` if not.""" | |
if not isinstance(data, str): | |
raise TypeError("Input data must be a 'str' object.") | |
def _lazy_splitlines(text: str) -> Iterable[str]: | |
NEWLINES_PATTERN: str = r"(\r\n|\r|\n)" | |
NEWLINES_RE = re.compile(NEWLINES_PATTERN) | |
start = 0 | |
for m in NEWLINES_RE.finditer(text): | |
begin, end = m.span() | |
if begin != start: | |
yield text[start:begin] | |
else: | |
yield "" | |
start = end | |
if text[start:]: | |
yield text[start:] | |
def line_slice( | |
data: Union[str, Iterable[str], TextIO, bytes, None], | |
slice_start: Optional[int] = None, | |
slice_end: Optional[int] = None, | |
) -> Union[str, Iterable[str], TextIO, bytes, None]: | |
""" | |
Slice input data by lines - lazily, if possible. | |
Accepts a string (for normal parsers) or an iterable (for streaming | |
parsers). Uses normal start/stop slicing values, but will always slice | |
on lines instead of characters. Positive slices will use less memory as | |
the function will attempt to lazily iterate over the input. A negative | |
slice parameter will force the function to read in all of the data and | |
then slice, which will use more memory. | |
Parameters: | |
data: (string or iterable) - input to slice by lines | |
slice_start: (int) - starting line | |
slice_end: (int) - ending line | |
Returns: | |
string if input is a string. | |
iterable of strings if input is an iterable (for streaming parsers) | |
""" | |
    if slice_start is not None or slice_end is not None:
# standard parsers UTF-8 input | |
if isinstance(data, str): | |
data_iter = _lazy_splitlines(data) | |
# positive slices | |
if (slice_start is None or slice_start >= 0) and ( | |
slice_end is None or slice_end >= 0 | |
): | |
return "\n".join(islice(data_iter, slice_start, slice_end)) | |
# negative slices found (non-lazy, uses more memory) | |
else: | |
return "\n".join(list(data_iter)[slice_start:slice_end]) | |
# standard parsers bytes input | |
elif isinstance(data, bytes): | |
raise ValueError("Cannot slice bytes data.") | |
# streaming parsers UTF-8 input | |
else: | |
# positive slices | |
if ( | |
(slice_start is None or slice_start >= 0) | |
and (slice_end is None or slice_end >= 0) | |
and data | |
): | |
return islice(data, slice_start, slice_end) | |
# negative slices found (non-lazy, uses more memory) | |
elif data: | |
return list(data)[slice_start:slice_end] | |
return data | |
class timestamp: | |
__slots__ = ("string", "format", "naive", "utc", "iso") | |
def __init__( | |
self, | |
datetime_string: Optional[str], | |
format_hint: Optional[Iterable[int]] = None, | |
) -> None: | |
""" | |
Input a datetime text string of several formats and convert to a | |
naive or timezone-aware epoch timestamp in UTC. | |
Parameters: | |
datetime_string (str): a string representation of a | |
datetime in several supported formats | |
format_hint (iterable): an optional iterable of format ID | |
integers to instruct the timestamp object to try those | |
formats first in the order given. Other formats will be | |
tried after the format hint list is exhausted. This can | |
speed up timestamp conversion so several different formats | |
don't have to be tried in brute-force fashion. | |
Returns a timestamp object with the following attributes: | |
string (str): the input datetime string | |
format (int | None): the format rule that was used to decode | |
the datetime string. None if conversion fails. | |
naive (int | None): timestamp based on locally configured | |
timezone. None if conversion fails. | |
utc (int | None): aware timestamp only if UTC timezone | |
detected in datetime string. None if conversion fails. | |
iso (str | None): ISO string - timezone information is output | |
only if UTC timezone is detected in the datetime string. | |
""" | |
self.string = datetime_string | |
if not format_hint: | |
format_hint = tuple() | |
else: | |
format_hint = tuple(format_hint) | |
dt = self._parse_dt(self.string, format_hint=format_hint) | |
self.format = dt["format"] | |
self.naive = dt["timestamp_naive"] | |
self.utc = dt["timestamp_utc"] | |
self.iso = dt["iso"] | |
def __repr__(self) -> str: | |
return f"timestamp(string={self.string!r}, format={self.format}, naive={self.naive}, utc={self.utc}, iso={self.iso!r})" | |
@staticmethod | |
@lru_cache(maxsize=2048) | |
def _parse_dt( | |
dt_string: Optional[str], format_hint: Optional[Iterable[int]] = None | |
) -> Dict[str, Any]: | |
""" | |
Input a datetime text string of several formats and convert to | |
a naive or timezone-aware epoch timestamp in UTC. | |
Parameters: | |
dt_string: (string) a string representation of a date-time | |
in several supported formats | |
format_hint: (list | tuple) a list of format ID int's that | |
should be tried first. This can increase | |
performance since the function will not need to | |
try many incorrect formats before finding the | |
correct one. | |
Returns: | |
Dictionary of the following format: | |
{ | |
# for debugging purposes. None if conversion fails | |
"format": int, | |
# timestamp based on locally configured timezone. | |
# None if conversion fails. | |
"timestamp_naive": int, | |
# aware timestamp only if UTC timezone detected. | |
# None if conversion fails. | |
"timestamp_utc": int | |
# ISO string. None if conversion fails. | |
"iso": str | |
} | |
The `format` integer denotes which date_time format | |
conversion succeeded. | |
The `timestamp_naive` integer is the converted date-time | |
string to a naive epoch timestamp. | |
The `timestamp_utc` integer is the converted date-time | |
string to an aware epoch timestamp in the UTC timezone. If | |
an aware conversion cannot be performed (e.g. the UTC | |
timezone is not found in the date-time string), then this | |
field will be None. | |
The `iso` string will only have timezone information if the | |
UTC timezone is detected in `dt_string`. | |
If the conversion completely fails, all fields will be None. | |
""" | |
formats: tuple[TimeStampFormatType, ...] = ( | |
{ | |
"id": 1000, | |
"format": "%a %b %d %H:%M:%S %Y", | |
"locale": None, | |
}, # manual C locale format conversion: Tue Mar 23 16:12:11 2021 or Tue Mar 23 16:12:11 IST 2021 | |
{ | |
"id": 1100, | |
"format": "%a %b %d %H:%M:%S %Y %z", | |
"locale": None, | |
}, # git date output: Thu Mar 5 09:17:40 2020 -0800 | |
{ | |
"id": 1300, | |
"format": "%Y-%m-%dT%H:%M:%S.%f%Z", | |
"locale": None, | |
}, # ISO Format with UTC (found in syslog 5424): 2003-10-11T22:14:15.003Z | |
{ | |
"id": 1310, | |
"format": "%Y-%m-%dT%H:%M:%S.%f", | |
"locale": None, | |
}, # ISO Format without TZ (found in syslog 5424): 2003-10-11T22:14:15.003 | |
{ | |
"id": 1400, | |
"format": "%b %d %Y %H:%M:%S.%f UTC", | |
"locale": None, | |
}, # CEF Format with UTC: Nov 08 2022 12:30:00.111 UTC | |
{ | |
"id": 1410, | |
"format": "%b %d %Y %H:%M:%S.%f", | |
"locale": None, | |
}, # CEF Format without TZ: Nov 08 2022 12:30:00.111 | |
{ | |
"id": 1420, | |
"format": "%b %d %Y %H:%M:%S UTC", | |
"locale": None, | |
}, # CEF Format with UTC without microseconds: Nov 08 2022 12:30:00 UTC | |
{ | |
"id": 1430, | |
"format": "%b %d %Y %H:%M:%S", | |
"locale": None, | |
}, # CEF Format without TZ or microseconds: Nov 08 2022 12:30:00 | |
{ | |
"id": 1500, | |
"format": "%Y-%m-%d %H:%M", | |
"locale": None, | |
}, # en_US.UTF-8 local format (found in who cli output): 2021-03-23 00:14 | |
{ | |
"id": 1600, | |
"format": "%m/%d/%Y %I:%M %p", | |
"locale": None, | |
}, # Windows english format (found in dir cli output): 12/07/2019 02:09 AM | |
{ | |
"id": 1700, | |
"format": "%m/%d/%Y, %I:%M:%S %p", | |
"locale": None, | |
            },  # Windows english format with non-UTC tz (found in systeminfo cli output): 3/22/2021, 1:15:51 PM (UTC-0600)
{ | |
"id": 1705, | |
"format": "%m/%d/%Y, %I:%M:%S %p %Z", | |
"locale": None, | |
}, # Windows english format with UTC tz (found in systeminfo cli output): 3/22/2021, 1:15:51 PM (UTC) | |
{ | |
"id": 1710, | |
"format": "%m/%d/%Y, %I:%M:%S %p UTC%z", | |
"locale": None, | |
}, # Windows english format with UTC tz (found in systeminfo cli output): 3/22/2021, 1:15:51 PM (UTC+0000) | |
{ | |
"id": 1750, | |
"format": "%Y/%m/%d-%H:%M:%S.%f", | |
"locale": None, | |
}, # Google Big Table format with no timezone: 1970/01/01-01:00:00.000000 | |
{ | |
"id": 1755, | |
"format": "%Y/%m/%d-%H:%M:%S.%f%z", | |
"locale": None, | |
}, # Google Big Table format with timezone: 1970/01/01-01:00:00.000000+00:00 | |
{ | |
"id": 1760, | |
"format": "%Y-%m-%d %H:%M:%S%z", | |
"locale": None, | |
}, # certbot format with timezone: 2023-06-12 01:35:30+00:00 | |
{ | |
"id": 1800, | |
"format": "%d/%b/%Y:%H:%M:%S %z", | |
"locale": None, | |
}, # Common Log Format: 10/Oct/2000:13:55:36 -0700 | |
{ | |
"id": 2000, | |
"format": "%a %d %b %Y %I:%M:%S %p %Z", | |
"locale": None, | |
}, # en_US.UTF-8 local format (found in upower cli output): Tue 23 Mar 2021 04:12:11 PM UTC | |
{ | |
"id": 3000, | |
"format": "%a %d %b %Y %I:%M:%S %p", | |
"locale": None, | |
}, # en_US.UTF-8 local format with non-UTC tz (found in upower cli output): Tue 23 Mar 2021 04:12:11 PM IST | |
{ | |
"id": 3500, | |
"format": "%a, %d %b %Y %H:%M:%S %Z", | |
"locale": None, | |
}, # HTTP header time format (always GMT so assume UTC): Wed, 31 Jan 2024 00:39:28 GMT | |
{ | |
"id": 4000, | |
"format": "%A %d %B %Y %I:%M:%S %p %Z", | |
"locale": None, | |
}, # European-style local format (found in upower cli output): Tuesday 01 October 2019 12:50:41 PM UTC | |
{ | |
"id": 5000, | |
"format": "%A %d %B %Y %I:%M:%S %p", | |
"locale": None, | |
}, # European-style local format with non-UTC tz (found in upower cli output): Tuesday 01 October 2019 12:50:41 PM IST | |
{ | |
"id": 6000, | |
"format": "%a %b %d %I:%M:%S %p %Z %Y", | |
"locale": None, | |
}, # en_US.UTF-8 format (found in date cli): Wed Mar 24 06:16:19 PM UTC 2021 | |
{ | |
"id": 7000, | |
"format": "%a %b %d %H:%M:%S %Z %Y", | |
"locale": None, | |
}, # C locale format (found in date cli): Wed Mar 24 11:11:30 UTC 2021 | |
{ | |
"id": 7100, | |
"format": "%b %d %H:%M:%S %Y", | |
"locale": None, | |
}, # C locale format (found in stat cli output - osx): # Mar 29 11:49:05 2021 | |
{ | |
"id": 7200, | |
"format": "%Y-%m-%d %H:%M:%S.%f %z", | |
"locale": None, | |
}, # C locale format (found in stat cli output - linux): 2019-08-13 18:13:43.555604315 -0400 | |
{ | |
"id": 7250, | |
"format": "%Y-%m-%d %H:%M:%S", | |
"locale": None, | |
}, # C locale format with non-UTC tz (found in modified vmstat cli output): # 2021-09-16 20:32:28 PDT | |
{ | |
"id": 7255, | |
"format": "%Y-%m-%d %H:%M:%S %Z", | |
"locale": None, | |
}, # C locale format (found in modified vmstat cli output): # 2021-09-16 20:32:28 UTC | |
{ | |
"id": 7300, | |
"format": "%a %Y-%m-%d %H:%M:%S %Z", | |
"locale": None, | |
}, # C locale format (found in timedatectl cli output): # Wed 2020-03-11 00:53:21 UTC | |
# attempt locale changes last | |
{ | |
"id": 8000, | |
"format": "%a %d %b %Y %H:%M:%S %Z", | |
"locale": "", | |
}, # current locale format (found in upower cli output): # mar. 23 mars 2021 23:12:11 UTC | |
{ | |
"id": 8100, | |
"format": "%a %d %b %Y %H:%M:%S", | |
"locale": "", | |
}, # current locale format with non-UTC tz (found in upower cli output): # mar. 23 mars 2021 19:12:11 EDT | |
{ | |
"id": 8200, | |
"format": "%A %d %B %Y, %H:%M:%S UTC%z", | |
"locale": "", | |
}, # fr_FR.utf8 locale format (found in date cli output): vendredi 26 mars 2021, 13:26:46 (UTC+0000) | |
{ | |
"id": 8300, | |
"format": "%A %d %B %Y, %H:%M:%S", | |
"locale": "", | |
}, # fr_FR.utf8 locale format with non-UTC tz (found in date cli output): vendredi 26 mars 2021, 13:26:46 (UTC-0400) | |
{ | |
"id": 9000, | |
"format": "%c", | |
"locale": "", | |
}, # locally configured locale format conversion: Could be anything :) this is a last-gasp attempt | |
) | |
# from https://www.timeanddate.com/time/zones/ | |
# only removed UTC & GMT timezones and added known non-UTC offsets | |
tz_abbr: set[str] = { | |
"A", | |
"ACDT", | |
"ACST", | |
"ACT", | |
"ACWST", | |
"ADT", | |
"AEDT", | |
"AEST", | |
"AET", | |
"AFT", | |
"AKDT", | |
"AKST", | |
"ALMT", | |
"AMST", | |
"AMT", | |
"ANAST", | |
"ANAT", | |
"AQTT", | |
"ART", | |
"AST", | |
"AT", | |
"AWDT", | |
"AWST", | |
"AZOST", | |
"AZOT", | |
"AZST", | |
"AZT", | |
"AoE", | |
"B", | |
"BNT", | |
"BOT", | |
"BRST", | |
"BRT", | |
"BST", | |
"BTT", | |
"C", | |
"CAST", | |
"CAT", | |
"CCT", | |
"CDT", | |
"CEST", | |
"CET", | |
"CHADT", | |
"CHAST", | |
"CHOST", | |
"CHOT", | |
"CHUT", | |
"CIDST", | |
"CIST", | |
"CKT", | |
"CLST", | |
"CLT", | |
"COT", | |
"CST", | |
"CT", | |
"CVT", | |
"CXT", | |
"ChST", | |
"D", | |
"DAVT", | |
"DDUT", | |
"E", | |
"EASST", | |
"EAST", | |
"EAT", | |
"ECT", | |
"EDT", | |
"EEST", | |
"EET", | |
"EGST", | |
"EGT", | |
"EST", | |
"ET", | |
"F", | |
"FET", | |
"FJST", | |
"FJT", | |
"FKST", | |
"FKT", | |
"FNT", | |
"G", | |
"GALT", | |
"GAMT", | |
"GET", | |
"GFT", | |
"GILT", | |
"GST", | |
"GYT", | |
"H", | |
"HDT", | |
"HKT", | |
"HOVST", | |
"HOVT", | |
"HST", | |
"I", | |
"ICT", | |
"IDT", | |
"IOT", | |
"IRDT", | |
"IRKST", | |
"IRKT", | |
"IRST", | |
"IST", | |
"JST", | |
"K", | |
"KGT", | |
"KOST", | |
"KRAST", | |
"KRAT", | |
"KST", | |
"KUYT", | |
"L", | |
"LHDT", | |
"LHST", | |
"LINT", | |
"M", | |
"MAGST", | |
"MAGT", | |
"MART", | |
"MAWT", | |
"MDT", | |
"MHT", | |
"MMT", | |
"MSD", | |
"MSK", | |
"MST", | |
"MT", | |
"MUT", | |
"MVT", | |
"MYT", | |
"N", | |
"NCT", | |
"NDT", | |
"NFDT", | |
"NFT", | |
"NOVST", | |
"NOVT", | |
"NPT", | |
"NRT", | |
"NST", | |
"NUT", | |
"NZDT", | |
"NZST", | |
"O", | |
"OMSST", | |
"OMST", | |
"ORAT", | |
"P", | |
"PDT", | |
"PET", | |
"PETST", | |
"PETT", | |
"PGT", | |
"PHOT", | |
"PHT", | |
"PKT", | |
"PMDT", | |
"PMST", | |
"PONT", | |
"PST", | |
"PT", | |
"PWT", | |
"PYST", | |
"PYT", | |
"Q", | |
"QYZT", | |
"R", | |
"RET", | |
"ROTT", | |
"S", | |
"SAKT", | |
"SAMT", | |
"SAST", | |
"SBT", | |
"SCT", | |
"SGT", | |
"SRET", | |
"SRT", | |
"SST", | |
"SYOT", | |
"T", | |
"TAHT", | |
"TFT", | |
"TJT", | |
"TKT", | |
"TLT", | |
"TMT", | |
"TOST", | |
"TOT", | |
"TRT", | |
"TVT", | |
"U", | |
"ULAST", | |
"ULAT", | |
"UYST", | |
"UYT", | |
"UZT", | |
"V", | |
"VET", | |
"VLAST", | |
"VLAT", | |
"VOST", | |
"VUT", | |
"W", | |
"WAKT", | |
"WARST", | |
"WAST", | |
"WAT", | |
"WEST", | |
"WET", | |
"WFT", | |
"WGST", | |
"WGT", | |
"WIB", | |
"WIT", | |
"WITA", | |
"WST", | |
"WT", | |
"X", | |
"Y", | |
"YAKST", | |
"YAKT", | |
"YAPT", | |
"YEKST", | |
"YEKT", | |
"UTC-1200", | |
"UTC-1100", | |
"UTC-1000", | |
"UTC-0930", | |
"UTC-0900", | |
"UTC-0800", | |
"UTC-0700", | |
"UTC-0600", | |
"UTC-0500", | |
"UTC-0400", | |
"UTC-0300", | |
"UTC-0230", | |
"UTC-0200", | |
"UTC-0100", | |
"UTC+0100", | |
"UTC+0200", | |
"UTC+0300", | |
"UTC+0400", | |
"UTC+0430", | |
"UTC+0500", | |
"UTC+0530", | |
"UTC+0545", | |
"UTC+0600", | |
"UTC+0630", | |
"UTC+0700", | |
"UTC+0800", | |
"UTC+0845", | |
"UTC+0900", | |
"UTC+1000", | |
"UTC+1030", | |
"UTC+1100", | |
"UTC+1200", | |
"UTC+1300", | |
"UTC+1345", | |
"UTC+1400", | |
} | |
offset_suffixes: tuple[str, ...] = ( | |
"-12:00", | |
"-11:00", | |
"-10:00", | |
"-09:30", | |
"-09:00", | |
"-08:00", | |
"-07:00", | |
"-06:00", | |
"-05:00", | |
"-04:00", | |
"-03:00", | |
"-02:30", | |
"-02:00", | |
"-01:00", | |
"+01:00", | |
"+02:00", | |
"+03:00", | |
"+04:00", | |
"+04:30", | |
"+05:00", | |
"+05:30", | |
"+05:45", | |
"+06:00", | |
"+06:30", | |
"+07:00", | |
"+08:00", | |
"+08:45", | |
"+09:00", | |
"+10:00", | |
"+10:30", | |
"+11:00", | |
"+12:00", | |
"+13:00", | |
"+13:45", | |
"+14:00", | |
) | |
data: str = dt_string or "" | |
normalized_datetime: str = "" | |
utc_tz: bool = False | |
dt: Optional[datetime] = None | |
dt_utc: Optional[datetime] = None | |
timestamp_naive: Optional[int] = None | |
timestamp_utc: Optional[int] = None | |
iso_string: Optional[str] = None | |
timestamp_obj: Dict[str, Any] = { | |
"format": None, | |
"timestamp_naive": None, | |
"timestamp_utc": None, | |
"iso": None, | |
} | |
# convert format_hint to a tuple so it is hashable (for lru_cache) | |
if not format_hint: | |
format_hint = tuple() | |
else: | |
format_hint = tuple(format_hint) | |
# sometimes UTC is referenced as 'Coordinated Universal Time'. Convert to 'UTC' | |
data = data.replace("Coordinated Universal Time", "UTC") | |
# UTC can also be indicated with 'Z' for Zulu time (ISO-8601). Convert to 'UTC' | |
data = data.replace("Z", "UTC") | |
# GMT and UTC are practically equivalent. Convert to 'UTC' | |
data = data.replace("GMT", "UTC") | |
if "UTC" in data: | |
utc_tz = True | |
if "UTC+" in data or "UTC-" in data: | |
utc_tz = bool("UTC+0000" in data or "UTC-0000" in data) | |
elif "+0000" in data or "-0000" in data or "+00:00" in data or "-00:00" in data: | |
utc_tz = True | |
data = data.replace("+00:00", "+0000") # fix for python 3.6 | |
# normalize the timezone by taking out any timezone reference, except UTC | |
cleandata = data.replace("(", "").replace(")", "") | |
normalized_datetime_list: List[str] = [] | |
for term in cleandata.split(): | |
if term not in tz_abbr: | |
normalized_datetime_list.append(term) | |
normalized_datetime = " ".join(normalized_datetime_list) | |
# remove non UTC offset suffixes at the end of the string | |
for suffix in offset_suffixes: | |
if normalized_datetime.endswith(suffix): | |
normalized_datetime = normalized_datetime[0 : -len(suffix)] | |
break | |
# normalize further by converting any greater-than 6-digit subsecond to 6-digits | |
p = re.compile(r"(\W\d\d:\d\d:\d\d\.\d{6})\d+\W") | |
normalized_datetime = p.sub(r"\g<1> ", normalized_datetime) | |
# try format hints first, then fall back to brute-force method | |
hint_obj_list: List[TimeStampFormatType] = [] | |
for fmt_id in format_hint: | |
for fmt in formats: | |
if fmt_id == fmt["id"]: | |
hint_obj_list.append(fmt) | |
        remaining_formats = [fmt for fmt in formats if fmt["id"] not in format_hint]
optimized_formats = hint_obj_list + remaining_formats | |
for fmt in optimized_formats: | |
try: | |
locale.setlocale(locale.LC_TIME, fmt["locale"]) | |
dt = datetime.strptime(normalized_datetime, fmt["format"]) | |
timestamp_obj["format"] = fmt["id"] | |
timestamp_naive = int(dt.replace(tzinfo=None).timestamp()) | |
iso_string = dt.replace(tzinfo=None).isoformat() | |
locale.setlocale(locale.LC_TIME, None) | |
break | |
except Exception: | |
locale.setlocale(locale.LC_TIME, None) | |
continue | |
if dt and utc_tz: | |
dt_utc = dt.replace(tzinfo=timezone.utc) | |
timestamp_utc = int(dt_utc.timestamp()) | |
iso_string = dt_utc.isoformat() | |
if timestamp_naive: | |
timestamp_obj["timestamp_naive"] = timestamp_naive | |
timestamp_obj["timestamp_utc"] = timestamp_utc | |
timestamp_obj["iso"] = iso_string | |
return timestamp_obj | |
# from universal.py | |
r"""jc - JSON Convert universal parsers""" | |
from typing import Iterable, List, Dict | |
def simple_table_parse(data: Iterable[str]) -> List[Dict]: | |
""" | |
Parse simple tables. There should be no blank cells. The last column | |
may contain data with spaces. | |
Example Table: | |
col_1 col_2 col_3 col_4 col_5 | |
apple orange pear banana my favorite fruits | |
carrot squash celery spinach my favorite veggies | |
chicken beef pork eggs my favorite proteins | |
[{'col_1': 'apple', 'col_2': 'orange', 'col_3': 'pear', 'col_4': | |
'banana', 'col_5': 'my favorite fruits'}, {'col_1': 'carrot', | |
'col_2': 'squash', 'col_3': 'celery', 'col_4': 'spinach', 'col_5': | |
'my favorite veggies'}, {'col_1': 'chicken', 'col_2': 'beef', | |
'col_3': 'pork', 'col_4': 'eggs', 'col_5': 'my favorite proteins'}] | |
Parameters: | |
data: (iter) Text data to parse that has been split into lines | |
via .splitlines(). Item 0 must be the header row. | |
Any spaces in header names should be changed to | |
underscore '_'. You should also ensure headers are | |
lowercase by using .lower(). | |
Also, ensure there are no blank rows in the data. | |
Returns: | |
List of Dictionaries | |
""" | |
# code adapted from Conor Heine at: | |
    # https://gist.github.com/cahna/43a1a3ff4d075bcd71f9d7120037a501
# cast iterable to a list. Also keeps from mutating the caller's list | |
data = list(data) | |
headers = [h for h in " ".join(data[0].strip().split()).split() if h] | |
raw_data = map(lambda s: s.strip().split(None, len(headers) - 1), data[1:]) | |
raw_output = [dict(zip(headers, r)) for r in raw_data] | |
return raw_output | |
def sparse_table_parse(data: Iterable[str], delim: str = "\u2063") -> List[Dict]: | |
""" | |
Parse tables with missing column data or with spaces in column data. | |
Blank cells are converted to None in the resulting dictionary. Data | |
elements must line up within column boundaries. | |
Example Table: | |
col_1 col_2 col_3 col_4 col_5 | |
apple orange fuzzy peach my favorite fruits | |
green beans celery spinach my favorite veggies | |
chicken beef brown eggs my favorite proteins | |
[{'col_1': 'apple', 'col_2': 'orange', 'col_3': None, 'col_4': | |
'fuzzy peach', 'col_5': 'my favorite fruits'}, {'col_1': | |
'green beans', 'col_2': None, 'col_3': 'celery', 'col_4': 'spinach', | |
'col_5': 'my favorite veggies'}, {'col_1': 'chicken', 'col_2': | |
'beef', 'col_3': None, 'col_4': 'brown eggs', 'col_5': | |
'my favorite proteins'}] | |
Parameters: | |
data: (iter) An iterable of string lines (e.g. str.splitlines()) | |
Item 0 must be the header row. Any spaces in header | |
names should be changed to underscore '_'. You | |
should also ensure headers are lowercase by using | |
.lower(). Do not change the position of header | |
names as the positions are used to find the data. | |
Also, ensure there are no blank line items. | |
        delim: (string) Delimiter to use. By default `\\u2063`
(invisible separator) is used since it is unlikely | |
to ever be seen in terminal output. You can change | |
this for troubleshooting purposes or if there is a | |
delimiter conflict with your data. | |
Returns: | |
List of Dictionaries | |
""" | |
# cast iterable to a list. Also keeps from mutating the caller's list | |
data = list(data) | |
# find the longest line and pad all lines with spaces to match | |
max_len = max([len(x) for x in data]) | |
new_data = [] | |
for line in data: | |
new_data.append(line + " " * (max_len - len(line))) | |
data = new_data | |
# find header | |
output: List = [] | |
header_text: str = data.pop(0) | |
header_text = header_text + " " | |
header_list: List = header_text.split() | |
# find each column index and end position | |
header_search = [header_list[0]] | |
for h in header_list[1:]: | |
header_search.append(" " + h + " ") | |
header_spec_list = [] | |
for i, column in enumerate(header_list[0 : len(header_list) - 1]): | |
header_spec = {"name": column, "end": header_text.find(header_search[i + 1])} | |
header_spec_list.append(header_spec) | |
# parse lines | |
if data: | |
for entry in data: | |
output_line = {} | |
# insert new separator since data can contain spaces | |
for col in reversed(header_list): | |
# find the right header_spec | |
for h_spec in header_spec_list: | |
if h_spec["name"] == col: | |
h_end = h_spec["end"] | |
# check if the location contains whitespace. if not | |
# then move to the left until a space is found | |
while h_end > 0 and not entry[h_end].isspace(): | |
h_end -= 1 | |
# insert custom delimiter | |
entry = entry[:h_end] + delim + entry[h_end + 1 :] | |
# create the entry list from the new custom delimiter | |
entry_list = entry.split(delim, maxsplit=len(header_list) - 1) | |
# clean up leading and trailing spaces in entry | |
clean_entry_list = [] | |
for col in entry_list: | |
clean_entry = col.strip() | |
if clean_entry == "": | |
clean_entry = None | |
clean_entry_list.append(clean_entry) | |
output_line = dict(zip(header_list, clean_entry_list)) | |
output.append(output_line) | |
return output | |
r"""jc - JSON Convert `lsusb` command output parser | |
Supports the `-v` option or no options. | |
Usage (cli): | |
$ lsusb -v | jc --lsusb | |
or | |
$ jc lsusb -v | |
Usage (module): | |
import jc | |
result = jc.parse('lsusb', lsusb_command_output) | |
Schema: | |
> Note: <item> object keynames are assigned directly from the lsusb | |
> output. If there are duplicate <item> names in a section, only the | |
> last one is converted. | |
[ | |
{ | |
"bus": string, | |
"device": string, | |
"id": string, | |
"description": string, | |
"device_descriptor": { | |
"<item>": { | |
"value": string, | |
"description": string, | |
"attributes": [ | |
string | |
] | |
}, | |
"configuration_descriptor": { | |
"<item>": { | |
"value": string, | |
"description": string, | |
"attributes": [ | |
string | |
] | |
}, | |
"interface_association": { | |
"<item>": { | |
"value": string, | |
"description": string, | |
"attributes": [ | |
string | |
] | |
} | |
}, | |
"interface_descriptors": [ | |
{ | |
"<item>": { | |
"value": string, | |
"description": string, | |
"attributes": [ | |
string | |
] | |
}, | |
"cdc_header": { | |
"<item>": { | |
"value": string, | |
"description": string, | |
"attributes": [ | |
string | |
] | |
} | |
}, | |
"cdc_call_management": { | |
"<item>": { | |
"value": string, | |
"description": string, | |
"attributes": [ | |
string | |
] | |
} | |
}, | |
"cdc_acm": { | |
"<item>": { | |
"value": string, | |
"description": string, | |
"attributes": [ | |
string | |
] | |
} | |
}, | |
"cdc_union": { | |
"<item>": { | |
"value": string, | |
"description": string, | |
"attributes": [ | |
string | |
] | |
} | |
}, | |
"cdc_mbim": { | |
"<item>": { | |
"value": string, | |
"description": string, | |
"attributes": [ | |
string | |
] | |
} | |
}, | |
"cdc_mbim_extended": { | |
"<item>": { | |
"value": string, | |
"description": string, | |
"attributes": [ | |
string | |
] | |
} | |
}, | |
"videocontrol_descriptors": [ | |
{ | |
"<item>": { | |
"value": string, | |
"description": string, | |
"attributes": [ | |
string | |
] | |
} | |
} | |
], | |
"videostreaming_descriptors": [ | |
{ | |
"<item>": { | |
"value": string, | |
"description": string, | |
"attributes": [ | |
string | |
] | |
} | |
} | |
], | |
"endpoint_descriptors": [ | |
{ | |
"<item>": { | |
"value": string, | |
"description": string, | |
"attributes": [ | |
string | |
] | |
} | |
} | |
] | |
} | |
] | |
} | |
}, | |
"hub_descriptor": { | |
"<item>": { | |
"value": string, | |
"description": string, | |
"attributes": [ | |
string, | |
] | |
}, | |
"hub_port_status": { | |
"<item>": { | |
"value": string, | |
"attributes": [ | |
string | |
] | |
} | |
} | |
}, | |
"device_qualifier": { | |
"<item>": { | |
"value": string, | |
"description": string | |
} | |
}, | |
"device_status": { | |
"value": string, | |
"description": string | |
} | |
} | |
] | |
Examples: | |
$ lsusb -v | jc --lsusb -p | |
[ | |
{ | |
"bus": "002", | |
"device": "001", | |
"id": "1d6b:0001", | |
"description": "Linux Foundation 1.1 root hub", | |
"device_descriptor": { | |
"bLength": { | |
"value": "18" | |
}, | |
"bDescriptorType": { | |
"value": "1" | |
}, | |
"bcdUSB": { | |
"value": "1.10" | |
}, | |
... | |
"bNumConfigurations": { | |
"value": "1" | |
}, | |
"configuration_descriptor": { | |
"bLength": { | |
"value": "9" | |
}, | |
... | |
"iConfiguration": { | |
"value": "0" | |
}, | |
"bmAttributes": { | |
"value": "0xe0", | |
"attributes": [ | |
"Self Powered", | |
"Remote Wakeup" | |
] | |
}, | |
"MaxPower": { | |
"description": "0mA" | |
}, | |
"interface_descriptors": [ | |
{ | |
"bLength": { | |
"value": "9" | |
}, | |
... | |
"bInterfaceProtocol": { | |
"value": "0", | |
"description": "Full speed (or root) hub" | |
}, | |
"iInterface": { | |
"value": "0" | |
}, | |
"endpoint_descriptors": [ | |
{ | |
"bLength": { | |
"value": "7" | |
}, | |
... | |
"bmAttributes": { | |
"value": "3", | |
"attributes": [ | |
"Transfer Type Interrupt", | |
"Synch Type None", | |
"Usage Type Data" | |
] | |
}, | |
"wMaxPacketSize": { | |
"value": "0x0002", | |
"description": "1x 2 bytes" | |
}, | |
"bInterval": { | |
"value": "255" | |
} | |
} | |
] | |
} | |
] | |
} | |
}, | |
"hub_descriptor": { | |
"bLength": { | |
"value": "9" | |
}, | |
... | |
"wHubCharacteristic": { | |
"value": "0x000a", | |
"attributes": [ | |
"No power switching (usb 1.0)", | |
"Per-port overcurrent protection" | |
] | |
}, | |
... | |
"hub_port_status": { | |
"Port 1": { | |
"value": "0000.0103", | |
"attributes": [ | |
"power", | |
"enable", | |
"connect" | |
] | |
}, | |
"Port 2": { | |
"value": "0000.0103", | |
"attributes": [ | |
"power", | |
"enable", | |
"connect" | |
] | |
} | |
} | |
}, | |
"device_status": { | |
"value": "0x0001", | |
"description": "Self Powered" | |
} | |
} | |
] | |
""" | |
class info: | |
"""Provides parser metadata (version, author, etc.)""" | |
version = "1.4" | |
description = "`lsusb` command parser" | |
author = "Kelly Brazil" | |
author_email = "[email protected]" | |
compatible = ["linux"] | |
magic_commands = ["lsusb"] | |
tags = ["command"] | |
__version__ = info.version | |
def _process(proc_data): | |
""" | |
Final processing to conform to the schema. | |
Parameters: | |
proc_data: (List of Dictionaries) raw structured data to process | |
Returns: | |
List of Dictionaries. Structured to conform to the schema. | |
""" | |
# no further processing | |
return proc_data | |
class _NestedDict(dict): | |
# for ease of creating/updating nested dictionary structures | |
# https://stackoverflow.com/questions/5369723/multi-level-defaultdict-with-variable-depth | |
# https://ohuiginn.net/mt/2010/07/nested_dictionaries_in_python.html | |
def __getitem__(self, key): | |
if key in self: | |
return self.get(key) | |
return self.setdefault(key, _NestedDict()) | |
class _root_obj: | |
def __init__(self, name): | |
self.name = name | |
self.list = [] | |
def _entries_for_this_bus_exist(self, bus_idx): | |
"""Returns true if there are object entries for the corresponding bus index""" | |
for item in self.list: | |
keyname = tuple(item.keys())[0] | |
if ( | |
"_state" in item[keyname] | |
and item[keyname]["_state"]["bus_idx"] == bus_idx | |
): | |
return True | |
return False | |
def _update_output(self, bus_idx, output_line): | |
"""modifies output_line dictionary for the corresponding bus index. | |
output_line is the self.output_line attribute from the _lsusb object.""" | |
for item in self.list: | |
keyname = tuple(item.keys())[0] | |
if ( | |
"_state" in item[keyname] | |
and item[keyname]["_state"]["bus_idx"] == bus_idx | |
): | |
# is this a top level value or an attribute? | |
if item[keyname]["_state"]["attribute_value"]: | |
last_item = item[keyname]["_state"]["last_item"] | |
if "attributes" not in output_line[f"{self.name}"][last_item]: | |
output_line[f"{self.name}"][last_item]["attributes"] = [] | |
this_attribute = f'{keyname} {item[keyname].get("value", "")} {item[keyname].get("description", "")}'.strip() | |
output_line[f"{self.name}"][last_item]["attributes"].append( | |
this_attribute | |
) | |
continue | |
output_line[f"{self.name}"].update(item) | |
del output_line[f"{self.name}"][keyname]["_state"] | |
class _descriptor_obj: | |
def __init__(self, name): | |
self.name = name | |
self.list = [] | |
def _entries_for_this_bus_and_interface_idx_exist(self, bus_idx, iface_idx): | |
"""Returns true if there are object entries for the corresponding bus index | |
and interface index""" | |
for item in self.list: | |
keyname = tuple(item.keys())[0] | |
if ( | |
"_state" in item[keyname] | |
and item[keyname]["_state"]["bus_idx"] == bus_idx | |
and item[keyname]["_state"]["interface_descriptor_idx"] == iface_idx | |
): | |
return True | |
return False | |
def _update_output(self, bus_idx, iface_idx, output_line): | |
"""modifies output_line dictionary for the corresponding bus index and | |
interface index. output_line is the i_desc_obj object.""" | |
for item in self.list: | |
keyname = tuple(item.keys())[0] | |
if ( | |
"_state" in item[keyname] | |
and item[keyname]["_state"]["bus_idx"] == bus_idx | |
and item[keyname]["_state"]["interface_descriptor_idx"] == iface_idx | |
): | |
# is this a top level value or an attribute? | |
if item[keyname]["_state"]["attribute_value"]: | |
last_item = item[keyname]["_state"]["last_item"] | |
if "attributes" not in output_line[f"{self.name}"][last_item]: | |
output_line[f"{self.name}"][last_item]["attributes"] = [] | |
this_attribute = f'{keyname} {item[keyname].get("value", "")} {item[keyname].get("description", "")}'.strip() | |
output_line[f"{self.name}"][last_item]["attributes"].append( | |
this_attribute | |
) | |
continue | |
output_line[f"{self.name}"].update(item) | |
del output_line[f"{self.name}"][keyname]["_state"] | |
class _descriptor_list: | |
def __init__(self, name): | |
self.name = name | |
self.list = [] | |
def _entries_for_this_bus_and_interface_idx_exist(self, bus_idx, iface_idx): | |
"""Returns true if there are object entries for the corresponding bus index | |
and interface index""" | |
for item in self.list: | |
keyname = tuple(item.keys())[0] | |
if ( | |
"_state" in item[keyname] | |
and item[keyname]["_state"]["bus_idx"] == bus_idx | |
and item[keyname]["_state"]["interface_descriptor_idx"] == iface_idx | |
): | |
return True | |
return False | |
def _get_objects_list(self, bus_idx, iface_idx): | |
"""Returns a list of descriptor object dictionaries for the corresponding | |
bus index and interface index""" | |
object_collection = [] | |
# find max number of items in this object that match the bus_idx and iface_idx | |
num_of_items = -1 | |
for item in self.list: | |
keyname = tuple(item.keys())[0] | |
if ( | |
"_state" in item[keyname] | |
and item[keyname]["_state"]["bus_idx"] == bus_idx | |
and item[keyname]["_state"]["interface_descriptor_idx"] == iface_idx | |
): | |
num_of_items = item[keyname]["_state"][f"{self.name}_idx"] | |
# create and return the collection of objects that match the bus_idx and iface_idx | |
if num_of_items > -1: | |
for obj_idx in range(num_of_items + 1): | |
this_object = {} | |
for item in self.list: | |
keyname = tuple(item.keys())[0] | |
if ( | |
"_state" in item[keyname] | |
and item[keyname]["_state"]["bus_idx"] == bus_idx | |
and item[keyname]["_state"]["interface_descriptor_idx"] | |
== iface_idx | |
and item[keyname]["_state"][f"{self.name}_idx"] == obj_idx | |
): | |
# is this a top level value or an attribute? | |
if item[keyname]["_state"]["attribute_value"]: | |
last_item = item[keyname]["_state"]["last_item"] | |
if "attributes" not in this_object[last_item]: | |
this_object[last_item]["attributes"] = [] | |
this_attribute = f'{keyname} {item[keyname].get("value", "")} {item[keyname].get("description", "")}'.strip() | |
this_object[last_item]["attributes"].append(this_attribute) | |
continue | |
this_object.update(item) | |
del item[keyname]["_state"] | |
object_collection.append(this_object) | |
return object_collection | |
class _LsUsb: | |
def __init__(self): | |
self.raw_output = [] | |
self.output_line = _NestedDict() | |
self.section = "" | |
self.old_section = "" | |
# section_header is formatted with the correct spacing to be used with | |
# jc.parsers.universal.sparse_table_parse(). Pad end of string to be at least len of 25 | |
# this value changes for different sections (e.g. videocontrol & videostreaming) | |
        # NOTE: the exact column spacing below is significant; it was collapsed
        # during extraction and is restored here to approximate the upstream
        # jc source
        self.normal_section_header = "key                   val description"
        self.larger_section_header = "key                                   val description"
self.bus_idx = -1 | |
self.interface_descriptor_idx = -1 | |
self.endpoint_descriptor_idx = -1 | |
self.videocontrol_interface_descriptor_idx = -1 | |
self.videostreaming_interface_descriptor_idx = -1 | |
self.last_item = "" | |
self.last_indent = 0 | |
self.attribute_value = False | |
self.bus_list = [] | |
self.device_descriptor = _root_obj("device_descriptor") | |
self.configuration_descriptor = _root_obj("configuration_descriptor") | |
self.interface_association = _root_obj("interface_association") | |
self.interface_descriptor_list = [] | |
self.cdc_header = _descriptor_obj("cdc_header") | |
self.cdc_call_management = _descriptor_obj("cdc_call_management") | |
self.cdc_acm = _descriptor_obj("cdc_acm") | |
self.cdc_union = _descriptor_obj("cdc_union") | |
self.cdc_mbim = _descriptor_obj("cdc_mbim") | |
self.cdc_mbim_extended = _descriptor_obj("cdc_mbim_extended") | |
self.endpoint_descriptors = _descriptor_list("endpoint_descriptor") | |
self.videocontrol_interface_descriptors = _descriptor_list( | |
"videocontrol_interface_descriptor" | |
) | |
self.videostreaming_interface_descriptors = _descriptor_list( | |
"videostreaming_interface_descriptor" | |
) | |
self.hid_device_descriptor = _descriptor_obj("hid_device_descriptor") | |
# self.report_descriptors_list = [] # not implemented | |
self.hub_descriptor = _root_obj("hub_descriptor") | |
self.hub_port_status_list = [] | |
self.device_qualifier_list = [] | |
self.device_status_list = [] | |
@staticmethod | |
def _count_indent(line): | |
indent = 0 | |
for char in line: | |
if char == " ": | |
indent += 1 | |
continue | |
break | |
return indent | |
def _add_attributes(self, line): | |
indent = self._count_indent(line) | |
# determine whether this is a top-level value item or lower-level attribute | |
if indent > self.last_indent and self.old_section == self.section: | |
self.attribute_value = True | |
elif ( | |
indent == self.last_indent | |
and self.attribute_value | |
and self.old_section == self.section | |
): | |
self.attribute_value = True | |
else: | |
self.attribute_value = False | |
section_header = self.normal_section_header | |
if ( | |
self.section == "videocontrol_interface_descriptor" | |
or self.section == "videostreaming_interface_descriptor" | |
or self.section == "cdc_mbim_extended" | |
): | |
section_header = self.larger_section_header | |
temp_obj = [section_header, line.strip() + (" " * 25)] | |
temp_obj = sparse_table_parse(temp_obj) | |
temp_obj = temp_obj[0] | |
line_obj = { | |
temp_obj["key"]: { | |
"value": temp_obj["val"], | |
"description": temp_obj["description"], | |
"_state": { | |
"attribute_value": self.attribute_value, | |
"last_item": self.last_item, | |
"bus_idx": self.bus_idx, | |
"interface_descriptor_idx": self.interface_descriptor_idx, | |
"endpoint_descriptor_idx": self.endpoint_descriptor_idx, | |
"videocontrol_interface_descriptor_idx": self.videocontrol_interface_descriptor_idx, | |
"videostreaming_interface_descriptor_idx": self.videostreaming_interface_descriptor_idx, | |
}, | |
} | |
} | |
if line_obj[temp_obj["key"]]["value"] is None: | |
del line_obj[temp_obj["key"]]["value"] | |
if line_obj[temp_obj["key"]]["description"] is None: | |
del line_obj[temp_obj["key"]]["description"] | |
self.old_section = self.section | |
self.last_indent = indent | |
if not self.attribute_value: | |
self.last_item = temp_obj["key"] | |
return line_obj | |
def _add_hub_port_status_attributes(self, line): | |
# Port 1: 0000.0103 power enable connect | |
first_split = line.split(": ", maxsplit=1) | |
port_field = first_split[0].strip() | |
second_split = first_split[1].split(maxsplit=1) | |
port_val = second_split[0] | |
attributes = second_split[1].split() | |
return { | |
port_field: { | |
"value": port_val, | |
"attributes": attributes, | |
"_state": {"bus_idx": self.bus_idx}, | |
} | |
} | |
def _add_device_status_attributes(self, line): | |
return {"description": line.strip(), "_state": {"bus_idx": self.bus_idx}} | |
def _set_sections(self, line): | |
# ignore blank lines | |
if not line: | |
self.section = "" | |
self.attribute_value = False | |
return True | |
# bus information is on the same line so need to extract data | |
# immediately and set indexes | |
if line.startswith("Bus "): | |
self.section = "bus" | |
self.bus_idx += 1 | |
self.interface_descriptor_idx = -1 | |
self.endpoint_descriptor_idx = -1 | |
self.videocontrol_interface_descriptor_idx = -1 | |
self.videostreaming_interface_descriptor_idx = -1 | |
self.attribute_value = False | |
line_split = line.strip().split(maxsplit=6) | |
self.bus_list.append( | |
{ | |
"bus": line_split[1], | |
"device": line_split[3][:-1], | |
"id": line_split[5], | |
# way to get a list item or None | |
"description": (line_split[6:7] or [None])[0], | |
"_state": {"bus_idx": self.bus_idx}, | |
} | |
) | |
return True | |
        # These sections are lists, so need to update indexes
        # NOTE: leading spaces in the startswith() patterns below are
        # significant (they mirror `lsusb -v` indentation); they were collapsed
        # during extraction and are restored here approximately
        if line.startswith("    Interface Descriptor:"):
self.section = "interface_descriptor" | |
self.interface_descriptor_idx += 1 | |
self.endpoint_descriptor_idx = -1 | |
self.videocontrol_interface_descriptor_idx = -1 | |
self.videostreaming_interface_descriptor_idx = -1 | |
self.attribute_value = False | |
return True | |
if line.startswith(" Endpoint Descriptor:"): | |
self.section = "endpoint_descriptor" | |
self.endpoint_descriptor_idx += 1 | |
self.attribute_value = False | |
return True | |
if line.startswith(" VideoControl Interface Descriptor:"): | |
self.section = "videocontrol_interface_descriptor" | |
self.videocontrol_interface_descriptor_idx += 1 | |
self.attribute_value = False | |
return True | |
if line.startswith(" VideoStreaming Interface Descriptor:"): | |
self.section = "videostreaming_interface_descriptor" | |
self.videostreaming_interface_descriptor_idx += 1 | |
self.attribute_value = False | |
return True | |
# some device status information is displayed on the initial line so | |
# need to extract immediately | |
if line.startswith("Device Status:"): | |
self.section = "device_status" | |
self.attribute_value = False | |
line_split = line.strip().split(":", maxsplit=1) | |
self.device_status_list.append( | |
{"value": line_split[1].strip(), "_state": {"bus_idx": self.bus_idx}} | |
) | |
return True | |
# set the rest of the sections | |
        # NOTE: leading spaces in the keys below mirror `lsusb -v` indentation;
        # they were collapsed during extraction and are restored approximately
        string_section_map = {
            "Device Descriptor:": "device_descriptor",
            "  Configuration Descriptor:": "configuration_descriptor",
            "    Interface Association:": "interface_association",
            "      CDC Header:": "cdc_header",
            "      CDC Call Management:": "cdc_call_management",
            "      CDC ACM:": "cdc_acm",
            "      CDC Union:": "cdc_union",
            "        HID Device Descriptor:": "hid_device_descriptor",
            "         Report Descriptors:": "report_descriptors",
            "      CDC MBIM:": "cdc_mbim",
            "      CDC MBIM Extended:": "cdc_mbim_extended",
            "Hub Descriptor:": "hub_descriptor",
            " Hub Port Status:": "hub_port_status",
            "Device Qualifier (for other device speed):": "device_qualifier",
            "Binary Object Store Descriptor:": None,  # not implemented
        }
for sec_string, section_val in string_section_map.items(): | |
if line.startswith(sec_string): | |
self.section = section_val | |
self.attribute_value = False | |
return True | |
return False | |
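    # Illustrative trace of _set_sections() on assumed sample lines:
    #   'Device Descriptor:'  -> sets section = 'device_descriptor', returns True
    #   '  bLength        18' -> no match, returns False; the line is then
    #                            handled by _populate_lists() below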
    def _populate_lists(self, line):
        section_list_map = {
            "device_descriptor": self.device_descriptor.list,
            "configuration_descriptor": self.configuration_descriptor.list,
            "interface_association": self.interface_association.list,
            "interface_descriptor": self.interface_descriptor_list,
            "cdc_header": self.cdc_header.list,
            "cdc_call_management": self.cdc_call_management.list,
            "cdc_acm": self.cdc_acm.list,
            "cdc_union": self.cdc_union.list,
            "cdc_mbim": self.cdc_mbim.list,
            "cdc_mbim_extended": self.cdc_mbim_extended.list,
            "hid_device_descriptor": self.hid_device_descriptor.list,
            # "report_descriptors": self.report_descriptors_list,  # not implemented
            "videocontrol_interface_descriptor": self.videocontrol_interface_descriptors.list,
            "videostreaming_interface_descriptor": self.videostreaming_interface_descriptors.list,
            "endpoint_descriptor": self.endpoint_descriptors.list,
            "hub_descriptor": self.hub_descriptor.list,
            "device_qualifier": self.device_qualifier_list,
        }

        # attribute lines in `lsusb -v` output are indented by at least two
        # spaces (the literal width was collapsed in this copy and is
        # restored here)
        if line.startswith("  ") and self.section in section_list_map:
            section_list_map[self.section].append(self._add_attributes(line))
            return True
        # special handling of these sections
        # (the two space-prefix literals were collapsed in this copy; the
        # widths below are restored on the assumption that `Port N:` status
        # lines sit at exactly three leading spaces in `lsusb -v` output)
        if (
            line.startswith("   ")
            and not line.startswith("    ")
            and self.section == "hub_port_status"
        ):
            self.hub_port_status_list.append(self._add_hub_port_status_attributes(line))
            return True

        if line.startswith("  ") and self.section == "device_status":
            self.device_status_list.append(self._add_device_status_attributes(line))
            return True

        return False
    def _populate_schema(self):
        """
        Schema:

        = {}
        ['device_descriptor'] = {}
        ['device_descriptor']['configuration_descriptor'] = {}
        ['device_descriptor']['configuration_descriptor']['interface_association'] = {}
        ['device_descriptor']['configuration_descriptor']['interface_descriptors'] = []
        ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0] = {}
        ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['videocontrol_interface_descriptors'] = []
        ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['videocontrol_interface_descriptors'][0] = {}
        ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['videostreaming_interface_descriptors'] = []
        ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['videostreaming_interface_descriptors'][0] = {}
        ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_header'] = {}
        ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_call_management'] = {}
        ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_acm'] = {}
        ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_union'] = {}
        ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_mbim'] = {}
        ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_mbim_extended'] = {}
        ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['hid_device_descriptor'] = {}
        ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['endpoint_descriptors'] = []
        ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['endpoint_descriptors'][0] = {}
        ['hub_descriptor'] = {}
        ['hub_descriptor']['hub_port_status'] = {}
        ['device_qualifier'] = {}
        ['device_status'] = {}
        """
        for idx, item in enumerate(self.bus_list):
            if self.output_line:
                self.raw_output.append(self.output_line)
                self.output_line = _NestedDict()

            del item["_state"]
            self.output_line.update(item)

            # add initial root-level keys
            if self.device_descriptor._entries_for_this_bus_exist(idx):
                self.device_descriptor._update_output(idx, self.output_line)

            if self.configuration_descriptor._entries_for_this_bus_exist(idx):
                self.configuration_descriptor._update_output(
                    idx, self.output_line["device_descriptor"]
                )

            if self.interface_association._entries_for_this_bus_exist(idx):
                self.interface_association._update_output(
                    idx,
                    self.output_line["device_descriptor"]["configuration_descriptor"],
                )
            # add the interface_descriptors key if it doesn't exist and there
            # are entries for this bus
            for iface_attrs in self.interface_descriptor_list:
                keyname = tuple(iface_attrs.keys())[0]

                if (
                    "_state" in iface_attrs[keyname]
                    and iface_attrs[keyname]["_state"]["bus_idx"] == idx
                ):
                    self.output_line["device_descriptor"]["configuration_descriptor"][
                        "interface_descriptors"
                    ] = []
            # find the max index for this bus idx, then iterate over that range
            i_desc_iters = -1
            for iface_attrs in self.interface_descriptor_list:
                keyname = tuple(iface_attrs.keys())[0]

                if (
                    "_state" in iface_attrs[keyname]
                    and iface_attrs[keyname]["_state"]["bus_idx"] == idx
                ):
                    i_desc_iters = iface_attrs[keyname]["_state"][
                        "interface_descriptor_idx"
                    ]
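            # (the list is built in parse order, so the last matching entry
            # leaves the highest interface_descriptor_idx for this bus)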
            # create the interface descriptor object
            if i_desc_iters > -1:
                for iface_idx in range(i_desc_iters + 1):
                    i_desc_obj = _NestedDict()

                    for iface_attrs in self.interface_descriptor_list:
                        keyname = tuple(iface_attrs.keys())[0]

                        if (
                            "_state" in iface_attrs[keyname]
                            and iface_attrs[keyname]["_state"]["bus_idx"] == idx
                            and iface_attrs[keyname]["_state"][
                                "interface_descriptor_idx"
                            ]
                            == iface_idx
                        ):
                            # is this a top-level value or an attribute?
                            if iface_attrs[keyname]["_state"]["attribute_value"]:
                                last_item = iface_attrs[keyname]["_state"]["last_item"]

                                if "attributes" not in i_desc_obj[last_item]:
                                    i_desc_obj[last_item]["attributes"] = []

                                this_attribute = f'{keyname} {iface_attrs[keyname].get("value", "")} {iface_attrs[keyname].get("description", "")}'.strip()
                                i_desc_obj[last_item]["attributes"].append(
                                    this_attribute
                                )
                                continue

                            del iface_attrs[keyname]["_state"]
                            i_desc_obj.update(iface_attrs)
                    # add the rest of the interface descriptor keys to the object
                    if self.cdc_header._entries_for_this_bus_and_interface_idx_exist(
                        idx, iface_idx
                    ):
                        self.cdc_header._update_output(idx, iface_idx, i_desc_obj)

                    if self.cdc_call_management._entries_for_this_bus_and_interface_idx_exist(
                        idx, iface_idx
                    ):
                        self.cdc_call_management._update_output(
                            idx, iface_idx, i_desc_obj
                        )

                    if self.cdc_acm._entries_for_this_bus_and_interface_idx_exist(
                        idx, iface_idx
                    ):
                        self.cdc_acm._update_output(idx, iface_idx, i_desc_obj)

                    if self.cdc_union._entries_for_this_bus_and_interface_idx_exist(
                        idx, iface_idx
                    ):
                        self.cdc_union._update_output(idx, iface_idx, i_desc_obj)

                    if self.cdc_mbim._entries_for_this_bus_and_interface_idx_exist(
                        idx, iface_idx
                    ):
                        self.cdc_mbim._update_output(idx, iface_idx, i_desc_obj)

                    if self.cdc_mbim_extended._entries_for_this_bus_and_interface_idx_exist(
                        idx, iface_idx
                    ):
                        self.cdc_mbim_extended._update_output(
                            idx, iface_idx, i_desc_obj
                        )

                    if self.hid_device_descriptor._entries_for_this_bus_and_interface_idx_exist(
                        idx, iface_idx
                    ):
                        self.hid_device_descriptor._update_output(
                            idx, iface_idx, i_desc_obj
                        )
                    # Not Implemented: Report Descriptors (need more samples)
                    # for rd in self.report_descriptors_list:
                    #     keyname = tuple(rd.keys())[0]
                    #     if '_state' in rd[keyname] and rd[keyname]['_state']['bus_idx'] == idx and rd[keyname]['_state']['interface_descriptor_idx'] == iface_idx:
                    #         i_desc_obj['hid_device_descriptor']['report_descriptors'].update(rd)
                    #         del i_desc_obj['hid_device_descriptor']['report_descriptors'][keyname]['_state']
                    if self.videocontrol_interface_descriptors._entries_for_this_bus_and_interface_idx_exist(
                        idx, iface_idx
                    ):
                        i_desc_obj["videocontrol_interface_descriptors"] = []
                        i_desc_obj["videocontrol_interface_descriptors"].extend(
                            self.videocontrol_interface_descriptors._get_objects_list(
                                idx, iface_idx
                            )
                        )

                    if self.videostreaming_interface_descriptors._entries_for_this_bus_and_interface_idx_exist(
                        idx, iface_idx
                    ):
                        i_desc_obj["videostreaming_interface_descriptors"] = []
                        i_desc_obj["videostreaming_interface_descriptors"].extend(
                            self.videostreaming_interface_descriptors._get_objects_list(
                                idx, iface_idx
                            )
                        )

                    if self.endpoint_descriptors._entries_for_this_bus_and_interface_idx_exist(
                        idx, iface_idx
                    ):
                        i_desc_obj["endpoint_descriptors"] = []
                        i_desc_obj["endpoint_descriptors"].extend(
                            self.endpoint_descriptors._get_objects_list(idx, iface_idx)
                        )

                    # add the object to the list of interface descriptors
                    self.output_line["device_descriptor"]["configuration_descriptor"][
                        "interface_descriptors"
                    ].append(i_desc_obj)
            # add final root-level keys
            if self.hub_descriptor._entries_for_this_bus_exist(idx):
                self.hub_descriptor._update_output(idx, self.output_line)

            for hps in self.hub_port_status_list:
                keyname = tuple(hps.keys())[0]

                if (
                    "_state" in hps[keyname]
                    and hps[keyname]["_state"]["bus_idx"] == idx
                ):
                    self.output_line["hub_descriptor"]["hub_port_status"].update(hps)
                    del self.output_line["hub_descriptor"]["hub_port_status"][keyname][
                        "_state"
                    ]

            for dq in self.device_qualifier_list:
                keyname = tuple(dq.keys())[0]

                if "_state" in dq[keyname] and dq[keyname]["_state"]["bus_idx"] == idx:
                    self.output_line["device_qualifier"].update(dq)
                    del self.output_line["device_qualifier"][keyname]["_state"]

            for ds in self.device_status_list:
                if "_state" in ds and ds["_state"]["bus_idx"] == idx:
                    self.output_line["device_status"].update(ds)
                    del self.output_line["device_status"]["_state"]
def parse(data, raw=False, quiet=False):
    """
    Main text parsing function

    Parameters:

        data:        (string)  text data to parse
        raw:         (boolean) unprocessed output if True
        quiet:       (boolean) suppress warning messages if True

    Returns:

        List of Dictionaries. Raw or processed structured data.
    """
    compatibility(__name__, info.compatible, quiet)
    input_type_check(data)

    lsusb = _LsUsb()

    if has_data(data):
        # fix known too-long field names (the replacement is padded with
        # spaces to the original length so the value columns stay aligned)
        data = data.replace("bmNetworkCapabilities", "bmNetworkCapabilit   ")

        for line in data.splitlines():
            # only the -v option or no options are supported
            # (`lsusb -t` tree output lines begin with `/`)
            if line.startswith("/"):
                raise RuntimeError("Only `lsusb` or `lsusb -v` are supported.")

            # sections
            if lsusb._set_sections(line):
                continue

            # create section lists and schema
            if lsusb._populate_lists(line):
                continue

    # populate the schema
    lsusb._populate_schema()

    # add any final output object if it exists and return the raw_output list
    if lsusb.output_line:
        lsusb.raw_output.append(lsusb.output_line)

    return lsusb.raw_output if raw else _process(lsusb.raw_output)
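# Library usage sketch (illustrative; the `subprocess` invocation is an
# assumption for demonstration, not part of this parser):
#
#     import subprocess
#     cmd_output = subprocess.check_output(["lsusb", "-v"], text=True)
#     parsed = parse(cmd_output, quiet=True)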
if __name__ == "__main__":
    import json
    import sys

    data = sys.stdin.read()
    json.dump(parse(data), sys.stdout)
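# Example command-line use (script filename assumed):
#   $ lsusb -v 2>/dev/null | python lsusb_parser.py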