Twitter NLP with Spacy and Textblob
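# What the script below does: stitch the text of cached tweets together,
# build a bag of noun phrases (TextBlob) or entity lemmas (spaCy), then
# swap matching parts of a randomly chosen tweet template to produce a
# new, fake tweet.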
from twython import Twython
import spacy
from textblob import TextBlob
import json
import random
from nltk.corpus import stopwords
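# The gist calls a `twitter` client below but never constructs one. A minimal
# sketch, assuming app-auth credentials; APP_KEY and APP_SECRET are
# placeholders you must replace with your own Twitter API keys.
APP_KEY = 'your-app-key'        # placeholder credential
APP_SECRET = 'your-app-secret'  # placeholder credential
twitter = Twython(APP_KEY, APP_SECRET)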
# **********************************************************************************************************************
# Get Locations
# **********************************************************************************************************************
def get_locations_closest_to(lat, long):
    return twitter.get_closest_trends(lat=lat, long=long)
# **********************************************************************************************************************
# Location Variables
# **********************************************************************************************************************
new_zealand = { 'country': 'New Zealand', 'lat': -41.291422, 'long': 174.775412, 'woeid': 23424916, 'countryCode': 'NZ'}
# new_zealand = get_locations_closest_to(new_zealand['lat'], new_zealand['long'])
def get_trends_for(place):
    return twitter.get_place_trends(id=place['woeid'])[0]['trends']

def choose_random_trend(trends):
    return random.choice(trends)['query']
# tweets = choose_random_trend(new_zealand)
# file = open('./data/twitter-cache/trends_for_new_zealand.txt').read()
# trends = json.loads(file)[0]['trends']
# selected_query =
# tweets = twitter.search(q=selected_query, geocode='{0},{1},{2}'.format(new_zealand['lat'], new_zealand['long'], '500km'))
# print(tweets)
# The script can analyse text with either TextBlob or spaCy; these helpers
# dispatch on the global nlp_type set further down.
def is_textblob():
    return nlp_type == 'textblob'

def is_spacy():
    return nlp_type == 'spacy'

def process_text():
    if is_textblob():
        return TextBlob(tweet_text)
    if is_spacy():
        return NLP(tweet_text)

def sentences(text):
    # Note: on the spaCy path this returns named entities, not sentences;
    # the word bag below is keyed by entity label.
    if is_textblob():
        return text.sentences
    if is_spacy():
        return text.ents

def label(word):
    if is_textblob():
        return word
    if is_spacy():
        return word.label_

def lemma(word):
    if is_spacy():
        return word.lemma_

def process_template(template):
    if is_textblob():
        return TextBlob(template)
    if is_spacy():
        return NLP(template)

def ent_type(ent):
    if is_spacy():
        return ent.ent_type_

def ent_text(ent):
    if is_spacy():
        return ent.text
with open('./data/twitter-cache/statuses_for_bill_english.json') as f:
    tweets = json.loads(f.read())['statuses']
tweet_text = ""
# Choose a random tweet to use later as the template whose words we replace
fake_tweet_template = random.choice(tweets)['text']
print("-- Original --")
print(fake_tweet_template)
for tweet in tweets:
    tweet_text += tweet['text']
# Analyse sentences and build word bag
NLP = spacy.load('en')  # spaCy 2+ names the English model 'en_core_web_sm'
nlp_type = 'textblob'
stop_words = set(stopwords.words('english'))  # renamed so the nltk module isn't shadowed
stop_words.update(['...', 'https', 'rt', 'https!'])
processed_text = process_text()
parsed_sentences = sentences(processed_text)  # renamed so the sentences() helper isn't shadowed
# Build a bag of words to use from the tweets we have analysed
if is_spacy():
    word_bag = {}
    for sentence in parsed_sentences:
        place = word_bag.get(label(sentence))
        if place is None:
            place = word_bag[label(sentence)] = set()
        place.add(lemma(sentence))  # set.add returns None, so its result isn't kept
if is_textblob():
    word_bag = []
    for sentence in parsed_sentences:
        for chunk in sentence.noun_phrases:
            if chunk.lower() in stop_words:
                continue  # skip stopword chunks rather than abandoning the whole sentence
            word_bag.append(chunk)
    # for tag_word in sentence.tags:
    #     place = word_bag.get(tag_word[0])
    #     if place is None:
    #         place = word_bag[tag_word[0]] = set()
    #     word = place.add(tag_word[1])
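# For orientation, the two word_bag shapes this produces (illustrative
# values only, not real output):
#   spaCy:    {'PERSON': {'english'}, 'GPE': {'zealand'}}  # entity label -> set of lemmas
#   TextBlob: ['bill english', 'new zealand']              # flat list of noun phrases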
processed_template = process_template(fake_tweet_template)
print("-- New --")
new_tweet = []
if is_textblob():
    new_tweet = fake_tweet_template.lower()
    print(word_bag)
    for tag_word in processed_template.noun_phrases:
        if not word_bag:
            break  # nothing to substitute with
        new_word = random.choice(word_bag)
        if new_word in stop_words:
            continue  # skip stopwords instead of aborting the replacement pass
        new_tweet = new_tweet.replace(tag_word, new_word)
    print(new_tweet)
if is_spacy():
    # Look ahead one token at a time; note this loop never emits the final token.
    for i in range(len(processed_template) - 1):
        part = processed_template[i]
        next_part = processed_template[i + 1]
        new_part = ""
        part_type = ent_type(part)  # renamed from 'type' so the builtin isn't shadowed
        next_type = ent_type(next_part)
        if part_type in word_bag and part_type == next_type:
            try:
                new_part = word_bag[part_type].pop()
                print("Replacing " + ent_text(part) + " with " + new_part)
            except KeyError:  # raised by set.pop when the set for this label is exhausted
                new_part = ent_text(part)
        else:
            new_part = ent_text(part)
        new_tweet.append(new_part)
    print(' '.join(new_tweet))