Created
May 16, 2012 17:04
-
-
Save steveatinfincia/2712239 to your computer and use it in GitHub Desktop.
Dearly Tweeted's stream filter
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pycurl | |
import urllib | |
import json | |
import re | |
import sys | |
import datetime | |
from time import sleep | |
import sys | |
import threading | |
import logging | |
# Log everything from DEBUG upwards; each line is tagged with its level and
# the name of the thread that emitted it.
logging.basicConfig(level=logging.DEBUG, format='[%(levelname)s] (%(threadName)-10s) %(message)s',)

# Put the release directory on the import path *before* importing the project
# package below; the order of these statements matters.
newpath = "/opt/dearlytweeted/releases"
if newpath not in sys.path:
    sys.path.append(newpath)
from dearlytweeted.database import Database

# Streaming endpoint plus the username/password that Stream hands to curl
# as "USER:PASS". The two credential values are placeholders.
STREAM_URL = "https://stream.twitter.com/1/statuses/filter.json"
USER = "USER_HERE"
PASS = "PASSWORD_HERE"

# global variable storing the last time any data was received from the twitter API;
# written by Stream.on_receive and read by timeout_check
last_message_timestamp = datetime.datetime.now()
# background thread loop that does nothing but check the last message timestamp; if
# it's been more than 30 seconds, the twitter heartbeat signal (or a tweet) hasn't arrived recently,
# which means the connection probably dropped for any of 10+ legitimate reasons that aren't errors but
# must be handled by exiting and letting supervisord restart this process.
def timeout_check():
    """Watchdog loop: kill the process when the stream has gone quiet.

    Intended to run in a background thread. Every 5 seconds it compares the
    current time with the module-level ``last_message_timestamp``. Once more
    than 30 seconds have passed without an update, the whole process is
    terminated with exit status 1. This function never returns.
    """
    # Local import: only needed for the hard exit below.
    import os

    logging.debug("Timeout thread running")
    limit = datetime.timedelta(seconds=30)
    while True:
        elapsed = datetime.datetime.now() - last_message_timestamp
        if elapsed > limit:
            logging.debug("Longer than 30 seconds since last twitter message, exiting")
            # sys.exit() would only raise SystemExit inside *this* thread and
            # end the thread, leaving the main thread (and so the process)
            # running. os._exit() terminates the process itself; flush the
            # log handlers first because it skips normal interpreter cleanup.
            logging.shutdown()
            os._exit(1)
        logging.debug("< 30 seconds since last twitter keepalive, continuing")
        sleep(5)
# everything related to dealing with twitter is inside this class | |
class Stream(object):
    """Streaming-filter connection that stores tweets starting with "Dear X,".

    Creating an instance configures a curl request against ``STREAM_URL`` and
    immediately performs it, so the constructor blocks until the transfer
    ends. Every chunk of received data is passed to :meth:`on_receive`.
    """

    # Tweets must start with "Dear <name>,"; the group captures <name>.
    _SALUTATION = re.compile(r'^Dear\s+([A-Za-z0-9]+),', flags=re.IGNORECASE)

    def __init__(self):
        """Create the database handle, configure curl and run the request.

        Note that ``perform()`` is called here, so this constructor does not
        return until the streaming transfer ends.
        """
        # setup our filter, all we need to do is POST the word "dear" to the streaming filter API
        track_filter = [('track', "dear")]
        post_data = urllib.urlencode(track_filter)

        # a new database handler (same class the web workers use)
        self.database = Database()

        # holds the trailing, not yet CRLF-terminated part of the stream
        self.buffer = ""

        self.conn = pycurl.Curl()
        self.conn.setopt(pycurl.USERPWD, "%s:%s" % (USER, PASS))
        self.conn.setopt(pycurl.URL, STREAM_URL)
        self.conn.setopt(pycurl.POSTFIELDS, post_data)
        self.conn.setopt(pycurl.POST, 1)
        self.conn.setopt(pycurl.WRITEFUNCTION, self.on_receive)
        self.conn.perform()

    def on_receive(self, data):
        """Handle one chunk of data received from the stream.

        Refreshes the module-level ``last_message_timestamp``, appends the
        chunk to the buffer and processes every complete CRLF-terminated
        message found in it. A chunk may hold several messages, a fragment
        of one, or just a blank keep-alive line; whatever follows the last
        CRLF stays buffered for the next call.

        Returns None.
        """
        global last_message_timestamp
        last_message_timestamp = datetime.datetime.now()

        self.buffer += data
        messages = self.buffer.split("\r\n")
        # The piece after the final CRLF is incomplete ("" if the chunk ended
        # exactly on a delimiter); keep it for the next call.
        self.buffer = messages.pop()

        for raw in messages:
            if not raw.strip():
                # blank keep-alive line
                continue
            try:
                self._handle_message(raw)
            except Exception as e:
                # Best effort: one bad message must not escape the callback
                # and disturb the running transfer.
                logging.debug("Exception during stream parsing: %s" % e.__str__())

    def _handle_message(self, raw):
        """Parse one complete JSON message and save it if it is a "Dear X," tweet."""
        tweet = json.loads(raw)
        # Only JSON objects that carry a "text" field are considered.
        if not isinstance(tweet, dict) or "text" not in tweet:
            return

        text = tweet['text']
        # tweets containing links are skipped
        if "http://" in text or "https://" in text:
            return

        found = self._SALUTATION.match(text)
        if found is None:
            logging.debug("No match: " + str(datetime.datetime.now()))
            return

        logging.debug("Message: " + str(datetime.datetime.now()))
        self.database.save_tweet(tweet, found.group(1))
# Start the watchdog before connecting so that a connection which never
# delivers any data is caught as well.
t = threading.Thread(target=timeout_check)
# Daemon thread: once the stream ends and the main thread finishes, the
# process exits right away instead of being kept alive by the watchdog loop.
t.daemon = True
t.start()

logging.debug("Starting request stream")
# Constructing Stream performs the request and blocks until the transfer ends.
stream = Stream()
logging.debug("Request stream ended")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment