Skip to content

Instantly share code, notes, and snippets.

@krackers
Forked from hzhu212/python_stream.md
Created August 25, 2023 09:22
Show Gist options
  • Save krackers/011fc68ed1d665099a397f9d8e658f47 to your computer and use it in GitHub Desktop.
Save krackers/011fc68ed1d665099a397f9d8e658f47 to your computer and use it in GitHub Desktop.
Python iterator with chain-call like Java stream

Python Stream object act like Java Stream, features include lazy calculating, slicing, chain-call etc.

stream.py:

import functools
import itertools


class Stream(object):
    """
    a stream object for chain-call
    """
    def __init__(self, iterable):
        self._iterable = iterable

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._iterable)

    def skip(self, n):
        self._iterable = itertools.islice(self._iterable, n, None, 1)
        return self

    def limit(self, n):
        self._iterable = itertools.islice(self._iterable, None, n, 1)
        return self

    def step(self, n):
        self._iterable = itertools.islice(self._iterable, None, None, n)
        return self

    def __getitem__(self, slc):
        if not isinstance(slc, slice):
            raise ValueError(f'"[]" operator supports only slice object, but {slc!r} got')
        self._iterable = itertools.islice(self._iterable, slc.start, slc.stop, slc.step)
        return self

    def apply(self, fun, *args, **kwargs):
        self._iterable = (fun(x, *args, **kwargs) for x in self._iterable)
        return self
    
    def map(self, fun):
        self._iterable = (fun(x) for x in self._iterable)
        return self
    
    def filter(self, fun=None):
        if fun is None:
            fun = lambda x: x
        self._iterable = (x for x in self._iterable if fun(x))
        return self
        
    def stopwhen(self, fun):
        def gen():
            for x in self._iterable:
                if fun(x):
                    break
                yield x
        return Stream(gen())
    
    def reduce(self, fun, initial=None):
        return functools.reduce(fun, self._iterable, initial)
    
    def foreach(self, fun):
        for x in self._iterable:
            fun(x)

    def collect(self):
        return list(self._iterable)

Use cases:

In [1]: Stream(range(100)).skip(10).step(3).limit(10).map(lambda x: x//3).collect()                                                                                       
Out[1]: [3, 4, 5, 6, 7, 8, 9, 10, 11, 12]

In [2]: Stream(range(100))[10:20:3].map(lambda x: x+1).collect()                                                                                                          
Out[2]: [11, 14, 17, 20]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment