Created
February 1, 2018 14:30
-
-
Save iAnanich/53a99ebc56460882d7872bb225cb093d to your computer and use it in GitHub Desktop.
IterManager - helps with complex iteration cases - counters, excludes, context, type checks etc.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import typing | |
TypeOrNone = typing.Union[type, None]


def has_wrong_type(obj, expected_obj_type: TypeOrNone) -> bool:
    """
    Tell whether `obj` fails an `isinstance` check against
    `expected_obj_type`.

    Passing `None` as `expected_obj_type` switches the check off, so the
    result is always `False` in that case.

    :param obj: any object
    :param expected_obj_type: expected type of the object or `None`
    :return: `True` only when a type was given and `obj` is not an instance
        of it, `False` otherwise
    """
    if expected_obj_type is None:
        # nothing to compare against - any object is acceptable
        return False
    return not isinstance(obj, expected_obj_type)
def raise_type_error(obj_repr: str, obj_type: type, expected_obj_type: type,
                     obj_name: str = 'This'):
    """
    Build a uniform "wrong type" message and raise it as `TypeError`.

    :param obj_repr: textual representation of the offending object
    :param obj_type: actual type of the object
    :param expected_obj_type: type that was expected instead
    :param obj_name: human readable name of the object used as the first
        word(s) of the message
    :raises TypeError: always
    """
    message = (
        f'{obj_name} {obj_repr} has "{obj_type}" type while '
        f'"{expected_obj_type}" is expected.'
    )
    raise TypeError(message)
def check_obj_type(obj, expected_obj_type: TypeOrNone, obj_name: str = 'object'):
    """
    Validate the type of `obj` and raise if it does not match.

    :param obj: object to validate
    :param expected_obj_type: type `obj` must be an instance of; `None`
        means "accept anything"
    :param obj_name: name of the object used in the error message
    :raises TypeError: when `has_wrong_type` reports a mismatch
    """
    if not has_wrong_type(obj=obj, expected_obj_type=expected_obj_type):
        return
    raise_type_error(
        obj_repr=repr(obj),
        obj_type=type(obj),
        expected_obj_type=expected_obj_type,
        obj_name=obj_name,
    )
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import typing | |
ThresholdBasis = typing.Union[int, None]


class Threshold:
    """
    Optional non-negative integer limit.

    `None` and `0` both mean "no threshold": such an instance is falsy, its
    `value` is `None` and it can not be converted to `int`.
    """

    def __init__(self, val: ThresholdBasis = None):
        """
        :param val: positive integer limit, or `None` / `0` for "no limit"
        :raises TypeError: if `val` is neither `None` nor an `int`, or if it
            is a negative integer
        """
        if val is None:
            self._value = 0  # as equivalent to None
        elif not isinstance(val, int):
            raise TypeError(
                f'first argument must be of type `int` or `NoneType`, '
                f'got "{val}" of "{type(val)}" type.')
        elif val < 0:
            # TypeError (rather than ValueError) is kept on purpose so that
            # existing callers catching it keep working
            raise TypeError(
                f'first argument must not be less then zero, got "{val}".')
        else:
            # zero is stored as-is and is treated as "no threshold";
            # compared by value, never by identity
            self._value = val

    # public API

    @property
    def value(self) -> ThresholdBasis:
        """Return the limit, or `None` when no threshold is set."""
        if not self:  # if self._value == 0
            return None
        return self._value

    def __int__(self):
        """
        :return: the limit as an integer
        :raises TypeError: when no threshold is set
        """
        if not self:
            raise TypeError(
                'this threshold equals to None and '
                'can not be used as an integer.')
        return self._value

    def __bool__(self):
        """`True` only when a real (non-zero) threshold is set."""
        return self._value != 0

    # string representations

    def __repr__(self):
        return f'<Threshold {self.value}>'

    def __str__(self):
        return str(self.value)
class Counter:
    """
    Simple event counter that can be switched on and off.

    While disabled, `add` and `drop` leave the count untouched.
    """

    def __init__(self):
        self._count = 0
        self._is_enabled = True

    def add(self) -> bool:
        """
        Increment the count by one if the counter is enabled.

        :return: `False` when the counter is disabled, otherwise the
            truthiness of the counter after the increment
        """
        if not self._is_enabled:
            return False
        self._count += 1
        return bool(self)

    def drop(self):
        """Reset the count to zero; does nothing while disabled."""
        if not self._is_enabled:
            return
        self._count = 0

    @property
    def count(self) -> int:
        """Current count."""
        return self._count

    @property
    def is_enabled(self) -> bool:
        """Whether `add` and `drop` currently have any effect."""
        return self._is_enabled

    def enable(self):
        """Switch the counter on."""
        self._is_enabled = True

    def disable(self):
        """Switch the counter off."""
        self._is_enabled = False

    # string representations

    def __repr__(self):
        return f'<Counter count={self._count}>'

    def __str__(self):
        return str(self._count)
class CounterWithThreshold(Counter):
    """
    Counter that becomes truthy once its count reaches a threshold.

    When the given threshold is falsy (missing or "no limit") the counter
    starts disabled with a count of `-1`.
    """

    def __init__(self, threshold: Threshold = None):
        """
        :param threshold: limit the count is compared against; a falsy
            value disables the counter
        """
        super().__init__()
        self._threshold = threshold
        if not threshold:
            self._count = -1
            self._is_enabled = False

    @property
    def threshold(self) -> Threshold:
        """The threshold object passed to the constructor."""
        return self._threshold

    def __bool__(self):
        """
        :return: `True` when the counter is disabled, otherwise whether the
            count has reached the threshold
        """
        if not self._is_enabled:
            return True
        limit = int(self._threshold)
        return self._count >= limit

    def __repr__(self):
        state = 'REACHED' if self else 'count=%s' % self._count
        return f'<Counter {state}>'
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import abc | |
import typing | |
from .check import has_wrong_type, raise_type_error, check_obj_type | |
class BaseFunc(abc.ABC):
    """
    Abstract wrapper around a callable together with extra positional and
    keyword arguments. Subclasses define how it is invoked in `call`.
    """

    def __init__(self, func, args: tuple = None, kwargs: dict = None):
        """
        :param func: callable to wrap
        :param args: extra positional arguments, empty tuple by default
        :param kwargs: extra keyword arguments, empty dict by default
        :raises TypeError: if `args` is not a tuple, `kwargs` is not a dict
            or `func` is not callable (checked in that order)
        """
        args = tuple() if args is None else args
        kwargs = dict() if kwargs is None else kwargs

        if not isinstance(args, tuple):
            raise TypeError('Given `args` are not `tuple` object.')
        if not isinstance(kwargs, dict):
            raise TypeError('Given `kwargs` are not `dict` object.')
        if not callable(func):
            raise TypeError('Given `func` argument must be callable.')

        self.function = func
        self.args = args
        self.kwargs = kwargs

    @abc.abstractmethod
    def call(self, input_value):
        """Invoke the wrapped callable for `input_value`."""
class Func(BaseFunc):
    """Wrapper that calls the function without any type checks."""

    def call(self, input_value):
        """
        Call the wrapped function with `input_value` as the first argument,
        followed by the stored positional and keyword arguments.

        :param input_value: value to pass as the first positional argument
        :return: whatever the wrapped function returns
        """
        function = self.function
        return function(input_value, *self.args, **self.kwargs)
class StronglyTypedFunc(BaseFunc):
    """
    Function wrapper that validates the type of its input value before the
    call and the type of its result after the call.
    """

    # None value will cause type check to pass any type
    output_type = None
    input_type = None

    def __init__(self, func, args: tuple = None, kwargs: dict = None,
                 input_type: type = None, output_type: type = None):
        """
        :param func: callable to wrap
        :param args: extra positional arguments passed after the input value
        :param kwargs: extra keyword arguments
        :param input_type: expected type of the input value; `None` keeps
            the class-level `input_type`
        :param output_type: expected type of the result; `None` keeps the
            class-level `output_type`
        """
        super().__init__(
            func=func,
            args=args,
            kwargs=kwargs,
        )
        # override class attributes
        if input_type is not None:
            self.input_type = input_type
        if output_type is not None:
            self.output_type = output_type

    def call(self, input_value):
        """
        Check `input_value`, call the wrapped function and check its result.

        :param input_value: value passed as the first positional argument
        :return: the value returned by the wrapped function
        :raises TypeError: if the input or the output has an unexpected type
        """
        self._check_input(input_value)
        output_value = self.function(input_value, *self.args, **self.kwargs)
        self._check_output(output_value)
        return output_value

    def _check_input(self, value):
        self._check_type(value, self.input_type, 'input')

    def _check_output(self, value):
        self._check_type(value, self.output_type, 'output')

    def _check_type(self, value, expected: typing.Optional[type], action: str):
        """
        Raise `TypeError` if `value` is not of the `expected` type.

        :param value: object to validate
        :param expected: required type, or `None` to accept anything
        :param action: 'input' or 'output'; capitalized and used as the
            object name in the error message
        """
        # same check and same message as the hand-written
        # has_wrong_type + raise_type_error pair, via the shared helper
        check_obj_type(value, expected, f'{action.capitalize()} value')
class FuncSequence:
    """
    Ordered, type-checked collection of `func_type` objects that can be
    applied one after another to a value.
    """

    func_type = BaseFunc

    def __init__(self, *funcs: func_type):
        """
        :param funcs: function wrappers, each an instance of `func_type`
        :raises TypeError: if any item is not a `func_type` instance
        """
        self._check_all(funcs)
        self._list: typing.List[BaseFunc] = list(funcs)

    def _check_all(self, funcs):
        """Raise `TypeError` for the first item that is not a `func_type`."""
        for i, func in enumerate(funcs):
            check_obj_type(func, self.func_type, f'Callable #{i}')

    def process(self, value):
        """
        Pass `value` through every stored function in order, feeding each
        result into the next one.

        :return: the final result (`value` itself for an empty sequence)
        """
        for middleware in self._list:
            value = middleware.call(value)
        return value

    # some list methods

    def copy(self):
        """Return a new sequence of the same class with the same items."""
        return self.__class__(*self._list)

    def clear(self):
        """Remove all items."""
        self._list.clear()

    def reverse(self):
        """Reverse the order of items in place."""
        self._list.reverse()

    def pop(self, index: int = -1):
        """
        Remove and return the item at `index` (last item by default).

        :raises IndexError: if the sequence is empty or `index` is out of
            range
        """
        return self._list.pop(index)

    def append(self, func: func_type):
        """
        Add `func` to the end of the sequence.

        :raises TypeError: if `func` is not a `func_type` instance
        """
        check_obj_type(func, self.func_type, 'Callable')
        self._list.append(func)

    def remove(self, value: func_type):
        """
        Remove the first occurrence of `value`.

        :raises ValueError: if `value` is not present
        """
        self._list.remove(value)

    def extend(self, funcs: typing.Sequence[func_type]):
        """
        Append every item of `funcs`.

        All items are validated before any of them is added, so a failed
        type check leaves the sequence unchanged.

        :raises TypeError: if any item is not a `func_type` instance
        """
        # snapshot first: supports one-shot iterables and extending the
        # sequence with its own items
        new_funcs = list(funcs)
        self._check_all(new_funcs)
        self._list.extend(new_funcs)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import collections
import collections.abc
import types
from typing import Callable, Iterator, Optional, Sequence

from .check import check_obj_type
from .counter import CounterWithThreshold, Threshold
from .func import StronglyTypedFunc
class ExcludeCheck:
    """
    Steps through an iterator of "exclude" values and reports whether a
    given value equals the one currently awaited.
    """

    def __init__(self, iterator: Iterator, default=None):
        """
        :param iterator: iterator yielding exclude values
        :param default: value stored as current once `iterator` is exhausted
        :raises TypeError: if `iterator` is not an iterator
        """
        # `collections.Iterator` alias was removed in Python 3.10,
        # the ABC lives in `collections.abc`
        check_obj_type(iterator, collections.abc.Iterator, 'Iterator')
        self._iterator = iterator
        self._default = default
        self._is_completed = False
        # pre-load the first exclude value
        self._yield_next()

    def _yield_next(self):
        """
        Advance to the next exclude value; on exhaustion store the default
        and mark the checker as completed.

        :return: the new current value
        """
        try:
            value = next(self._iterator)
        except StopIteration:
            value = self._default
            self._is_completed = True
        self._value = value
        return value

    def check_next(self, value):
        """
        Compare `value` with the current exclude value and advance on match.

        :return: `True` if `value` equals the current exclude value,
            `False` otherwise or when the iterator is already exhausted
        """
        if self._is_completed:
            return False
        if value == self._value:
            self._yield_next()
            return True
        return False

    @property
    def value(self):
        """Current exclude value (the default once exhausted)."""
        return self._value
class BaseContext:
    """
    Dict-backed container for the data of a single iteration step: the
    value, the value used for the exclude check, recorded close reasons
    and arbitrary extra keys.
    """

    CLOSE_REASON = 'close_reason'
    VALUE = 'value'
    EXCLUDE_VALUE = 'exclude_value'

    # keys that can not be assigned through item assignment / `update`
    _lock_keys = frozenset((CLOSE_REASON, VALUE, EXCLUDE_VALUE))

    _value_type: type = object
    _exclude_value_type: type = object

    def __init__(self, value, exclude_value):
        """
        :param value: value of the current step
        :param exclude_value: value compared against the exclude values
        :raises TypeError: if an argument does not match `_value_type` /
            `_exclude_value_type`
        """
        check_obj_type(value, self._value_type, 'Value')
        check_obj_type(exclude_value, self._exclude_value_type, 'Exclude value')
        self._dict = {
            self.VALUE: value,
            self.EXCLUDE_VALUE: exclude_value,
        }

    def set_close_reason(self, message: str):
        """
        Record one more close reason.

        :param message: reason text
        :raises TypeError: if `message` is not a string
        """
        check_obj_type(message, str, 'Message')
        # always append to the history list; testing the last message for
        # truthiness would drop the history after an empty message
        self._dict.setdefault(self.CLOSE_REASON, []).append(message)

    @property
    def value(self):
        return self._dict[self.VALUE]

    @property
    def exclude_value(self):
        return self._dict[self.EXCLUDE_VALUE]

    @property
    def close_reason(self) -> Optional[str]:
        """
        Returns last set close reason message.

        :return: string, or `None` if no close reason has been set
        """
        close_reasons = self._dict.get(self.CLOSE_REASON)
        if close_reasons:
            return close_reasons[-1]
        return None

    def dict_proxy(self):
        """Return a read-only view of the underlying dictionary."""
        return types.MappingProxyType(self._dict)

    def update(self, dictionary: dict):
        """
        Assign every item of `dictionary` through item assignment.

        :raises KeyError: on the first locked key
        """
        for key, val in dictionary.items():
            self[key] = val

    def __getitem__(self, item: str):
        return self._dict[item]

    def __setitem__(self, key: str, value):
        if key in self._lock_keys:
            raise KeyError(f'{key} key can not be assigned in this way.')
        self._dict[key] = value

    @classmethod
    def new(cls, value_type: type, exclude_value_type: type,
            name: str = 'Context') -> type:
        """
        Create a subclass with its own value and exclude value types.

        :param value_type: value for the `_value_type` class attribute
        :param exclude_value_type: value for the `_exclude_value_type`
            class attribute
        :param name: name of the created class
        :return: new subclass of `cls`
        """
        attributes = {
            '_value_type': value_type,
            '_exclude_value_type': exclude_value_type,
        }
        return type(name, (cls, ), attributes)
class IterManager:
    """
    Iterates over `general_iterator`, wrapping every value into a context
    object, skipping values handled by case processors or matched by the
    exclude iterator, and stopping once any configured threshold is hit.
    """

    _base_context_type = BaseContext
    _context_processor_output_type = bool

    def __init__(self, general_iterator: Iterator,
                 value_type: type = None, return_type: type = None,
                 exclude_value_type: type = None,
                 exclude_iterator: Iterator = None, exclude_default=None,
                 max_iterations: Optional[int] = None,
                 max_exclude_strike: Optional[int] = None,
                 max_total_excluded: Optional[int] = None,
                 max_returned_values: Optional[int] = None,
                 case_processors: Sequence[Callable] = None,
                 context_processor: Callable = None,
                 return_value_processor: Callable = None,
                 before_finish: Callable = None):
        """
        :param general_iterator: iterator producing the values to process
        :param value_type: expected type of produced values, `None` for any
        :param return_type: expected type of yielded values, `None` for any
        :param exclude_value_type: expected type of exclude values
        :param exclude_iterator: iterator of values to exclude; empty
            iterator by default
        :param exclude_default: value the exclude checker holds once
            `exclude_iterator` is exhausted
        :param max_iterations: limit for the iteration counter
        :param max_exclude_strike: limit for the counter of consecutive
            excluded values
        :param max_total_excluded: limit for the counter of all excluded
            values
        :param max_returned_values: limit for the counter of yielded values
        :param case_processors: callables taking a context and returning
            `bool`; a `True` result skips the value
        :param context_processor: callable taking a value and the
            `context_type` keyword argument, returning a `context_type`
            instance
        :param return_value_processor: callable turning a context into the
            yielded value; returns `context.value` by default
        :param before_finish: callable invoked with the last context right
            before iteration stops because of a close reason
        :raises TypeError: if an argument has an unexpected type
        """
        # `*_type` attributes can be even None because will be only used
        # by `func.StronglyTypedFunc` that uses `check.check_obj_type`
        self._value_type = value_type
        self._return_type = return_type
        self._exclude_type = exclude_value_type

        # `collections.Iterator` alias was removed in Python 3.10
        check_obj_type(
            general_iterator, collections.abc.Iterator, 'General iterator')
        self._general_iterator = general_iterator

        # the default `None` means "not provided" and must not be rejected
        # just because an exclude value type was given
        if exclude_default is not None:
            check_obj_type(
                exclude_default, exclude_value_type, 'Exclude default value')
        self._exclude_default = exclude_default

        if exclude_iterator is None:
            exclude_iterator = iter([])  # empty iterator
        self._exclude_checker = ExcludeCheck(
            iterator=exclude_iterator,
            default=self._exclude_default)
        self._exclude_iterator = exclude_iterator

        self._total_iterations_threshold = Threshold(max_iterations)
        self._total_iterations_counter = CounterWithThreshold(
            threshold=self._total_iterations_threshold)

        self._exclude_strike_threshold = Threshold(max_exclude_strike)
        self._exclude_strike_counter = CounterWithThreshold(
            threshold=self._exclude_strike_threshold)

        self._total_excluded_threshold = Threshold(max_total_excluded)
        self._total_excluded_counter = CounterWithThreshold(
            threshold=self._total_excluded_threshold)

        self._total_returned_threshold = Threshold(max_returned_values)
        self._total_returned_counter = CounterWithThreshold(
            threshold=self._total_returned_threshold)

        self._context_type = self._base_context_type.new(
            value_type, exclude_value_type)

        if context_processor is None:
            context_processor = self._default_context_processor
        self._context_processor = StronglyTypedFunc(
            func=context_processor,
            kwargs={'context_type': self._context_type},
            input_type=self._value_type,
            output_type=self._context_type, )

        if before_finish is None:
            before_finish = self._default_before_finish
        self._before_finish = StronglyTypedFunc(
            func=before_finish,
            input_type=self._context_type,
            output_type=None, )

        if return_value_processor is None:
            return_value_processor = self._default_return_value_processor
        self._return_value_processor = StronglyTypedFunc(
            func=return_value_processor,
            input_type=self._context_type,
            output_type=self._return_type, )

        if case_processors is None:
            case_processors = []
        self._case_processors = [
            StronglyTypedFunc(
                func=processor,
                input_type=self._context_type,
                output_type=self._context_processor_output_type, )
            for processor in case_processors]

    @staticmethod
    def _default_context_processor(value, context_type):
        """
        Build a context whose value and exclude value are both `value`.

        It accepts the `context_type` keyword the processor is always
        called with and instantiates that type, so the result passes the
        output type check.
        """
        return context_type(value=value, exclude_value=value)

    @staticmethod
    def _default_before_finish(context):
        """Do nothing before finishing."""
        return None

    @staticmethod
    def _default_return_value_processor(context):
        """Yield the context value unchanged."""
        return context.value

    def _chain_case_processors(self, context: BaseContext) -> bool:
        """
        :param context: BaseContext object
        :return: True if any case processor have returned True, else False
        """
        # stops at the first processor that returns True
        return any(
            processor.call(context) for processor in self._case_processors)

    def _check_exclude(self, context: BaseContext) -> bool:
        """
        Compare the context exclude value with the awaited exclude value
        and update the exclude counters.

        :param context: BaseContext object
        :return: True if value must be returned, else False
        """
        if self._exclude_checker.check_next(context.exclude_value):
            if self._exclude_strike_counter.add():
                context.set_close_reason('Exclude matches threshold reached.')
            if self._total_excluded_counter.add():
                context.set_close_reason('Total excluded threshold reached.')
            return False
        # value is not excluded - the strike of excluded values is broken
        self._exclude_strike_counter.drop()
        return True

    def _return(self, context: BaseContext) -> object:
        """
        Increases `total_returned_counter`, and calls `return_value_processor`

        :param context: BaseContext object
        :return: returns processed value
        """
        if self._total_returned_counter.add():
            context.set_close_reason('Returned values threshold reached.')
        return self._return_value_processor.call(context)

    def __iter__(self):
        """
        Yield processed values until the general iterator is exhausted or
        a close reason is set on the current context.
        """
        for value in self._general_iterator:
            context: BaseContext = self._context_processor.call(value)

            # a case processor that returns True skips the value entirely,
            # including the iteration counter and the close reason check
            if self._chain_case_processors(context):
                continue

            if self._check_exclude(context):
                yield self._return(context)

            if self._total_iterations_counter.add():
                context.set_close_reason('Iterations count threshold reached.')

            if context.close_reason:
                self._before_finish.call(context)
                break
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment