Twitter NLP with spaCy and TextBlob
from twython import Twython
import spacy
from textblob import TextBlob
import json
import random
from nltk.corpus import stopwords
import markovify  # not used in the substitution flow; see the sketch at the end
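
# The trend helpers below assume an authenticated Twython client named
# `twitter`, which the original script never creates. A minimal sketch,
# assuming app-key auth and placeholder credentials (substitute your own):
# APP_KEY = 'YOUR_APP_KEY'
# APP_SECRET = 'YOUR_APP_SECRET'
# twitter = Twython(APP_KEY, APP_SECRET)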
# **********************************************************************************************************************
# Get Locations
# **********************************************************************************************************************
def get_locations_closest_to(lat, long):
    return twitter.get_closest_trends(lat=lat, long=long)

# **********************************************************************************************************************
# Location Variables
# **********************************************************************************************************************
new_zealand = {'country': 'New Zealand', 'lat': -41.291422, 'long': 174.775412, 'woeid': 23424916, 'countryCode': 'NZ'}
# new_zealand = get_locations_closest_to(new_zealand['lat'], new_zealand['long'])

def get_trends_for(place):
    return twitter.get_place_trends(id=place['woeid'])[0]['trends']

def choose_random_trend(trends):
    return random.choice(trends)['query']

# tweets = choose_random_trend(new_zealand)
# file = open('./data/twitter-cache/trends_for_new_zealand.txt').read()
# trends = json.loads(file)[0]['trends']
# selected_query =
# tweets = twitter.search(q=selected_query, geocode='{0},{1},{2}'.format(new_zealand['lat'], new_zealand['long'], '500km'))
# print(tweets)
# Thin wrappers so the rest of the script can switch between TextBlob and
# spaCy by changing nlp_type.
def is_textblob():
    return nlp_type == 'textblob'

def is_spacy():
    return nlp_type == 'spacy'

def process_text():
    if is_textblob():
        return TextBlob(tweet_text)
    if is_spacy():
        return NLP(tweet_text)

def sentences(text):
    # TextBlob mode yields sentences; spaCy mode yields named entities instead
    if is_textblob():
        return text.sentences
    if is_spacy():
        return text.ents

def label(word):
    if is_textblob():
        return word
    if is_spacy():
        return word.label_

def lemma(word):
    if is_spacy():
        return word.lemma_

def process_template(template):
    if is_textblob():
        return TextBlob(template)
    if is_spacy():
        return NLP(template)

def ent_type(ent):
    if is_spacy():
        return ent.ent_type_

def ent_text(ent):
    if is_spacy():
        return ent.text
file = open('./data/twitter-cache/statuses_for_bill_english.json').read()
tweets = json.loads(file)['statuses']
tweet_text = ""

# Choose a random tweet to use later as the template whose words we replace
fake_tweet_template = random.choice(tweets)['text']
print("-- Original --")
print(fake_tweet_template)

for tweet in tweets:
    tweet_text += tweet['text'] + " "  # separator so adjacent tweets don't run together

# Analyse sentences and build word bag
NLP = spacy.load('en')  # newer spaCy versions use spacy.load('en_core_web_sm')
nlp_type = 'textblob'

stop_words = set(stopwords.words('english'))  # renamed so the nltk import isn't shadowed
stop_words.update(['...', 'https', 'rt', 'https!'])

processed_text = process_text()
sentence_list = sentences(processed_text)  # renamed so the sentences() helper isn't shadowed
# Build a bag of words to use from the tweets we have analysed
if is_spacy():
    word_bag = {}
    for sentence in sentence_list:  # entities, in spaCy mode
        place = word_bag.get(label(sentence))
        if place is None:
            place = word_bag[label(sentence)] = set()
        place.add(lemma(sentence))  # set.add returns None, so there is nothing to assign

if is_textblob():
    word_bag = []
    for sentence in sentence_list:
        for chunk in sentence.noun_phrases:
            if chunk.lower() in stop_words:
                continue  # skip the stopword rather than abandoning the sentence
            word_bag.append(chunk)
        # for tag_word in sentence.tags:
        #     place = word_bag.get(tag_word[0])
        #     if place is None:
        #         place = word_bag[tag_word[0]] = set()
        #     place.add(tag_word[1])
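
# For reference, the two bags have different shapes (values here are hypothetical):
#   spaCy mode:    word_bag == {'PERSON': {'bill english', ...}, 'GPE': {'new zealand', ...}}
#   TextBlob mode: word_bag == ['bill english', 'budget', ...]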
processed_template = process_template(fake_tweet_template)
print("-- New --")
new_tweet = []

if is_textblob():
    new_tweet = fake_tweet_template.lower()
    print(word_bag)
    for tag_word in processed_template.noun_phrases:
        new_word = random.choice(word_bag)
        if new_word in stop_words:
            continue  # skip this one rather than stopping all replacements
        new_tweet = new_tweet.replace(tag_word, new_word)
    print(new_tweet)
if is_spacy():
    for i in range(len(processed_template) - 1):
        part = processed_template[i]
        next_part = processed_template[i + 1]
        new_part = ""
        part_type = ent_type(part)  # renamed from 'type' to avoid shadowing the builtin
        next_type = ent_type(next_part)
        if part_type in word_bag and part_type == next_type:
            try:
                new_part = word_bag[part_type].pop()  # set.pop raises KeyError once the set is empty
                print("Replacing " + ent_text(part) + " with " + new_part)
            except KeyError:
                new_part = ent_text(part)
        else:
            new_part = ent_text(part)
        new_tweet.append(new_part)
    print(' '.join(new_tweet))
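
# markovify is imported above but never used. A minimal sketch of how it could
# generate tweets from the same corpus instead of template substitution
# (assumes tweet_text is non-empty; make_short_sentence may return None):
# model = markovify.Text(tweet_text)
# fake = model.make_short_sentence(140)
# if fake:
#     print(fake)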