@stiles
Last active December 14, 2015 11:58

NPR Generation Listen Tweets

query = '#NPRGL'
# Edit this to save results to a different SQLite DB
db_path = 'nprgl.db'
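
# Note: search.twitter.com (used below) is twitter's old, unauthenticated v1
# search endpoint; twitter retired it in June 2013, and later API versions
# require OAuth.
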
import sys
import json
import urllib
import urllib2
import dateutil.parser
import sqlalchemy
import sqlalchemy.exc
import sqlalchemy.orm
import sqlalchemy.ext.declarative
from datetime import datetime
from sqlalchemy import Column, String, Integer, DateTime
from collections import defaultdict

# Handles calls to twitter API, paging of search results
class TwitterSearch(object):
    def __init__(self, query):
        self.base_url = 'http://search.twitter.com/search.json'
        self.query = query  # Unescaped query string
        self.rpp = 200  # Results per page
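        # (The v1 search API reportedly capped rpp at 100, so values above
        # that may be clamped server-side.)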
        self.next_page = None  # URL params to query next search page
        self.since_id = None  # Search for tweets > this tweet ID

    # Get URL and params to query twitter API with
    def get_search_url(self):
        # Use URL + params returned by twitter API if possible...
        if self.next_page is not None:
            return self.base_url + self.next_page
        # ...otherwise generate our own
        data = {'q': self.query, 'rpp': self.rpp}
        if self.since_id is not None:
            data['since_id'] = self.since_id
        return self.base_url + '?' + urllib.urlencode(data)

    # Call twitter API, return a python dict built from the JSON returned
    def do_query(self):
        request = urllib2.Request(self.get_search_url())
        try:
            response = urllib2.urlopen(request)
        except urllib2.HTTPError, e:
            print 'HTTPError: ', e.code
            sys.exit(1)
        except urllib2.URLError, e:
            print 'URLError: ', e.reason
            sys.exit(1)
        return json.loads(response.read())

    # Generator that yields one tweet at a time
    def get_tweets(self):
        # Limit the number of pages fetched to n_max as a safety net, in case
        # the twitter API doesn't stop returning next_page links (twitter
        # should stop at 15). Will need changing if self.rpp is changed.
        n = 0
        n_max = 20
        done = False
        while not done and n < n_max:
            n += 1
            # Query twitter API
            q_results = self.do_query()
            # Yield a single tweet each time the generator is advanced
            if 'results' in q_results:
                tweets = q_results['results']
                while len(tweets) > 0:
                    yield tweets.pop(0)
            # When we run out of tweets, query the next page of results if possible
            if 'next_page' in q_results:
                self.next_page = q_results['next_page']
            else:
                done = True

# Set up sqlite database and ORM for storing tweets
engine = sqlalchemy.create_engine("sqlite:///%s" % db_path)
Session = sqlalchemy.orm.sessionmaker(bind=engine)
Base = sqlalchemy.ext.declarative.declarative_base()

# Basic ORM class to store a tweet in the DB
class Tweet(Base):
    __tablename__ = 'tweets'
    id = Column(Integer, primary_key=True)
    iso_language_code = Column(String)
    text = Column(String)
    created_at = Column(DateTime(timezone=True))
    profile_image_url = Column(String)
    to_user = Column(String)
    to_user_id = Column(Integer)
    from_user = Column(String)
    from_user_id = Column(Integer)
    source = Column(String)
    geo = Column(String)

    def __init__(self, partial_data):
        # Fill in missing key:value pairs with key:None
        data = defaultdict(lambda: None)
        data.update(partial_data)
        self.id = data['id']
        self.iso_language_code = data['iso_language_code']
        self.text = data['text']
        # Convert date string to a datetime object for storing UTC in the DB
        if data['created_at'] is not None:
            self.created_at = dateutil.parser.parse(data['created_at'])
        else:
            self.created_at = None
        self.profile_image_url = data['profile_image_url']
        self.to_user = data['to_user']
        self.to_user_id = data['to_user_id']
        self.from_user = data['from_user']
        self.from_user_id = data['from_user_id']
        self.source = data['source']
        # Geo information is of the form:
        # {u'type': u'Point', u'coordinates': [35.073, -77.042900000000003]}
        # Store an easily parsable string form in the DB
        if data['geo'] is not None:
            self.geo = repr(data['geo'])
        else:
            self.geo = None

# Create the DB and tables if needed, and open a session
Base.metadata.create_all(engine)
session = Session()

# Set up twitter search object
ts = TwitterSearch(query)

# Get the ID of the latest tweet we have, to avoid fetching tweets we've already seen
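# (Old-style SQLAlchemy string-column query; newer releases would want
# something like session.query(sqlalchemy.func.max(Tweet.id)).scalar().)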
sql = 'SELECT MAX(id) as i FROM tweets'
row = session.query('i').from_statement(sql).first()
if row[0] is not None:
    ts.since_id = row[0]

# Query the search API for as many results as we can get, and add them to the DB
n_tweets = 0
for tweet in ts.get_tweets():
    tweet_obj = Tweet(tweet)
    try:
        session.add(tweet_obj)
        session.commit()
        n_tweets += 1
    except sqlalchemy.exc.IntegrityError:
        # Search sometimes returns tweets with a duplicate ID; roll back and
        # ignore the tweet when this happens
        session.rollback()

print datetime.now().isoformat(), "- %s new tweets added" % n_tweets
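
As a usage note: reading the stored tweets back out is symmetric. Below is a minimal sketch, assuming the Tweet model and Session factory defined above are in scope; ast.literal_eval undoes the repr() used to store the geo dict.

# Sketch: read stored tweets back out of the DB created above
import ast

session = Session()
for tweet in session.query(Tweet).order_by(Tweet.created_at):
    # geo was stored as repr() of a dict, so literal_eval recovers it safely
    geo = ast.literal_eval(tweet.geo) if tweet.geo is not None else None
    if geo is not None:
        lat, lon = geo['coordinates']
        print tweet.from_user, tweet.created_at, lat, lon
    else:
        print tweet.from_user, tweet.created_at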