@rrader
Last active December 16, 2015 17:39
from rx import Observable, AnonymousObservable
from rx.concurrency import current_thread_scheduler
from rx.internal import extensionclassmethod


@extensionclassmethod(Observable)
def from_iterable_with_interval(cls, period, iterable, scheduler=None):
    """Create an Observable that emits the next item of `iterable` every `period` milliseconds."""
    iterator = iter(iterable)
    # Each interval tick pulls exactly one item from the iterator.
    # next() raises StopIteration once the iterable is exhausted.
    return Observable.interval(period, scheduler=scheduler).map(lambda v: next(iterator))
...
reed_switch_sensor = \
    Observable.from_iterable_with_interval(500, source, scheduler=aio_scheduler). \
    distinct_until_changed(). \
    timestamp()
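
For readers without RxPY at hand, the chain above can be approximated with the standard library only. This is a rough sketch, not the gist author's code: `poll_with_interval` and the local `distinct_until_changed` generator are hypothetical helpers, the `readings` list stands in for the undefined `source`, and the sketch blocks with `sleep` instead of using a scheduler. It also simply stops when the iterable is exhausted.

```python
import time
from itertools import groupby


def distinct_until_changed(values):
    """Yield a value only when it differs from the one before it."""
    # groupby collapses each run of equal consecutive values into one key.
    for value, _run in groupby(values):
        yield value


def poll_with_interval(iterable, period_ms=0, clock=time.time, sleep=time.sleep):
    """Hypothetical helper: pull one item per period, keep only state
    changes, and pair each change with the time it was observed."""
    def ticks():
        for item in iterable:
            sleep(period_ms / 1000.0)  # wait one period before each item
            yield item

    for value in distinct_until_changed(ticks()):
        yield (value, clock())


# Assumed sample data: a reed switch that closes and then opens again.
readings = [0, 0, 1, 1, 1, 0]
changes = [value for value, _ts in poll_with_interval(readings)]
```

With the sample data, only the three state changes survive the filter; the repeated readings in between are dropped, which is the role `distinct_until_changed()` plays in the Rx version.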