Skip to content

Instantly share code, notes, and snippets.

@tastywheat
Created July 28, 2015 01:39
Show Gist options
  • Save tastywheat/f7a69cdd33195168fb14 to your computer and use it in GitHub Desktop.
Save tastywheat/f7a69cdd33195168fb14 to your computer and use it in GitHub Desktop.
rxpy create observable
from tornado import gen, ioloop
from rx import Observable
def create_observable(observer):
observer.on_next(50)
observer.on_next(2)
observer.on_next(8)
observer.on_next(40)
# @gen.coroutine
def do_work(x):
print("ASYNC %s" % x)
source = (Observable.create(create_observable)
.delay(1000)
.map(lambda x: x * 2))
def main():
print(1)
source.subscribe(do_work)
print(2)
if __name__ == '__main__':
main()
ioloop.IOLoop.current().start()
@sadegh
Copy link

sadegh commented Jan 24, 2017

Thanks for the gist!
Any idea how to pass another parameter to create_observable() method other than observer? (I am not that much familiar with Python, so excuse me if my question is too simple :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment