Skip to content

Instantly share code, notes, and snippets.

@fqxp
Created October 3, 2016 20:19
Show Gist options
  • Save fqxp/ee055ec77b4b697d8e0c2eb00f5a1159 to your computer and use it in GitHub Desktop.
Save fqxp/ee055ec77b4b697d8e0c2eb00f5a1159 to your computer and use it in GitHub Desktop.
Demo for using RxPY with gbulb and asyncio in Python
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
import gbulb
import rx
import json
from rx.subjects import Subject
from rx.concurrency import AsyncIOScheduler
from tornado.httpclient import AsyncHTTPClient
from tornado.httputil import url_concat
from tornado.platform.asyncio import AsyncIOMainLoop
# Bind the value stored under rx's 'asyncio' config key to a module-level
# name. NOTE(review): nothing in the visible code reads this name afterwards.
asyncio = rx.config['asyncio']
def search_wikipedia(term):
    """Search Wikipedia for a given term.

    Builds an ``opensearch`` query URL for the English Wikipedia API and
    starts a GET request for it through Tornado's ``AsyncHTTPClient``.

    Args:
        term: Search text, sent as the ``search`` query parameter.

    Returns:
        The result of ``AsyncHTTPClient.fetch`` for the request. The
        subscriber in this file reads ``.body`` from the delivered response.
    """
    # Use HTTPS so the user's search text is not sent in clear text.
    url = 'https://en.wikipedia.org/w/api.php'
    params = {
        "action": 'opensearch',
        "search": term,
        "format": 'json',
    }
    # Must set a user agent for non-browser requests to Wikipedia
    user_agent = "just testing"
    # url_concat appends the parameters to the base URL as a query string.
    url = url_concat(url, params)
    http_client = AsyncHTTPClient()
    return http_client.fetch(url, method='GET', user_agent=user_agent)
# Module-level stream of the entry's text: main() pushes values into it with
# on_next(), and subscribe_to_subject() builds the search pipeline on top.
subject = Subject()
def subscribe_to_subject(labels):
    """Connect the module-level ``subject`` to the Wikipedia search.

    Text pushed into ``subject`` is filtered to strings longer than two
    characters, debounced, de-duplicated against the previous value and then
    handed to ``search_wikipedia``. ``flat_map_latest`` is used so that a
    newer query supersedes an older one. Each response rewrites *all* of the
    given labels: the first ``len(labels)`` result titles are shown and any
    remaining labels are cleared, so text from a previous search never
    lingers.

    Args:
        labels: Sequence of widgets exposing ``set_text(str)``; one result
            title is written to each.

    Returns:
        The object returned by ``subscribe`` on the search stream, so the
        caller can tear the pipeline down. Callers may ignore it.
    """
    scheduler = AsyncIOScheduler()
    # NOTE(review): RxPY 1.x documents debounce's duetime in milliseconds; if
    # a 1.5 second pause was intended this should probably be 1500 — confirm
    # against the installed rx version before changing it.
    query = subject.filter(
        lambda text: len(text) > 2
    ).debounce(
        1.5,
        scheduler=scheduler
    ).distinct_until_changed()
    searcher = query.flat_map_latest(search_wikipedia)

    def send_response(response):
        """Decode one search response and refresh every label."""
        data = json.loads(response.body.decode('utf-8'))
        # Index 1 of the decoded payload is treated as the list of titles.
        entries = data[1][:len(labels)]
        for position, label in enumerate(labels):
            # Blank out labels that have no matching result so stale text
            # from an earlier, longer result list is not left on screen.
            label.set_text(entries[position] if position < len(entries) else '')

    def on_error(ex):
        """Report a stream error on stdout."""
        print(ex)

    return searcher.subscribe(send_response, on_error)
def main():
    """Build the demo window and run the event loop until it is closed.

    The window holds one text entry and five labels stacked vertically.
    Every change of the entry's text is pushed into the module-level
    ``subject``; ``subscribe_to_subject`` writes search results back into
    the labels.
    """
    # Install gbulb's GTK-enabled loop first, then register Tornado's asyncio
    # bridge, then fetch the loop that will drive everything.
    gbulb.install(gtk=True)
    AsyncIOMainLoop().install()
    loop = gbulb.get_event_loop()

    window = Gtk.Window()
    # Closing the window stops the loop, which lets run_forever() return.
    window.connect('delete-event', lambda *args: loop.stop())

    box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)
    text_input = Gtk.Entry()
    box.pack_start(text_input, True, True, 0)

    # Pass the text by keyword: positional constructor arguments are
    # deprecated in PyGObject.
    labels = [Gtk.Label(label='hello %d' % i) for i in range(5)]
    for label in labels:
        box.pack_start(label, True, True, 0)
    window.add(box)

    subscribe_to_subject(labels)

    def on_text_entered(*args):
        """Forward the entry's current text into the search stream."""
        subject.on_next(text_input.get_text())

    # 'notify::text' fires whenever the entry's ``text`` property changes.
    text_input.connect('notify::text', on_text_entered)

    window.show_all()
    loop.run_forever()
# Start the demo only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment