Last active
December 17, 2018 12:42
-
-
Save seblin/be07bb481258b115192180c05dcc4100 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
# Copyright (c) 2013-2018 Sebastian Linke | |
# Permission is hereby granted, free of charge, to any person obtaining a copy | |
# of this software and associated documentation files (the "Software"), to deal | |
# in the Software without restriction, including without limitation the rights | |
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
# copies of the Software, and to permit persons to whom the Software is | |
# furnished to do so, subject to the following conditions: | |
# The above copyright notice and this permission notice shall be included in | |
# all copies or substantial portions of the Software. | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
# SOFTWARE. | |
__author__ = 'Sebastian Linke' | |
__version__ = '0.4-dev' | |
__license__ = 'MIT' | |
import fnmatch | |
import locale | |
import os | |
import re | |
from collections.abc import Mapping | |
from functools import partial, singledispatch | |
from io import IOBase | |
from itertools import chain, zip_longest | |
from shutil import get_terminal_size | |
from sys import version_info | |
# Bail out early on interpreters older than Python 3.6.
if version_info < (3, 6):
    raise RuntimeError('Need Python version >= 3.6')


# Option names accepted by ConfigHelper together with their default values.
DEFAULT_CONFIG = {
    # Number of blanks between columns (used when 'spacer' is not set).
    'spacing': 2,
    # Explicit string placed between columns; takes precedence over 'spacing'.
    'spacer': None,
    # Maximum line width; a falsy value means "use the terminal width".
    'width': None,
    # Explicit per-column widths; a falsy value derives them from the items.
    'column_widths': None,
    # Fixed number of columns; must not be combined with 'column_widths'.
    'num_columns': None,
    # True: consecutive items fill a column top to bottom.
    # False: consecutive items fill a row left to right.
    'vertical': True,
    # One of 'left', 'center' or 'right'.
    'align': 'left',
    # Shrink columns and continue overlong items on additional rows.
    'wrap': False,
    # Columns are not shrunk below this width when wrapping.
    'min_shrink_width': 5,
    # Sort the items before columnizing.
    'sort': False,
    # LC_COLLATE locale used for sorting; None selects the user's default.
    'locale': None,
    # Case-insensitive fnmatch-style pattern the items must match.
    'pattern': None,
    # Drop duplicate items, keeping the first occurrence.
    'unique': False,
    # Use only this whitespace-separated field of every input line.
    'index': None,
    # File object handed to print(); None lets print() pick its default.
    'output_stream': None,
}
class ConfigHelper(dict):
    """Dictionary of formatting options with a few derived accessors.

    Starts as a copy of *config* (or of ``DEFAULT_CONFIG`` when *config* is
    falsy); keyword arguments override single options and must be names
    known to ``DEFAULT_CONFIG``.
    """

    # Templates that turn a width into a ``str.format`` replacement field.
    # 'default' pads and truncates to the width, 'end' only truncates.
    spec_templates = {
        'left': {'default': '{{:<{0}.{0}}}', 'end': '{{:<.{0}}}'},
        'center': {'default': '{{:^{0}.{0}}}', 'end': '{{:^.{0}}}'},
        'right': {'default': '{{:>{0}.{0}}}', 'end': '{{:>.{0}}}'},
    }

    def __init__(self, config=None, **config_args):
        super().__init__(config or DEFAULT_CONFIG)
        if not config_args:
            return
        self._check_keywords(config_args)
        self.update(config_args)

    def _check_keywords(self, keywords):
        """Raise TypeError for the first name missing from DEFAULT_CONFIG."""
        for name in keywords:
            if name in DEFAULT_CONFIG:
                continue
            raise TypeError(f'Unexpected keyword: {name!r}')

    def copy(self):
        """Return a shallow copy that keeps the (sub)class of *self*."""
        cls = type(self)
        return cls(super().copy())

    def get_spec(self, width, style='default'):
        """Return the replacement field for *width* in the current alignment.

        Raises ValueError if the configured alignment is unknown.
        """
        alignment = self['align']
        if alignment in self.spec_templates:
            template = self.spec_templates[alignment][style]
            return template.format(width)
        raise ValueError(f'Unexpected alignment: {alignment!r}')

    def get_column_widths(self, items):
        """Return one width per item: configured widths or item lengths."""
        configured = self['column_widths']
        if configured:
            return configured[:len(items)]
        return [len(item) for item in items]

    def get_max_width(self):
        """Return the configured line width or else the terminal width."""
        width = self['width']
        return width if width else get_terminal_size().columns

    def get_spacer(self):
        """Return the configured spacer or else 'spacing' blanks."""
        spacer = self['spacer']
        return spacer if spacer else self['spacing'] * ' '
class StringSeries:
    """Base class for a non-empty series of strings plus formatting options.

    Subclasses provide ``get_format_string()`` and ``get_width()``; the base
    implementations raise NotImplementedError.
    """

    def __init__(self, items, config=None):
        """Store *items* and wrap *config* in a ConfigHelper.

        Raises ValueError if *items* is empty.
        """
        if not items:
            raise ValueError('Need at least one item')
        self.items = items
        self.config = ConfigHelper(config)

    def __iter__(self):
        return iter(self.items)

    def __str__(self):
        # Fall back to the plain representation of the items when the
        # (sub)class does not know how to format itself.
        try:
            return self.get_formatted()
        except NotImplementedError:
            return str(self.items)

    def _raise_not_implemented(self, text):
        """Raise NotImplementedError naming the concrete class and *text*."""
        name = type(self).__name__
        raise NotImplementedError(f'{name} does not implement {text}')

    def get_format_string(self):
        """Return a ``str.format`` template with one field per item."""
        self._raise_not_implemented('get_format_string()')

    def get_formatted(self):
        """Return the items rendered through ``get_format_string()``."""
        return self.get_format_string().format(*self.items)

    def get_width(self):
        """Return the width of the rendered series."""
        self._raise_not_implemented('get_width()')

    @classmethod
    def convert_from(cls, objects, config=None):
        """Build an instance from arbitrary objects by applying ``str``."""
        items = [str(obj) for obj in objects]
        return cls(items, config)
class Column(StringSeries):
    """A series rendered with one item per line."""

    def get_format_string(self):
        """Return one truncating field per item, separated by newlines."""
        field = self.config.get_spec(self.get_width(), 'end')
        return '\n'.join([field] * len(self.items))

    def get_width(self):
        """Return the length of the longest item."""
        return max(len(item) for item in self.items)
class Row(StringSeries):
    """A series rendered on a single line, one item per column."""

    def get_format_string(self):
        """Return the fields for all items joined by the spacer.

        Every field but the last pads to its column width; the last one
        only truncates.
        """
        config = self.config
        widths = config.get_column_widths(self.items)
        fields = [config.get_spec(width) for width in widths[:-1]]
        fields.append(config.get_spec(widths[-1], 'end'))
        return config.get_spacer().join(fields)

    def get_width(self):
        """Return the rendered length of the row."""
        config = self.config
        widths = config.get_column_widths(self.items)
        # The last item is not padded, so it counts with its real length
        # (capped at the column width).
        tail = min(len(self.items[-1]), widths[-1])
        gaps = (len(widths) - 1) * len(config.get_spacer())
        return sum(widths[:-1]) + tail + gaps
class ItemWrapper:
    """Shrink column widths and split items into chunks of those widths."""

    def __init__(self, min_width):
        # Columns at or below this width are left untouched by _shrink().
        self.min_width = min_width

    def _shrink(self, width, offset):
        """Reduce *width* by up to *offset*, but not below ``min_width``.

        Returns the new width and the part of *offset* still to be removed.
        """
        if width <= self.min_width:
            return (width, offset)
        shrinked_width = max(width - offset, self.min_width)
        shrinked_offset = offset - (width - shrinked_width)
        return (shrinked_width, shrinked_offset)

    def get_shrinked_widths(self, widths, offset):
        """Return a new list of *widths* reduced by *offset* in total.

        Columns are shrunk from the last one backwards until the offset is
        used up or no columns are left. *widths* itself is not modified.
        """
        # Work on a copy: the caller's list usually lives in a shared config.
        remaining = list(widths)
        shrinked_widths = []
        while offset > 0 and remaining:
            width, offset = self._shrink(remaining.pop(), offset)
            shrinked_widths.append(width)
        # Widths were collected right-to-left, so restore the column order.
        return remaining + shrinked_widths[::-1]

    def wrap_items(self, items, widths):
        """Yield successive rows of item chunks cut to *widths*.

        Iteration stops as soon as every item is used up.
        """
        while any(items):
            yield [item[:width] for item, width in zip(items, widths)]
            items = [item[width:] for item, width in zip(items, widths)]
class StringColumnizer(StringSeries):
    """Arrange a series of strings in columns.

    The number of columns comes from 'num_columns' or 'column_widths' if
    one of them is configured; otherwise the largest number of columns
    whose rows fit into the maximum line width is used.
    """

    def _wrap(self, rows):
        """Return new rows with shrunk columns and wrapped items."""
        wrapper = ItemWrapper(self.config['min_shrink_width'])
        # Copy the widths: the list is shared with the rows' config and may
        # be the very list the caller passed in.
        widths = list(rows[0].config['column_widths'])
        offset = max(r.get_width() for r in rows) - self.config.get_max_width()
        shrinked_widths = wrapper.get_shrinked_widths(widths, offset)
        config = ConfigHelper(self.config, column_widths=shrinked_widths)
        wrap = partial(wrapper.wrap_items, widths=shrinked_widths)
        return [Row(items, config) for row in rows
                for items in wrap(row.items)]

    def _make_columns(self, ncols, strict=True):
        """Distribute the items over *ncols* columns.

        If the last column would stay empty, raise ValueError when *strict*
        is true, otherwise retry with one column less.
        """
        col_size, remaining = divmod(len(self.items), ncols)
        if remaining:
            col_size += 1
        if self.config['vertical']:
            # Consecutive items go below each other.
            col_items = [self.items[i * col_size:(i + 1) * col_size]
                         for i in range(ncols)]
        else:
            # Consecutive items go next to each other.
            col_items = [self.items[i::ncols] for i in range(ncols)]
        if not col_items[-1]:
            if strict:
                raise ValueError(f'Items do not fit in {ncols} columns')
            return self._make_columns(ncols - 1, strict=False)
        config = self.config.copy()
        return [Column(items, config) for items in col_items]

    def _make_rows(self, ncols, strict=True):
        """Return the rows for a layout with *ncols* columns."""
        columns = self._make_columns(ncols, strict)
        config = self.config.copy()
        if not config['column_widths']:
            config['column_widths'] = [col.get_width() for col in columns]
        # Only drop the None padding added by zip_longest. Empty strings are
        # real items: dropping them would shift the following cells to the
        # left and could leave a row without any items at all.
        rows = [Row([item for item in row_items if item is not None], config)
                for row_items in zip_longest(*columns)]
        return self._wrap(rows) if self.config['wrap'] else rows

    def _guess_rows(self):
        """Return the rows for the widest layout that fits the line width.

        Falls back to a single column if not even that fits.
        """
        max_width = self.config.get_max_width()
        ncols = min(len(self.items), max_width)
        while ncols > 0:
            rows = self._make_rows(ncols, strict=False)
            if all(row.get_width() <= max_width for row in rows):
                return rows
            # _make_columns() may have used fewer columns than requested,
            # so continue below the number that was actually tried.
            ncols = len(rows[0].config['column_widths']) - 1
        # Nothing fits (e.g. an item longer than the line): one column is
        # the best remaining layout.
        return self._make_rows(1, strict=False)

    def make_rows(self):
        """Return the list of rows according to the configuration.

        Raises ValueError if both 'num_columns' and 'column_widths' are set.
        """
        num_columns = self.config['num_columns']
        column_widths = self.config['column_widths']
        if num_columns and column_widths:
            raise ValueError('num_columns and column_widths '
                             'must not defined concurrently')
        ncols = num_columns or len(column_widths or [])
        return self._make_rows(ncols) if ncols else self._guess_rows()

    def get_lines(self, rows):
        """Yield one formatted line per row."""
        formatter = None
        num_items = None
        for row in rows:
            # The format string depends on the number of items only (all
            # rows share one config), so rebuild it just when that changes.
            if len(row.items) != num_items:
                num_items = len(row.items)
                formatter = row.get_format_string().format
            yield formatter(*row.items)

    def _join_tty_lines(self, lines):
        """Join *lines* with a newline after each line shorter or longer
        than the maximum width; lines of exactly that width get none."""
        line_width = self.config.get_max_width()
        return ''.join(
            line if len(line) == line_width else line + '\n'
            for line in lines)

    def get_formatted(self):
        """Return the columnized text.

        Lines are newline-joined when 'output_stream' is set and is not a
        terminal; otherwise the terminal-oriented joining is used.
        """
        lines = self.get_lines(self.make_rows())
        stream = self.config['output_stream']
        if stream and not isatty(stream):
            return '\n'.join(lines)
        return self._join_tty_lines(lines)

    @classmethod
    def from_mapping(cls, mapping, config=None):
        """Build a two-column, wrapping columnizer from keys and values.

        *config* may be None and is never modified.
        """
        items = chain(mapping.keys(), mapping.values())
        config = ConfigHelper(config, num_columns=2, wrap=True)
        return cls.convert_from(items, config)
def isatty(stream):
    """Return True if *stream* has a file descriptor connected to a tty.

    Objects without ``fileno()`` and streams whose ``fileno()`` fails
    (e.g. ``io.StringIO`` or a closed file) are reported as non-tty.
    """
    if not hasattr(stream, 'fileno'):
        return False
    try:
        return os.isatty(stream.fileno())
    except (OSError, ValueError):
        # io.UnsupportedOperation derives from both OSError and ValueError;
        # closed files raise a plain ValueError.
        return False
@singledispatch
def use_index(source, index):
    """Return an iterator over field *index* of every line in *source*.

    Lines are split on whitespace. While iterating, an IndexError with the
    message 'Column index out of range' is raised for a line that has no
    such field.
    """
    lines = iter(source)  # fail right away if *source* is not iterable

    def pick_fields():
        # The lookup runs lazily during iteration, so the error has to be
        # translated here rather than around the generator's creation.
        for line in lines:
            try:
                yield line.split()[index]
            except IndexError:
                raise IndexError('Column index out of range') from None

    return pick_fields()


@use_index.register(Mapping)
def use_index_mapping(source, index):
    """Reject mappings: selecting a field is not defined for them."""
    raise NotImplementedError('Using index not supported for mappings')
@singledispatch
def make_unique(items):
    """Yield the items in order, skipping those already yielded."""
    already_seen = set()
    for candidate in items:
        if candidate in already_seen:
            continue
        already_seen.add(candidate)
        yield candidate


@make_unique.register(Mapping)
def make_unique_mapping(items):
    """Reject mappings: there is nothing to deduplicate."""
    raise NotImplementedError('Mapping items are already unique')
@singledispatch
def match(items, pattern):
    """Return an iterator over the items matching the fnmatch *pattern*.

    Matching ignores case.
    """
    regex = re.compile(fnmatch.translate(pattern), re.IGNORECASE)
    is_match = regex.match
    return (candidate for candidate in items if is_match(candidate))


@match.register(Mapping)
def match_mapping(items, pattern):
    """Return a dict of the entries whose key matches *pattern*."""
    wanted_keys = match(items.keys(), pattern)
    return {key: items[key] for key in wanted_keys}
@singledispatch
def sort(items, loc=None):
    """Return a sorted list of *items* using locale-aware collation.

    *loc* is the LC_COLLATE locale to sort with; a falsy value selects the
    user's default locale. The previous LC_COLLATE setting is restored
    afterwards. Items that ``locale.strxfrm`` rejects with a TypeError
    (non-strings) are sorted by their natural order instead.
    """
    # Materialize first: an iterator would already be used up by the first
    # attempt when the fallback below has to sort it again.
    items = list(items)
    old_locale = locale.getlocale(locale.LC_COLLATE)
    locale.setlocale(locale.LC_COLLATE, loc or '')
    try:
        return sorted(items, key=locale.strxfrm)
    except TypeError:
        # Fallback for non-string items
        return sorted(items)
    finally:
        locale.setlocale(locale.LC_COLLATE, old_locale)


@sort.register(Mapping)
def sort_mapping(items, loc=None):
    """Return a dict with the entries of *items* ordered by sorted key."""
    # Python >= 3.6 keeps insertion order
    return {key: items[key] for key in sort(items.keys(), loc)}
def apply_config(items, config):
    """Run the item filters selected in *config* and return the result.

    The steps are applied in a fixed order: field selection ('index'),
    deduplication ('unique'), pattern matching ('pattern') and sorting
    ('sort', using 'locale').
    """
    index = config['index']
    if index is not None:
        items = use_index(items, index)

    if config['unique']:
        items = make_unique(items)

    pattern = config['pattern']
    if pattern:
        items = match(items, pattern)

    if config['sort']:
        items = sort(items, config['locale'])

    return items
@singledispatch
def get_columnizer(items, config=None):
    """Return a StringColumnizer for the string form of *items*."""
    columnizer = StringColumnizer.convert_from(items, config)
    return columnizer


@get_columnizer.register(Mapping)
def get_columnizer_mapping(items, config=None):
    """Return a StringColumnizer built from a mapping's keys and values."""
    columnizer = StringColumnizer.from_mapping(items, config)
    return columnizer
@singledispatch
def print_columnized(items, **config_args):
    """Print *items* arranged in columns.

    Keyword arguments are the option names of ``DEFAULT_CONFIG``; the text
    is written to the configured 'output_stream'.
    """
    config = ConfigHelper(**config_args)
    prepared = apply_config(items, config)
    columnizer = get_columnizer(prepared, config)
    print(columnizer, end='', file=config['output_stream'])


@print_columnized.register(IOBase)
def print_columnized_stream(source, **config_args):
    """Print the lines of the stream *source* arranged in columns.

    Trailing whitespace (including the line ending) is removed from
    every line first.
    """
    stripped_lines = (raw_line.rstrip() for raw_line in source)
    return print_columnized(stripped_lines, **config_args)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment