Skip to content

Instantly share code, notes, and snippets.

@batasrki
Created February 17, 2009 03:38
Show Gist options
  • Select an option

  • Save batasrki/65562 to your computer and use it in GitHub Desktop.

Select an option

Save batasrki/65562 to your computer and use it in GitHub Desktop.
import twitter
import sys
from whoosh.fields import Schema, STORED, ID, KEYWORD, TEXT
import os, os.path
from whoosh import store, index
import shelve
from whoosh.qparser import QueryParser
from spambayes import hammie, Options, mboxutils, storage
from spambayes.Version import get_current_version
import email
class HammieFilter(object):
    """Own a SpamBayes hammie database handle and reopen it on demand.

    The handle lives in ``self.h`` and the mode it was opened with in
    ``self.mode``.  Every public operation first calls :meth:`open` with the
    mode it needs, so callers never open the database themselves.  The mode
    strings used here are 'r', 'c' and 'n' (presumably dbm-style flags:
    read-only / create-if-missing / always-new -- confirm against
    ``hammie.open``).
    """

    def __init__(self):
        settings = Options.options
        # Point SpamBayes at this tool's own database before the rc files
        # are merged in.
        settings["Storage", "persistent_storage_file"] = "~/.secret_santa/hammiedb"
        per_user_rc = os.path.expanduser('~/.secret_santa/.hammierc')
        settings.merge_files(['/etc/hammierc', per_user_rc])
        self.dbname, self.usedb = storage.database_type([])
        # Nothing is opened until the first operation asks for it.
        self.mode = None
        self.h = None

    def open(self, mode):
        """Ensure the database is open in *mode*, reopening it if necessary."""
        handle = self.h
        if handle is not None:
            if self.mode == mode:
                # Already open the way the caller wants it.
                return
            # Switching modes: persist pending changes (unless the handle
            # was opened with 'r') and release the old handle first.
            if self.mode != 'r':
                handle.store()
            handle.close()
        self.mode = mode
        self.h = hammie.open(self.dbname, self.usedb, self.mode)

    def close(self):
        """Store (unless opened with 'r') and close the handle, if any."""
        handle = self.h
        if handle is not None:
            if self.mode != 'r':
                handle.store()
            handle.close()
        self.h = None

    # Closing on garbage collection flushes whatever is still pending.
    __del__ = close

    def newdb(self):
        """Open the database with mode 'n' and close it again straight away."""
        self.open('n')
        self.close()

    def filter(self, msg):
        """Run *msg* through the filter and return what ``h.filter`` returns.

        The database is opened with 'c' when the "train_on_filter" option is
        set and with 'r' otherwise.
        """
        wanted_mode = 'c' if Options.options["Hammie", "train_on_filter"] else 'r'
        self.open(wanted_mode)
        return self.h.filter(msg)

    def filter_train(self, msg):
        """Filter *msg* with ``train=True`` and return the filter's result."""
        self.open('c')
        return self.h.filter(msg, train=True)

    def train_ham(self, msg):
        """Train *msg* as ham and store the database."""
        self.open('c')
        include_trained = Options.options["Headers", "include_trained"]
        self.h.train_ham(msg, include_trained)
        self.h.store()

    def train_spam(self, msg):
        """Train *msg* as spam and store the database."""
        self.open('c')
        include_trained = Options.options["Headers", "include_trained"]
        self.h.train_spam(msg, include_trained)
        self.h.store()

    def untrain_ham(self, msg):
        """Undo a previous ham training of *msg* and store the database."""
        self.open('c')
        self.h.untrain_ham(msg)
        self.h.store()

    def untrain_spam(self, msg):
        """Undo a previous spam training of *msg* and store the database."""
        self.open('c')
        self.h.untrain_spam(msg)
        self.h.store()
def init_dirs():
    """Create the working directories under ./.secret_santa if missing.

    The paths are relative, so they are created beneath the current working
    directory.  The parent is listed before the child because os.mkdir does
    not create intermediate directories.
    """
    for needed in (".secret_santa", ".secret_santa/index"):
        if os.path.exists(needed):
            continue
        os.mkdir(needed)
    # NOTE(review): this storage object is built and then dropped when the
    # function returns; nothing visible here uses it.  Kept because the
    # constructor may validate the directory -- confirm before removing.
    index_storage = store.FileStorage(".secret_santa/index")
def setup_twitter(username, password):
    """Log in to Twitter and fetch *username*'s timeline.

    Returns a ``(api, statuses)`` pair: the ``twitter.Api`` client built from
    the credentials and whatever ``GetUserTimeline(username)`` returned.
    """
    client = twitter.Api(username=username, password=password)
    print("Getting latest timeline for '%s'" % username)
    timeline = client.GetUserTimeline(username)
    return client, timeline
def tweet_to_mail_string(status):
    """Render a tweet as a minimal RFC 822-style message string.

    The screen name becomes the ``From`` header, the numeric status id the
    ``Id`` header, and the tweet text the message body.

    status -- object exposing ``user.screen_name``, ``id`` (formatted with
              ``%d``) and ``text``.

    Returns the message text, suitable for ``email.message_from_string``.
    """
    template = (
        "From: %s\n"
        "To: twitter\n"
        "Id: %d\n"
        "Subject:\n"
        # The empty line ends the header section.  Without it, tweet text
        # that looks like "word: ..." (for example "http://...") is parsed
        # as one more header and the body ends up empty.
        "\n"
        "%s\n"
    )
    return template % (status.user.screen_name, status.id, status.text)
def train_spam(statuses, spamlist):
    """Train the filter: tweets picked in *spamlist* are spam, the rest ham.

    statuses -- sequence of tweet objects accepted by tweet_to_mail_string.
    spamlist -- iterable of positions into *statuses* (ints or numeric
                strings).  Blank entries are skipped and duplicates are
                trained once.

    Raises ValueError for a non-numeric entry and IndexError for a position
    outside *statuses*.
    """
    h = HammieFilter()
    if not os.path.exists(h.dbname):
        h.newdb()
    # there's got to be a way to do this without converting tweets to email
    as_emails = [email.message_from_string(tweet_to_mail_string(s))
                 for s in statuses]

    # Resolve every selection against the untouched list first.  Deleting
    # entries while walking the indices shifts the remaining positions, so
    # later selections would hit the wrong tweet.
    total = len(as_emails)
    spam_positions = set()
    for raw in spamlist:
        if not str(raw).strip():
            # "".split(" ") yields [''], so an empty answer arrives as a
            # single blank entry; it selects nothing.
            continue
        position = int(raw)
        if position < -total or position >= total:
            raise IndexError("spam index %d out of range" % position)
        if position < 0:
            # Keep list-style negative indexing working.
            position += total
        spam_positions.add(position)

    for position in sorted(spam_positions):
        h.train_spam(as_emails[position])
    for position, msg in enumerate(as_emails):
        if position not in spam_positions:
            h.train_ham(msg)
def filter_spam(statuses, spamlist):
    """Given a list of twitter status, run them through the spam filter and
    return two lists of spam vs. ham.

    statuses -- iterable of tweet objects accepted by tweet_to_mail_string.
    spamlist -- unused; kept so existing callers keep working.

    Returns ``(spam, ham)``, two lists of the original status objects in
    their input order.
    """
    h = HammieFilter()
    spam = []
    ham = []
    # Each tweet is converted to a message exactly once, inside the loop; a
    # second up-front conversion of the whole list was never read.
    for status in statuses:
        msg = email.message_from_string(tweet_to_mail_string(status))
        # The return value is ignored: the verdict is read off msg itself,
        # so this relies on filter() annotating the message in place.
        h.filter(msg)
        if "spam" in msg["X-Spambayes-Classification"]:
            spam.append(status)
        else:
            ham.append(status)
    return spam, ham
def index_tweets(ndx, statuses):
    """Add every status to the index *ndx* and commit once at the end.

    Each document gets two fields, ``username`` and ``text``, both converted
    with ``unicode`` before being handed to the writer.
    """
    idx_writer = ndx.writer()
    for tweet in statuses:
        fields = {
            "username": unicode(tweet.user.screen_name),
            "text": unicode(tweet.text),
        }
        idx_writer.add_document(**fields)
    idx_writer.commit()
def setup_index(path):
    """Open the Whoosh index stored in *path*, creating it if that fails.

    A newly created index has two stored TEXT fields, ``username`` and
    ``text``, matching what index_tweets writes.

    Returns the index object from ``index.open_dir`` or ``index.create_in``.
    """
    try:
        return index.open_dir(path)
    except Exception:
        # Catch Exception rather than everything, so Ctrl-C and SystemExit
        # are no longer swallowed and turned into "create a new index".
        # NOTE(review): any other failure to open still falls through to
        # creating an index in the same directory -- confirm that is wanted
        # for an index that exists but is unreadable.
        return index.create_in(path, username=TEXT(stored=True),
                               text=TEXT(stored=True))
def show_tweets(statuses):
    """Print a numbered listing of ``(name, text)`` entries.

    Entries are numbered from 0 in iteration order; item 0 of each entry is
    printed after the number and item 1 on the following line.
    """
    position = 0
    for entry in statuses:
        print("%d: %s\n --> %s" % (position, entry[0], entry[1]))
        position += 1
def update_past(statuses):
    """Given a list of statuses, it will return only the new ones and also
    store them via their ID in a shelve.

    statuses -- iterable of tweet objects exposing ``id``,
                ``user.screen_name`` and ``text``.

    Returns the statuses whose id was not yet in the shelf, in input order.
    The shelf lives at ".secret_santa/past", relative to the current working
    directory.
    """
    db = shelve.open(".secret_santa/past")
    try:
        latest = []
        for status in statuses:
            # Shelf keys must be strings, hence the conversion.
            key = str(status.id)
            if key not in db:
                db[key] = {"name": status.user.screen_name,
                           "text": status.text}
                latest.append(status)
        return latest
    finally:
        # Close explicitly so new entries are flushed and the file is
        # released even if a status is malformed, rather than leaving that
        # to garbage collection.
        db.close()
def search_past(ndx, search):
    """Search the ``text`` field of index *ndx* for the query *search*.

    The query string is converted with ``unicode`` and parsed against the
    index's own schema; the return value is whatever the index searcher's
    ``search`` call returns.
    """
    text_parser = QueryParser("text", schema=ndx.schema)
    parsed_query = text_parser.parse(unicode(search))
    index_searcher = ndx.searcher()
    return index_searcher.search(parsed_query)
# Script body: fetch the timeline, ask which tweets are spam, train and run
# the filter, index the new ham, then offer a search over the index.

# Fail with a usage line instead of an unpacking error when the two
# required arguments are missing or extra ones are given.
if len(sys.argv) != 3:
    sys.exit("usage: %s <twitter-username> <password>" % sys.argv[0])
username, password = sys.argv[1:]

init_dirs()

# connect to twitter and get the latest timeline
api, statuses = setup_twitter(username, password)
show_tweets([(s.user.screen_name, s.text) for s in statuses])

print("What list the numbers that are spam (separate by spaces):")
try:
    # split() with no argument drops empty tokens, so a blank answer gives
    # [] (no training) and repeated spaces do not produce '' entries that
    # would break the int() conversion during training.
    spamlist = raw_input("> ").split()
except EOFError:
    spamlist = []
if spamlist:
    train_spam(statuses, spamlist)

# spam filter it and display the results
spam, ham = filter_spam(statuses, spamlist)
# Keep only the tweets not seen on a previous run.
spam = update_past(spam)
ham = update_past(ham)

# index the latest
ndx = setup_index(".secret_santa/index")
index_tweets(ndx, ham)
if ham:
    print("\nThe latest ham:")
    show_tweets([(s.user.screen_name, s.text) for s in ham])
if spam:
    print("\nThe latest spam:")
    show_tweets([(s.user.screen_name, s.text) for s in spam])

# prompt for search terms and display results
try:
    search = raw_input("> ")
except EOFError:
    search = ""
if search:
    results = search_past(ndx, search)
    show_tweets([(r['username'], r['text']) for r in results])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment