Created
May 20, 2012 23:19
-
-
Save steveatinfincia/2759887 to your computer and use it in GitHub Desktop.
Dearlytweeted's streaming filter, updated to use tweepy, which finally gives it an actual reason to be a class :)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import urllib | |
import json | |
import re | |
import sys | |
import datetime | |
from time import sleep | |
import sys | |
import threading | |
import logging | |
import tweepy | |
# Log everything at DEBUG and tag every record with the emitting thread's name,
# so messages from the stream callbacks and from the watchdog thread can be
# told apart.
logging.basicConfig(
    format='[%(levelname)s] (%(threadName)-10s) %(message)s',
    level=logging.DEBUG,
)

# Make the dearlytweeted package under /opt/dearlytweeted/releases importable,
# appending the directory to sys.path only if it is not already there.
newpath = "/opt/dearlytweeted/releases"
if newpath not in sys.path:
    sys.path.append(newpath)

from dearlytweeted.database import Database

# Database handle the stream listener uses to save matching tweets.
dearlytweeted_db = Database()

# Wall-clock time of the most recent message from Twitter. Written by the
# stream listener, read by the timeout_check watchdog thread.
last_message_timestamp = datetime.datetime.now()
# Stream listener: filters incoming tweets and keeps the watchdog fed.
class DTStream(tweepy.StreamListener):
    """tweepy listener that saves tweets of the form "Dear <subject>, ...".

    Every callback that receives a message from Twitter (status, delete notice,
    limit notice) refreshes the module-level ``last_message_timestamp``. The
    ``timeout_check`` watchdog thread polls that value to detect a dead
    connection.
    """

    # A tweet qualifies when its text starts (case-insensitively) with "Dear",
    # whitespace, a one-word subject of letters/digits/'-'/'_', then a comma.
    # Compiled once, as a raw string, so \A, \s and \- reach the regex engine
    # untouched instead of relying on Python ignoring unknown string escapes.
    _DEAR_PATTERN = re.compile(r'\ADear\s+([A-Za-z0-9\-_]+),', re.IGNORECASE)

    def _touch(self):
        """Record that a message just arrived from Twitter."""
        # Rebinding a module global is a single atomic store under the GIL.
        # The previous code built a brand-new threading.Lock() on every call.
        # No other thread could ever contend on that lock, so it guarded nothing.
        global last_message_timestamp
        last_message_timestamp = datetime.datetime.now()

    def on_status(self, status):
        """Save ``status`` if its text starts with "Dear <subject>,".

        Tweets containing an http:// or https:// link are skipped, since they
        are mostly spam. Any exception is logged and swallowed so that one bad
        tweet cannot stop the stream.
        """
        try:
            self._touch()
            text = status.text
            if "http://" in text or "https://" in text:
                return
            # One match both tests the format and captures the subject.
            match = self._DEAR_PATTERN.match(text)
            if match is None:
                return
            dearlytweeted_db.save_tweet(status, match.group(1))
        except Exception as e:
            # logging.exception keeps the traceback; str(e) alone loses where it failed.
            logging.exception("Exception during stream parsing: %s", e)

    def on_delete(self, status_id, user_id):
        """Called when a delete notice arrives for a status.

        Deletions are not acted on. The notice still proves the connection is
        alive, so it feeds the watchdog.
        """
        self._touch()

    def on_limit(self, track):
        """Log a rate-limit notice from Twitter and feed the watchdog."""
        self._touch()
        logging.debug("Limit notification returned by twitter: %s", track)

    def on_error(self, status_code):
        """Log an HTTP error from Twitter and return False to stop the stream."""
        logging.debug("Error returned by twitter: %s", status_code)
        return False

    def on_timeout(self):
        """Log the timeout and exit so the supervisor can restart the process."""
        logging.debug("Stream exiting at %s", datetime.datetime.now())
        sys.exit()
# Watchdog for the Twitter stream.
#
# The stream listener refreshes ``last_message_timestamp`` whenever a message
# arrives. If nothing arrives for ``max_silence`` seconds (90 by default), the
# connection has most likely dropped for one of the many reasons that never
# surface as an error. The whole process is then terminated so that
# supervisord can restart it.
def timeout_check(max_silence=90, poll_interval=5):
    """Poll ``last_message_timestamp`` and kill the process once it goes stale.

    Args:
        max_silence: seconds without a message before the process exits.
            The default of 90 is the original hard-coded threshold.
        poll_interval: seconds to sleep between checks (default 5).

    Never returns normally: it loops until it terminates the process.
    """
    # Local import keeps this block independent of the file's import header.
    import os

    limit = datetime.timedelta(seconds=max_silence)
    logging.debug("Timeout thread running")
    while True:
        # Reading a module global is atomic. The old per-call threading.Lock()
        # was never shared with the writer, so it synchronised nothing.
        elapsed = datetime.datetime.now() - last_message_timestamp
        if elapsed > limit:
            logging.debug(
                "Longer than %s seconds since last twitter message, exiting",
                max_silence,
            )
            # sys.exit() here would only raise SystemExit inside this
            # background thread. The stuck main thread would keep the process
            # alive, so supervisord would never restart it. os._exit ends the
            # whole process immediately.
            # NOTE(review): non-zero status chosen so supervisord treats this as
            # an unexpected exit; confirm against the program's
            # autorestart/exitcodes settings.
            os._exit(1)
        sleep(poll_interval)
# == OAuth authentication ==
#
# The stream is authenticated with OAuth. The consumer key/secret appear on the
# application's Details page at https://dev.twitter.com/apps (under "OAuth
# settings"); the access token/secret are on the same page (under "Your access
# token").
consumer_key = "REPLACE"
consumer_secret = "REPLACE"
access_token = "REPLACE"
access_token_secret = "REPLACE"

tauth = tweepy.OAuthHandler(consumer_key, consumer_secret)
tauth.set_access_token(access_token, access_token_secret)
api = tweepy.API(tauth)

# Log who we are connecting as and when, then build the stream.
logging.debug("Starting request stream with %s at %s",
              api.me().name, datetime.datetime.now())

stream = DTStream()
streamer = tweepy.Stream(auth=tauth, listener=stream, timeout=None)
filter_terms = ['dear']

# The watchdog must start BEFORE filter(). filter() is called synchronously
# and blocks this thread for as long as the connection stays up, so a thread
# started after it never ran while a stall could happen.
# Reset the liveness timestamp first so the connection gets a full window
# measured from now, not from module import.
last_message_timestamp = datetime.datetime.now()
t = threading.Thread(target=timeout_check)
# daemon: if filter() returns (e.g. on_error returned False), the process
# exits instead of lingering on the watchdog thread.
t.daemon = True
t.start()

streamer.filter(None, filter_terms)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment