Sentiment analysis of Twitter data.
# download the test/train data: http://www.stanford.edu/~alecmgo/cs224n/twitterdata.2009.05.25.c.zip
# into a folder "corpus" at the level of this file (else you need to change the paths inside the code for now)
import nltk
import random
import re
import gc
from pymongo import Connection
# method defs
# reads tweets; each tweet is a tuple (word list, label): the word list is the tokenized tweet text,
# the label is "0" for :( and "4" for :)
def read_tweets(filename):
    arr = []
    f = open(filename, 'r')
    for line in f:
        tweet = line.split(';;')
        # field 0 is the polarity label, field 5 is the tweet text; collapse runs of dots into spaces
        arr.append((re.sub(r'\.+', ' ', tweet[5]).split(), tweet[0]))
    f.close()
    return arr
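# a rough sketch of the corpus line format this parser assumes (fields separated by ";;",
# polarity label in field 0, tweet text in field 5); the middle fields are placeholders here:
#   4;;id;;date;;query;;user;;loving the weather today :)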
def get_tweets(filenames):
    tweets = []
    print "reading tweets..."
    for name in filenames:
        tweets = tweets + read_tweets(name)
    random.shuffle(tweets)
    return tweets
WORD_FEATURES = []

def generate_tweet_set(filenames, feature_extractor, RECALC_FEATURES = True, NUM_WORD_FEATURES = 500):
    tweets = get_tweets(filenames)
    print "total tweets read: ", len(tweets)
    global WORD_FEATURES
    if RECALC_FEATURES:
        WORD_FEATURES = get_word_features(getwords_in_tweets(tweets), NUM_WORD_FEATURES)
    # print WORD_FEATURES
    print "building train set..."
    # apply_features builds the featuresets lazily, so they are not all held in memory at once
    from nltk.classify import apply_features
    return apply_features(feature_extractor, tweets)
def getwords_in_tweets(tweets):
    # stopwords, emoticons and punctuation that carry no useful sentiment signal
    stoplist = ["we", "was", "2", "at", "that", "it", "for", "is", "i", "on", "of", "am", "i'm",
                ":)", ":(", ":-)", ":-(", ")", "(", "=", "=)", "^_^", "-", "<3", ":/", ":|",
                "www", "\"", "?", ",", "--", ">", ";)", ":'(", ";/", ";\\", "'", "=d", "in",
                "the", ":d", "!!", "+", "!!!", ":p", ">:", ":o", ";o", "a"]
    all_words = []
    for (words, sentiment) in tweets:
        for word in words:
            word = word.lower()
            if word.strip(',').strip('!') not in stoplist and not word.isdigit() \
                    and not word.startswith(('#', '@', 'http')):
                all_words.append(word)
    return all_words
#####################################
# feature extraction
def extract_features(document):
    document_words = set(document)
    features = {}
    for word in WORD_FEATURES:
        features['contains(%s)' % word] = (word in document_words)
    return features
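# illustrative only (these words are placeholders, not the real feature list):
# with WORD_FEATURES = ["good", "bad"], the tweet ["good", "morning"] maps to
#   {'contains(good)': True, 'contains(bad)': False}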
def get_word_features(wordlist, FEATURE_COUNT):
    print "Building Frequency Distribution..."
    wordlist = nltk.FreqDist(wordlist)
    print "Total words: ", len(wordlist)
    #print "Selecting: ", FEATURE_COUNT
    # in this (older) NLTK, FreqDist.keys() is sorted by decreasing frequency,
    # so this picks the FEATURE_COUNT most frequent words as features
    word_features = wordlist.keys()[:FEATURE_COUNT]
    return word_features
def train_classifier(training_set):
    print "training ..."
    classifier = nltk.NaiveBayesClassifier.train(training_set)
    # show_most_informative_features prints directly (and returns None), so no print wrapper
    classifier.show_most_informative_features(100)
    return classifier
def test_classifier(classifier, test_set, LEN = 100):
    print "testing ..."
    accuracy = nltk.classify.accuracy(classifier, test_set[:LEN])
    #import marshal
    #marshal.dump(classifier, open("./classifiers/" + str(accuracy), 'wb'))
    print accuracy
##################################################################
training_filenames = ["corpus/frowny.txt.processed.2009.05.25", "corpus/smiley.txt.processed.2009.05.25"]
trg_set = generate_tweet_set(training_filenames, extract_features, True, 10000)
classifier = train_classifier(trg_set[:10000])
trg_set = None
gc.collect()
test_filenames = ["corpus/testdata.manual.2009.05.25"]
test_classifier(classifier, generate_tweet_set(test_filenames, extract_features, False), 500)
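# if you want to persist the trained classifier, pickle (unlike marshal, which cannot serialize
# class instances) is one option -- a minimal sketch, not part of the original run; the filename
# below is just a placeholder:
#   import pickle
#   pickle.dump(classifier, open("./classifiers/naive_bayes.pickle", 'wb'))
#   classifier = pickle.load(open("./classifiers/naive_bayes.pickle", 'rb'))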
###################################################################
connection = Connection()
db = connection["sample_data"]
tweet_coll = db["sample"]

def classify_tweets_from_db(tweet_coll, count):
    for tweet in tweet_coll.find()[:count]:
        # split the tweet text into words so the feature extractor sees a word list,
        # not a set of characters
        out = classifier.classify(extract_features(tweet["text"].split()))
        if "0" == out:
            print ":("
        elif "4" == out:
            print ":)"
        else:
            raise Exception("doesn't match the class given")
        if tweet["user"]["lang"] == "en":
            print tweet["text"].encode('utf_8')
def classify_tweets_with(query):
    print "classifying tweets..."
    global classifier
    global tweet_coll
    print tweet_coll.find({ "lang" : u"en" }).count()
    for tweet in tweet_coll.find({ "text" : re.compile(".*" + query + ".*") }).limit(100):
        out = classifier.classify(extract_features(tweet["text"].split()))
        if "0" == out:
            print ":("
        elif "4" == out:
            print ":)"
        else:
            raise Exception("doesn't match the class given")
        if tweet["user"]["lang"] == "en":
            print tweet["text"].encode('utf_8')

#classify_tweets_with("obama")
classify_tweets_from_db(tweet_coll, 100)
#try:
#    classify_tweets_from_db(tweet_coll, 10000)
#except Exception as mess:
#    print mess
#    exit(1)
#exit(0)