Created
February 17, 2009 03:38
-
-
Save batasrki/65562 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import twitter | |
| import sys | |
| from whoosh.fields import Schema, STORED, ID, KEYWORD, TEXT | |
| import os, os.path | |
| from whoosh import store, index | |
| import shelve | |
| from whoosh.qparser import QueryParser | |
| from spambayes import hammie, Options, mboxutils, storage | |
| from spambayes.Version import get_current_version | |
| import email | |
class HammieFilter(object):
    """Lazily opened wrapper around a SpamBayes hammie database.

    ``self.h`` holds the handle returned by ``hammie.open`` (``None`` while
    nothing is open) and ``self.mode`` the mode string it was opened with.
    A handle opened in any mode other than ``'r'`` is stored before it is
    closed.
    """

    # Class-level defaults keep close() -- and therefore __del__ -- safe on an
    # instance whose __init__ raised before these attributes were assigned.
    mode = None
    h = None

    def __init__(self):
        """Configure the SpamBayes options and resolve the database location."""
        options = Options.options
        options["Storage", "persistent_storage_file"] = "~/.secret_santa/hammiedb"
        options.merge_files(['/etc/hammierc',
                             os.path.expanduser('~/.secret_santa/.hammierc')])
        self.dbname, self.usedb = storage.database_type([])
        self.mode = self.h = None

    def open(self, mode):
        """Ensure a handle opened with *mode* is available in ``self.h``.

        Nothing happens when a handle with the same mode is already open;
        otherwise any current handle is closed first and a new one is opened.
        """
        if self.h is not None and self.mode == mode:
            return
        # close() still sees the previous mode here, so a writable handle is
        # stored before being replaced. It also resets self.h, so a failing
        # hammie.open() below does not leave a closed handle attached.
        self.close()
        self.mode = mode
        self.h = hammie.open(self.dbname, self.usedb, self.mode)

    def close(self):
        """Store (unless opened with mode ``'r'``) and close the handle, if any."""
        if self.h is not None:
            if self.mode != 'r':
                self.h.store()
            self.h.close()
        self.h = None

    # Flush and close the database when the filter object is finalised.
    __del__ = close

    def newdb(self):
        """Open the database with mode ``'n'`` and close it again."""
        # NOTE(review): 'n' presumably means "always create a new, empty
        # database" (dbm-style flag) -- confirm against hammie.open.
        self.open('n')
        self.close()

    def filter(self, msg):
        """Run *msg* through the classifier and return ``h.filter``'s result.

        The database is opened with mode ``'c'`` when the
        ``Hammie/train_on_filter`` option is set, otherwise with ``'r'``.
        """
        if Options.options["Hammie", "train_on_filter"]:
            self.open('c')
        else:
            self.open('r')
        return self.h.filter(msg)

    def filter_train(self, msg):
        """Filter *msg* with ``train=True`` and return ``h.filter``'s result."""
        self.open('c')
        return self.h.filter(msg, train=True)

    def train_ham(self, msg):
        """Train *msg* as ham and store the database."""
        self.open('c')
        self.h.train_ham(msg, Options.options["Headers", "include_trained"])
        self.h.store()

    def train_spam(self, msg):
        """Train *msg* as spam and store the database."""
        self.open('c')
        self.h.train_spam(msg, Options.options["Headers", "include_trained"])
        self.h.store()

    def untrain_ham(self, msg):
        """Undo a ham training of *msg* and store the database."""
        self.open('c')
        self.h.untrain_ham(msg)
        self.h.store()

    def untrain_spam(self, msg):
        """Undo a spam training of *msg* and store the database."""
        self.open('c')
        self.h.untrain_spam(msg)
        self.h.store()
def init_dirs():
    """Create the script's working directories when they do not exist yet.

    Both paths are relative to the current working directory; the parent is
    listed first so it exists before the nested index directory is made.
    """
    for path in (".secret_santa", ".secret_santa/index"):
        if os.path.exists(path):
            continue
        os.mkdir(path)
    # The resulting object is not used anywhere else in this function.
    # NOTE(review): presumably kept as a check that the index directory is
    # usable by Whoosh -- confirm before removing.
    index_storage = store.FileStorage(".secret_santa/index")
def setup_twitter(username, password):
    """Build a ``twitter.Api`` client and fetch the timeline of *username*.

    Returns an ``(api, statuses)`` tuple holding the client and whatever
    ``GetUserTimeline`` returned for that user.
    """
    client = twitter.Api(username=username, password=password)
    print("Getting latest timeline for '%s'" % username)
    timeline = client.GetUserTimeline(username)
    return client, timeline
def tweet_to_mail_string(status):
    """Render a tweet as a minimal e-mail message string.

    The screen name, the numeric id and a fixed recipient become headers and
    the tweet text becomes the body. *status* must expose
    ``user.screen_name``, ``id`` (formatted with ``%d``) and ``text``.

    The empty line after ``Subject:`` is the header/body separator. Without
    it a tweet beginning with ``word:`` (for instance a URL) is parsed as one
    more header instead of as the message body.
    """
    template = (
        "From: %s\n"
        "To: twitter\n"
        "Id: %d\n"
        "Subject:\n"
        "\n"
        "%s\n"
    )
    return template % (status.user.screen_name, status.id, status.text)
def train_spam(statuses, spamlist):
    """Train the filter: listed statuses as spam, every other one as ham.

    *spamlist* holds positions into *statuses* (anything ``int()`` accepts;
    empty or whitespace-only strings are skipped). The positions always refer
    to the original order of *statuses*.

    Raises ``ValueError`` for a token that is not an integer and
    ``IndexError`` for a position outside *statuses*.
    """
    h = HammieFilter()
    if not os.path.exists(h.dbname):
        h.newdb()
    # there's got to be a way to do this without converting tweets to email
    as_emails = [email.message_from_string(tweet_to_mail_string(s))
                 for s in statuses]

    # Resolve every position before training anything. Removing entries from
    # the list while walking spamlist would shift the later positions and
    # train the wrong tweets.
    total = len(as_emails)
    spam_positions = set()
    for token in spamlist:
        if isinstance(token, str) and not token.strip():
            continue  # e.g. the '' produced by splitting an empty input line
        position = int(token)
        if not -total <= position < total:
            raise IndexError("spam index %d out of range" % position)
        spam_positions.add(position % total)  # fold negative positions

    for position in sorted(spam_positions):
        h.train_spam(as_emails[position])
    for position, msg in enumerate(as_emails):
        if position not in spam_positions:
            h.train_ham(msg)
def filter_spam(statuses, spamlist):
    """Given a list of twitter statuses, run them through the spam filter and
    return two lists of spam vs. ham.

    A status counts as spam when the ``X-Spambayes-Classification`` header of
    its filtered message contains ``"spam"``; everything else is ham.
    *spamlist* is accepted for interface compatibility and is not used.
    """
    h = HammieFilter()
    spam = []
    ham = []
    for status in statuses:
        msg = email.message_from_string(tweet_to_mail_string(status))
        # The return value is ignored; the verdict is read from the header.
        # NOTE(review): relies on h.filter() adding the classification header
        # to this same message object -- confirm against spambayes.hammie.
        h.filter(msg)
        if "spam" in msg["X-Spambayes-Classification"]:
            spam.append(status)
        else:
            ham.append(status)
    return spam, ham
def index_tweets(ndx, statuses):
    """Add every status to the index as a ``username``/``text`` document.

    One writer is obtained from *ndx*, each status is added through it and
    the writer is committed once after the last status.
    """
    writer = ndx.writer()
    for tweet in statuses:
        author = unicode(tweet.user.screen_name)
        body = unicode(tweet.text)
        writer.add_document(username=author, text=body)
    writer.commit()
def setup_index(path):
    """Return the Whoosh index to use with index_tweets.

    The index in *path* is opened when possible; if opening fails, a new
    index with stored ``username`` and ``text`` fields is created there.
    """
    try:
        return index.open_dir(path)
    except Exception:
        # Only ordinary errors trigger the fallback, so Ctrl-C and
        # SystemExit are no longer swallowed by a bare ``except:``.
        # NOTE(review): narrow this to the error open_dir raises for a
        # missing or empty index once the Whoosh version in use is confirmed.
        return index.create_in(path, username=TEXT(stored=True), text=TEXT(stored=True))
def show_tweets(statuses):
    """Print a numbered listing of ``(name, text)`` pairs, counting from 0."""
    for number, entry in enumerate(statuses):
        name, text = entry[0], entry[1]
        line = "%d: %s\n --> %s" % (number, name, text)
        print(line)
def update_past(statuses):
    """Given a list of statuses, return only the new ones and also store
    them via their ID in a shelve.

    A status is new when ``str(status.id)`` is not yet a key of the shelf at
    ``.secret_santa/past``; for each new status its screen name and text are
    recorded under that key. The shelf is always closed before returning so
    the new entries are flushed to disk.
    """
    db = shelve.open(".secret_santa/past")
    latest = []
    try:
        for status in statuses:
            key = str(status.id)
            if key not in db:
                db[key] = {"name": status.user.screen_name, "text": status.text}
                latest.append(status)
    finally:
        db.close()
    return latest
def search_past(ndx, search):
    """Parse *search* as a query on the ``text`` field and run it on *ndx*.

    Returns whatever the index searcher's ``search`` call returns.
    """
    text_parser = QueryParser("text", schema=ndx.schema)
    parsed_query = text_parser.parse(unicode(search))
    searcher = ndx.searcher()
    return searcher.search(parsed_query)
init_dirs()

# Give a usage hint instead of an unpacking ValueError when the two
# credentials are not supplied on the command line.
if len(sys.argv) != 3:
    sys.exit("usage: %s USERNAME PASSWORD" % sys.argv[0])
username, password = sys.argv[1:]

# connect to twitter and get the latest timeline
api, statuses = setup_twitter(username, password)
show_tweets([(s.user.screen_name, s.text) for s in statuses])

print("What list the numbers that are spam (separate by spaces):")
try:
    # split() without a separator returns [] for an empty line and ignores
    # repeated spaces; split(" ") produced [''], which is truthy and made the
    # training step fail in int('').
    spamlist = raw_input("> ").split()
except EOFError:
    spamlist = []

if spamlist:
    train_spam(statuses, spamlist)

# spam filter it and display the results
spam, ham = filter_spam(statuses, spamlist)
# keep only the statuses that were not seen on a previous run
spam = update_past(spam)
ham = update_past(ham)

# index the latest
ndx = setup_index(".secret_santa/index")
index_tweets(ndx, ham)

if ham:
    print("\nThe latest ham:")
    show_tweets([(s.user.screen_name, s.text) for s in ham])
if spam:
    print("\nThe latest spam:")
    show_tweets([(s.user.screen_name, s.text) for s in spam])

# prompt for search terms and display results
try:
    search = raw_input("> ")
except EOFError:
    search = ""

if search:
    results = search_past(ndx, search)
    show_tweets([(r['username'], r['text']) for r in results])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment