Created
May 21, 2012 14:41
-
-
Save steveatinfincia/2762699 to your computer and use it in GitHub Desktop.
Dearlytweeted's streaming filter, now with a config file for oauth credentials
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import urllib | |
import json | |
import re | |
import sys | |
import os | |
import datetime | |
from time import sleep | |
import sys | |
import threading | |
import logging | |
import tweepy | |
import ConfigParser | |
# Log everything from DEBUG upwards, tagging each line with its thread name so
# the streaming thread and the watchdog thread can be told apart.
logging.basicConfig(
    format='[%(levelname)s] (%(threadName)-10s) %(message)s',
    level=logging.DEBUG,
)

# Make the deployed dearlytweeted package importable; append it at most once.
newpath = "/opt/dearlytweeted/releases"
if newpath not in sys.path:
    sys.path.append(newpath)
# Load and validate the OAuth credentials before anything else. The config
# differs per deployment, so a missing file, section, option, or unfilled value
# must stop the process here, before any authentication attempt is made.
#
# Format for the config file is as follows:
#
#     [OAuth]
#     consumer_key = REPLACE
#     consumer_secret = REPLACE
#     access_token = REPLACE
#     access_token_secret = REPLACE
#
# The credential values are deliberately never logged.
_OAUTH_CONFIG_PATH = '/opt/dearlytweeted/oauth.cfg'
_CONFIG_UNAVAILABLE_MSG = "Config data not available, exiting so we don't hammer twitter with auths"

config = ConfigParser.ConfigParser()
# read() silently skips files it cannot open and returns the list of files it
# did parse, so an empty result means the file is missing or unreadable.
if not config.read(_OAUTH_CONFIG_PATH):
    logging.error("%s (could not read %s)", _CONFIG_UNAVAILABLE_MSG, _OAUTH_CONFIG_PATH)
    sys.exit(1)

try:
    consumer_key = config.get('OAuth', 'consumer_key')
    consumer_secret = config.get('OAuth', 'consumer_secret')
    access_token = config.get('OAuth', 'access_token')
    access_token_secret = config.get('OAuth', 'access_token_secret')
except ConfigParser.Error as err:
    # NoSectionError / NoOptionError: the file exists but is incomplete.
    logging.error("%s (%s)", _CONFIG_UNAVAILABLE_MSG, err)
    sys.exit(1)

# An empty value, or the REPLACE placeholder from the template above, means the
# credentials were never filled in for this deployment.
if any(value.strip() in ('', 'REPLACE')
       for value in (consumer_key, consumer_secret, access_token, access_token_secret)):
    logging.error(_CONFIG_UNAVAILABLE_MSG)
    sys.exit(1)

# Deferred project import: the package only resolves once the releases
# directory is on sys.path, and the Database is only created once the
# credentials are known to be usable.
from dearlytweeted.database import Database
dearlytweeted_db = Database()

# Time the last message was received from the Twitter streaming API. Refreshed
# by the DTStream listener callbacks and read by the timeout_check watchdog.
last_message_timestamp = datetime.datetime.now()
class DTStream(tweepy.StreamListener):
    """Tweepy stream listener that saves "Dear <subject>," tweets.

    Every callback that receives a message from the streaming API refreshes the
    module-level ``last_message_timestamp``, which the ``timeout_check``
    watchdog compares against the current time to detect a silent connection.
    """

    # "Dear <subject>," at the very start of the tweet, case-insensitive; group 1
    # is the subject. Compiled once rather than parsed twice per status.
    _SALUTATION_RE = re.compile(r'\ADear\s+([A-Za-z0-9\-_]+),', re.IGNORECASE)

    def _mark_alive(self):
        """Record that a message has just been received from the streaming API."""
        global last_message_timestamp
        # Rebinding a single global is atomic in CPython and the watchdog only
        # reads it, so no lock is needed. A Lock created inside the function
        # would never be shared with the watchdog and so could not exclude it.
        last_message_timestamp = datetime.datetime.now()

    def on_status(self, status):
        """Save ``status`` if it is a link-free tweet starting "Dear <subject>,".

        Any exception is logged with its traceback and swallowed instead of
        propagating into tweepy.
        """
        try:
            self._mark_alive()
            text = status.text
            # Tweets containing links are mostly spam or useless; skip them.
            if "http://" in text or "https://" in text:
                return
            match = self._SALUTATION_RE.match(text)
            if match is None:
                # Not in the "Dear <subject>," format; nothing to do.
                return
            dearlytweeted_db.save_tweet(status, match.group(1))
        except Exception as e:
            logging.exception("Exception during stream parsing: %s", e)
        return

    def on_delete(self, status_id, user_id):
        """Called when a delete notice arrives for a status; only marks the stream alive."""
        self._mark_alive()
        return

    def on_limit(self, track):
        """Log a limit notice; it is still a message, so mark the stream alive."""
        self._mark_alive()
        logging.debug("Limit notification returned by twitter: %s" % str(track))
        return

    def on_error(self, status_code):
        """Log the error status and return False.

        Per the tweepy StreamListener convention, returning False asks tweepy to
        stop the stream.
        """
        logging.debug("Error returned by twitter: %s" % str(status_code))
        return False

    def on_timeout(self):
        """Log and exit via ``sys.exit()`` when tweepy reports a stream timeout."""
        logging.debug("Stream exiting at %s" % str(datetime.datetime.now()))
        sys.exit()
def timeout_check(max_silence=90, poll_interval=5):
    """Terminate the process once the stream has been silent for too long.

    Intended to run forever in a background thread. Every ``poll_interval``
    seconds it compares the current time with the module-level
    ``last_message_timestamp``. If more than ``max_silence`` seconds have passed
    without a message, the connection has probably dropped (for any of the many
    non-error reasons a streaming connection can go quiet). The whole process
    then exits so supervisord can restart it.

    :param max_silence: seconds without a message before exiting (default 90).
    :param poll_interval: seconds to sleep between checks (default 5).
    """
    logging.debug("Timeout thread running")
    limit = datetime.timedelta(seconds=max_silence)
    while True:
        # One read of the global reference; there is nothing to lock.
        elapsed = datetime.datetime.now() - last_message_timestamp
        if elapsed > limit:
            logging.warning("Longer than %s seconds since last twitter message, exiting", max_silence)
            # sys.exit() here would only raise SystemExit in this thread, ending
            # the watchdog while the stagnant stream kept running. os._exit()
            # ends the whole process immediately (no cleanup handlers run). The
            # non-zero status marks the exit as abnormal for the supervisor.
            os._exit(1)
        sleep(poll_interval)
# The OAuth credentials were loaded and checked near the top of this file, and
# the process exits there if they are missing -- but beware :)
tauth = tweepy.OAuthHandler(consumer_key, consumer_secret)
tauth.set_access_token(access_token, access_token_secret)
api = tweepy.API(tauth)

# Log the beginning of the stream and set it up.
logging.debug("Starting request stream with %s at %s" % (api.me().name, str(datetime.datetime.now())))
stream = DTStream()
streamer = tweepy.Stream(auth=tauth, listener=stream, timeout=None)
filter_terms = ['dear']

# Restart the silence clock now that start-up work (including the api.me()
# call above) is done, so the watchdog only measures time spent on the stream.
last_message_timestamp = datetime.datetime.now()

# Start the watchdog BEFORE streaming: filter() blocks for the lifetime of the
# connection, so a thread started after it would only begin once the stream had
# already ended. The thread is left non-daemon (the default).
t = threading.Thread(target=timeout_check, name="timeout")
t.start()

streamer.filter(track=filter_terms)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment