Skip to content

Instantly share code, notes, and snippets.

@devlights
Created April 13, 2018 09:26
Show Gist options
  • Save devlights/0777285e612ca666d1dea4a35836e068 to your computer and use it in GitHub Desktop.
Save devlights/0777285e612ca666d1dea4a35836e068 to your computer and use it in GitHub Desktop.
[python][rxpy][reactive python] RxPYのサンプル
"""
Reactive Python (RxPy) のサンプル
2018/04/13:まだ途中
REFERENCES::
https://auth0.com/blog/reactive-programming-in-python/
https://github.com/ReactiveX/RxPY
"""
import threading
import rx
from icecream import ic
class PrintObserver(rx.Observer):
def on_next(self, value):
ic(value)
def on_completed(self):
ic()
def on_error(self, error):
ic(error)
class WithEventObserver(rx.Observer):
def __init__(self, event):
self._event = event
def on_next(self, value):
ic(value)
def on_completed(self):
ic()
self._event.set()
def on_error(self, error):
ic(error)
self._event.set()
def go():
# 最も基本的なパターン
observer = PrintObserver()
source = rx.Observable.from_(list(range(10, 20)))
source.subscribe(observer)
# ----------------------------------------------------------------------------
# Reactiveプログラミングは、どのような形の Observable を構築するかというのが大事
# あとは、それを購読して処理する。
# ----------------------------------------------------------------------------
# just() -- そのまま発行
# 以下では [1, 2, 3] を渡すので、そのまま [1, 2, 3] で発行される
rx.Observable.just(list(range(3))).subscribe(observer)
# interval() -- 間隔を付与
# take() -- 指定数のイベントを取得
#
# interval() は別スレッドで処理を発行するので
# そのままだと、メインスレッドが終わってしまうため、待ち合わせ。
event = threading.Event()
with_event = WithEventObserver(event)
source.interval(500).take(3).subscribe(with_event)
event.wait()
event.clear()
# filter() -- 流れるイベントをフィルタリングする
source.filter(lambda x: x > 15).subscribe(observer)
# throttle_last() -- 指定間隔内の直近の値を発行する
source.interval(500).take(5).throttle_last(1000).subscribe(with_event)
event.wait()
event.clear()
if __name__ == '__main__':
go()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment