Created
March 21, 2018 07:58
-
-
Save badocelot/a2f4a50ef8498230911194634fcae0c1 to your computer and use it in GitHub Desktop.
Streams with a Python twist
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functools, itertools | |
class StreamIterator:
    """
    Iterator over a Stream's items, backed by a shared cache.

    Every iterator created for the same Stream receives the same source
    iterator and the same cache list. Items are served from the cache while
    this iterator is behind it; once it catches up, the next item is pulled
    from the source and appended to the cache so other iterators can see it.
    A ContainerStream does not need this class.
    """

    def __init__(self, source, cache):
        """
        Creates an iterator positioned at the start of the cache

        `source` is the one-shot iterator items are pulled from; `cache` is
        the list that already-pulled items are stored in.
        """
        self.__index = 0
        self.__cache = cache
        self.__source = source

    def __iter__(self):
        """Returns this iterator itself"""
        return self

    def __next__(self):
        """
        Returns the next item, from the cache if possible, else the source

        Raises StopIteration (propagated from the source) once both the
        cache and the source are exhausted; the position is not advanced in
        that case.
        """
        position = self.__index
        cached = self.__cache
        if position >= len(cached):
            # caught up with the cache: pull a fresh item and remember it
            item = next(self.__source)
            cached.append(item)
        else:
            # still behind the cache: replay what was already pulled
            item = cached[position]
        self.__index = position + 1
        return item
class Stream:
    """
    Stream wrapping around an iterable, providing convenient methods.

    The wrapped iterable is turned into an iterator once; every item pulled
    from it is stored in a cache, so the Stream can be iterated repeatedly.
    Methods that return a Stream are lazy unless their WARNING says otherwise.
    """

    def __init__(self, iterable):
        """Creates a Stream from the iterable"""
        # one-shot iterator that new items are pulled from
        self.__source = iter(iterable)
        # items already pulled from the source, in order
        self.__cache = []

    def __add__(self, iterable):
        """
        Returns a Stream that represents the concatenation of this Stream
        with another iterable, with the items of this Stream first
        """
        return Stream(itertools.chain(self, iterable))

    def __iter__(self):
        """Returns a StreamIterator for this Stream"""
        return StreamIterator(self.__source, self.__cache)

    def __mul__(self, n):
        """
        Returns a Stream that repeats the contents of this Stream `n` times

        WARNING
        -------
        Do not use this operator on infinite Streams!
        """
        return self.repeat(n)

    def __radd__(self, iterable):
        """
        Returns a Stream that represents the concatenation of this Stream
        with another iterable, with the items of the iterable first
        """
        return Stream(itertools.chain(iterable, self))

    def __rmul__(self, n):
        """
        Returns a Stream that repeats the contents of this Stream `n` times

        WARNING
        -------
        Do not use this operator on infinite Streams!
        """
        return self.repeat(n)

    def __len__(self):
        """
        Returns the number of items in this Stream

        WARNING
        -------
        Do not use len() on infinite Streams!
        """
        return sum(1 for _ in self)

    def all(self, pred=None):
        """
        Returns whether all items of this Stream match the given predicate

        If no predicate is provided, returns whether all items in this Stream
        are truthy, as if `bool` were the predicate

        WARNING
        -------
        Do not use this method on infinite Streams!
        """
        # The default must be None (not True) so that the no-argument call
        # takes the truthiness branch instead of trying to call the default.
        if pred is None:
            return all(self)
        return all(pred(x) for x in self)

    def any(self, pred=None):
        """
        Returns whether this Stream has any items matching the given predicate

        If no predicate is provided, returns whether this Stream has any items
        at all
        """
        if pred is None:
            return self.at_least(1)
        return any(pred(x) for x in self)

    def at_least(self, how_many):
        """
        Returns whether this Stream has at least `how_many` items

        Pulls at most `how_many` items, so it is safe on infinite Streams.
        """
        iterator = iter(self)
        try:
            for _ in range(how_many):
                next(iterator)
        except StopIteration:
            return False
        return True

    def chain(self, iterable):
        """
        Returns a Stream that represents the concatenation of this Stream
        with another iterable, with the items of this Stream first

        Equivalent to self + iterable
        """
        return self + iterable

    def count(self, pred=None):
        """
        Returns the number of items matching a specific predicate

        Returns the overall number of items if no predicate is given

        Example Usage
        -------------
        >>> st = stream([1,2,3,4,5])
        >>> st.count(lambda x: x % 2 == 0) # => 2
        >>> st.count() # => 5
        """
        if pred is None:
            return len(self)
        return sum(1 for x in self if pred(x))

    def drop(self, how_many):
        """
        Returns a Stream with the items of this Stream, except the first
        `how_many`

        If this Stream has fewer than `how_many` items the result is empty.
        """
        # islice stops cleanly on a short stream; calling next() by hand
        # inside a generator would surface as RuntimeError (PEP 479).
        # max() keeps negative counts meaning "drop nothing".
        return Stream(itertools.islice(self, max(how_many, 0), None))

    def empty(self):
        """
        Returns whether this Stream has no items

        Equivalent to `not self.any()`
        """
        return not self.any()

    def filter(self, pred):
        """
        Returns a Stream containing only the items in this Stream that
        meet the given predicate (i.e. cause it to return True)

        Example Usage
        -------------
        >>> st = stream([1,2,3,4,5])
        >>> st.filter(lambda x: x % 2 == 0) # => Stream with 2, 4
        """
        return Stream(filter(pred, self))

    def first(self, pred=None):
        """
        Returns the first item in this Stream matching the given predicate

        If no predicate is provided, returns the first item in this Stream

        Raises StopIteration if there is no such item.
        """
        if pred is None:
            return next(iter(self))
        return next(x for x in self if pred(x))

    def foldr(self, func, initial):
        """
        Returns the result of performing the fold-right operation on this
        Stream with the given function and initial value.

        `func` is called as `func(item, accumulated)`, starting from the last
        item.

        WARNING
        -------
        Do not use this method on infinite Streams!
        """
        # A Stream is not a sequence, so the builtin reversed() needs the
        # items materialised first.
        backwards = reversed(list(self))
        return functools.reduce(lambda acc, item: func(item, acc),
                                backwards, initial)

    def last(self, pred=None):
        """
        Returns the last item in this Stream that matches the given predicate

        If no predicate is provided, returns the last item in this Stream

        Raises StopIteration if there is no such item.

        WARNING
        -------
        Do not use this method on infinite Streams!
        """
        # Scan from the end so the predicate stops at the first hit.
        backwards = reversed(list(self))
        if pred is None:
            return next(backwards)
        return next(x for x in backwards if pred(x))

    def list(self):
        """
        Returns the items of this Stream in a list

        Equivalent to `self.to(list)`

        WARNING
        -------
        Do not use this method on infinite Streams!
        """
        return list(self)

    def map(self, func):
        """
        Returns a Stream containing the results of calling func() with each
        item in this stream, similar to map()

        Example Usage
        -------------
        >>> st = stream([1,2,3,4,5])
        >>> st.map(lambda x: x * x) # => Stream with 1, 4, 9, 16, 25
        >>> st.map(float) # => Stream with 1.0, 2.0, 3.0, 4.0, 5.0
        """
        return Stream(map(func, self))

    def reduce(self, func, initial):
        """
        Returns the result of the reduce operation using the given function and
        initial value
        """
        return functools.reduce(func, self, initial)

    def repeat(self, n=2):
        """
        Returns a Stream that repeats the contents of this Stream `n` times

        WARNING
        -------
        Do not use this method on infinite Streams!
        """
        return Stream(itertools.chain.from_iterable(self.tee(n)))

    def reversed(self):
        """
        Returns a reversed version of this Stream.

        WARNING
        -------
        Do not use this method on infinite Streams!
        """
        # The builtin reversed() only accepts sequences (or objects with
        # __reversed__), so materialise the items before reversing.
        return Stream(reversed(list(self)))

    def set(self):
        """
        Returns the items of this Stream in a set

        Equivalent to `self.to(set)`

        WARNING
        -------
        Do not use this method on infinite Streams!
        """
        return set(self)

    def sort(self, key=None):
        """
        Returns a sorted version of this Stream, using `key` as a key function

        WARNING
        -------
        Do not use this method on infinite Streams!
        """
        return Stream(sorted(self, key=key))

    def take(self, num):
        """
        Returns a Stream containing the first `num` items of this Stream

        If this Stream has fewer than `num` items, all of them are returned.
        """
        # islice ends quietly on a short stream; next() inside a generator
        # expression would become RuntimeError (PEP 479).
        # max() keeps negative counts meaning "take nothing".
        return Stream(itertools.islice(self, max(num, 0)))

    def tee(self, n=2):
        """
        Returns a tuple containing `n` duplicates of this Stream

        WARNING
        -------
        Do not use this on infinite Streams!
        """
        return tuple(Stream(t) for t in itertools.tee(self, n))

    def to(self, type_):
        """
        Returns the Stream converted to `type_`

        `type_` must be a type constructor that takes an iterable

        Example Usage
        -------------
        >>> st = stream([1,2,3,4,5])
        >>> st.to(list) # => [1, 2, 3, 4, 5]
        >>> st.to(tuple) # => (1, 2, 3, 4, 5)
        >>> st.to(set) # => {1, 2, 3, 4, 5}
        >>> st.map(float).to(list) # => [1.0, 2.0, 3.0, 4.0, 5.0]

        WARNING
        -------
        Do not use this on infinite Streams, unless the type constructor knows
        how to deal with them.
        """
        return type_(self)

    def tuple(self):
        """
        Returns the items of this Stream in a tuple

        Equivalent to `self.to(tuple)`

        WARNING
        -------
        Do not use this method on infinite Streams!
        """
        return tuple(self)
class ContainerStream(Stream):
    """
    Stream backed directly by a finite, re-iterable container

    Unlike a regular Stream nothing is cached here: each iteration simply
    asks the container for a new iterator, on the assumption that iter() on
    the container always yields every item from the start.
    """

    def __init__(self, container):
        """Creates a Stream wrapping around the container"""
        # only the container is stored; no source iterator or cache is set up
        self.__container = container

    def __iter__(self):
        """Returns a fresh iterator obtained from the wrapped container"""
        wrapped = self.__container
        return iter(wrapped)
def stream(iterable, use_container = None):
    """
    Builds the appropriate kind of Stream for the iterable

    A truthy `use_container` forces a ContainerStream; a falsy value other
    than None forces a plain Stream. When `use_container` is left as None,
    an iterable whose exact type is dict, list, set or tuple is wrapped in a
    ContainerStream and anything else in a plain Stream.
    """
    if use_container is None:
        # auto-detect: exact built-in container types only, not subclasses
        wants_container = type(iterable) in (dict, list, set, tuple)
    else:
        wants_container = use_container

    if wants_container:
        return ContainerStream(iterable)
    return Stream(iterable)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment