-
-
Save flashlin/dd97180c88f189231a6211c46d138237 to your computer and use it in GitHub Desktop.
[PyRx Example1] #Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from rx import Observer, Observable | |
from rx.core import Scheduler | |
def observable_fn(observer): | |
_task = None | |
# Customize this | |
def _setup(): | |
print('Observable setup') | |
# Customize this | |
def _teardown(): | |
print('Observable teardown') | |
if _task: | |
_task.cancel() | |
observer.on_completed() | |
# Customize this | |
async def _loop(): | |
counter = 0 | |
while True: | |
await asyncio.sleep(2) # Simulating long-running task | |
counter += 1 | |
observer.on_next(counter) | |
# Don't touch this | |
async def _run_loop(): | |
try: | |
await _loop() | |
except asyncio.CancelledError: | |
print('Observable cancelled') | |
finally: | |
_teardown() | |
# Don't touch this | |
_setup() | |
loop = asyncio.get_event_loop() | |
_task = loop.create_task(_run_loop()) | |
print('CREATING OBSERVABLE') | |
observable = Observable.create(observable_fn).observe_on(Scheduler.event_loop).share() | |
print('CREATING OBSERVERS') | |
observer1 = observable.subscribe(lambda x: print('O1: {0}'.format(x))) | |
observer2 = observable.subscribe(lambda x: print('O2: {0}'.format(x))) | |
observer3 = observable.subscribe(lambda x: print('O3: {0}'.format(x))) | |
loop = asyncio.get_event_loop() | |
try: | |
print('STARTING LOOP') | |
loop.run_forever() | |
except KeyboardInterrupt: | |
pending_tasks = asyncio.Task.all_tasks() | |
for task in pending_tasks: | |
task.cancel() | |
loop.run_until_complete(asyncio.gather(*pending_tasks)) | |
loop.close() | |
print('THE END') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment