Skip to content

Instantly share code, notes, and snippets.

@thomasnield
Created November 14, 2016 02:41
Show Gist options
  • Select an option

  • Save thomasnield/761fbe71507b2f88028ec0d81f075031 to your computer and use it in GitHub Desktop.

Select an option

Save thomasnield/761fbe71507b2f88028ec0d81f075031 to your computer and use it in GitHub Desktop.
ReactiveX Live Twitter Feed Using RxPy
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
import json
from rx import Observable
# User credentials used to authenticate against the Twitter API.
# Create your own at https://apps.twitter.com/
# NOTE(review): the values below are placeholders; avoid committing real
# secrets to source control.
access_token = "CONFIDENTIAL"
access_token_secret = "CONFIDENTIAL"
consumer_key = "CONFIDENTIAL"
consumer_secret = "CONFIDENTIAL"
def tweets_for(topics):
    """Return a shared Observable of raw tweet payloads for *topics*.

    Args:
        topics: Keywords passed as ``track`` to the stream's ``filter`` call.

    Returns:
        The result of ``Observable.create(...).share()``. Each emitted item
        is the raw ``data`` value handed to the listener's ``on_data``;
        a stream error is forwarded to the observer's ``on_error``.
    """

    def observe_tweets(observer):
        """Bridge the tweepy stream callbacks onto *observer*."""

        class TweetListener(StreamListener):
            def on_data(self, data):
                observer.on_next(data)
                # Keep the stream running after each payload.
                return True

            def on_error(self, status):
                observer.on_error(status)
                # The observer has been told the sequence failed, so stop the
                # stream too: tweepy only disconnects when on_error returns
                # False; returning None would let it keep reconnecting.
                return False

        # Handle Twitter authentication and the connection to the
        # Twitter Streaming API.
        listener = TweetListener()
        auth = OAuthHandler(consumer_key, consumer_secret)
        auth.set_access_token(access_token, access_token_secret)
        stream = Stream(auth, listener)
        stream.filter(track=topics)

    return Observable.create(observe_tweets).share()
# Subscribe to the feed for the chosen topics and print each tweet's text.
topics = ['Britain','France']
d = (
    tweets_for(topics)
    # Decode each raw payload from JSON.
    .map(lambda raw: json.loads(raw))
    # Keep only payloads that carry a "text" key.
    .filter(lambda tweet: "text" in tweet)
    # Extract the text with surrounding whitespace removed.
    .map(lambda tweet: tweet["text"].strip())
    .subscribe(lambda text: print(text))
)
@thomasnield

Copy link
Copy Markdown
Author

See video demonstration here.
https://vimeo.com/191415381

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment