Created
April 13, 2018 09:26
-
-
Save devlights/0777285e612ca666d1dea4a35836e068 to your computer and use it in GitHub Desktop.
[python][rxpy][reactive python] RxPYのサンプル
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Reactive Python (RxPy) のサンプル | |
2018/04/13:まだ途中 | |
REFERENCES:: | |
https://auth0.com/blog/reactive-programming-in-python/ | |
https://github.com/ReactiveX/RxPY | |
""" | |
import threading | |
import rx | |
from icecream import ic | |
class PrintObserver(rx.Observer): | |
def on_next(self, value): | |
ic(value) | |
def on_completed(self): | |
ic() | |
def on_error(self, error): | |
ic(error) | |
class WithEventObserver(rx.Observer): | |
def __init__(self, event): | |
self._event = event | |
def on_next(self, value): | |
ic(value) | |
def on_completed(self): | |
ic() | |
self._event.set() | |
def on_error(self, error): | |
ic(error) | |
self._event.set() | |
def go(): | |
# 最も基本的なパターン | |
observer = PrintObserver() | |
source = rx.Observable.from_(list(range(10, 20))) | |
source.subscribe(observer) | |
# ---------------------------------------------------------------------------- | |
# Reactiveプログラミングは、どのような形の Observable を構築するかというのが大事 | |
# あとは、それを購読して処理する。 | |
# ---------------------------------------------------------------------------- | |
# just() -- そのまま発行 | |
# 以下では [1, 2, 3] を渡すので、そのまま [1, 2, 3] で発行される | |
rx.Observable.just(list(range(3))).subscribe(observer) | |
# interval() -- 間隔を付与 | |
# take() -- 指定数のイベントを取得 | |
# | |
# interval() は別スレッドで処理を発行するので | |
# そのままだと、メインスレッドが終わってしまうため、待ち合わせ。 | |
event = threading.Event() | |
with_event = WithEventObserver(event) | |
source.interval(500).take(3).subscribe(with_event) | |
event.wait() | |
event.clear() | |
# filter() -- 流れるイベントをフィルタリングする | |
source.filter(lambda x: x > 15).subscribe(observer) | |
# throttle_last() -- 指定間隔内の直近の値を発行する | |
source.interval(500).take(5).throttle_last(1000).subscribe(with_event) | |
event.wait() | |
event.clear() | |
if __name__ == '__main__': | |
go() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment