Skip to content

Instantly share code, notes, and snippets.

@jlaura
Created April 20, 2016 13:47
Show Gist options
  • Select an option

  • Save jlaura/8597d180dd0c944e06e3cb9dc05682ef to your computer and use it in GitHub Desktop.

Select an option

Save jlaura/8597d180dd0c944e06e3cb9dc05682ef to your computer and use it in GitHub Desktop.
import json
from twython import TwythonStreamer
#Fill these in with the values Twitter give you when you register as a developer
APP_KEY = ''
APP_SECRET = ''
consumer_key = ''
consumer_secret = ''
access_token = ''
access_token_secret = ''
class TweetStreamer(TwythonStreamer):
tweets = []
def on_success(self, data):
# If you get a tweet back (and not an error code)
if 'text' in data:
self.tweets.append(data)
# I only wanted 500
if len(self.tweets) == 500:
self.disconnect()
# Because I only wanted 500, I did not write to disk until I had them in memory
with open('tweets.json', 'w') as f:
json.dump(self.tweets, f)
# In reality, this is a great spot to pipe the tweets to a MongoDB
# If we get an error, just give up.
def on_error(self, status_code, data):
print(status_code)
self.disconnect()
streamer = TweetStreamer(consumer_key, consumer_secret,
access_token, access_token_secret)
# Filter to get spatially enabled (no necessarily lat/lon encoded)
streamer.statuses.filter(locations='-112.5,33,-111.5,34', lang='en')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment