Python script that uses the Python Twitter client (https://github.com/sixohsix/twitter) to pull geolocated tweets. Optionally stores them in an efficient columnar Parquet data store with configurable file sizes. Took 13 seconds to download 100 geolocated tweets on macOS 10.12 with 16 GB RAM on an 82 Mb/s connection.
# Author
# Linwood Creekmore III
# April 8 2017
# heavy input from http://socialmedia-class.org/twittertutorial.html
# [email protected]

import re
import copy
import numpy as np
import pandas as pd
import json
try:
    import ujson
except ImportError:
    import json as ujson
from dateutil.parser import parse
from json import JSONEncoder

# shameless copy paste from json/decoder.py
FLAGS = re.VERBOSE | re.MULTILINE | re.DOTALL
WHITESPACE = re.compile(r'[ \t\n\r]*', FLAGS)


class MyEncoder(JSONEncoder):
    """JSON encoder that falls back to an object's __dict__."""
    def default(self, o):
        return o.__dict__


class ConcatJSONDecoder(json.JSONDecoder):
    """Decode a string holding multiple concatenated JSON objects into a list."""
    def decode(self, s, _w=WHITESPACE.match):
        s_len = len(s)
        objs = []
        end = 0
        while end != s_len:
            obj, end = self.raw_decode(s, idx=_w(s, end).end())
            end = _w(s, end).end()
            objs.append(obj)
        return objs
def screens(x):
    """Return the screen name from a tweet's user object."""
    return x['screen_name']


# get urls
def urls(x):
    """Return the first expanded url in a tweet's entities, if any."""
    if len(x['urls']) != 0:
        return x['urls'][0]['expanded_url']
    else:
        return None


def media(x):
    """Media urls"""
    if isinstance(x, dict):
        return x['media'][0]['media_url']
    else:
        return None


# get coordinates
def coordsgetter(x):
    """Return longitude, latitude, and geometry type as a 1x3 array."""
    if x:
        longitude = float(x['coordinates'][0])
        latitude = float(x['coordinates'][1])
        tp = x['type']
    else:
        return np.asarray([None, None, None]).reshape(1, 3)
    return np.asarray([longitude, latitude, tp]).reshape(1, 3)
def entitiesgetter(x):
    """Return linked accounts and hashtags as a 1x2 array."""
    if x:
        if len(x['user_mentions']) > 0:
            temp = x['user_mentions']
            links = []
            valid_user = ['id', 'screen_name', 'name']
            for l in temp:
                holder = copy.deepcopy(l)
                holder.pop('id_str')
                holder.pop('indices')
                links.append(holder)
            # use the stdlib json here; ujson.dumps does not accept a cls= encoder
            links = json.dumps({'linkedAccounts': list(links)}, cls=MyEncoder)
        else:
            links = np.asarray([None])
        if len(x['hashtags']) > 0:
            hashes = []
            for l in x['hashtags']:
                hashes.append("#" + l['text'])
            hashes = json.dumps({'hashtags': hashes}, cls=MyEncoder)
        else:
            hashes = np.asarray([None])
    try:
        return np.asarray([links, hashes]).reshape(1, 2)
    except Exception:
        # x was empty, so links/hashes were never set
        return np.asarray([None, None]).reshape(1, 2)
def jsget(x):
    """Load a json string into a pandas Series."""
    js = ujson.loads(x)
    return pd.Series(js)


def replace(x):
    """Collapse placeholder arrays and None values to np.nan."""
    if isinstance(x, np.ndarray) or x is None:
        return np.nan
    else:
        return x
#################################
# Converts tweet json into tidy data
#################################

def reader(filename, anonymize=True):
    """Transform tweet json data into tidy format.

    This function transforms tweets into tidy format. It has been tested on,
    and assumes you are retrieving, tweets from the Twitter API.
    *Requires pandas library.*

    Parameters
    ----------
    filename : str or list
        path to twitter data in json format, or an in-memory list of tweet dicts.

    Returns
    -------
    dataframe
        Pandas dataframe in tidy format.
    """
    try:
        df = pd.DataFrame(filename)
    except Exception:
        try:
            df = pd.read_json(filename)  # try to read in normally
            if len(df.columns) < 10:  # look for column
                raise ValueError('Not here')
        except Exception:
            with open(filename) as f:
                js = json.load(f, cls=ConcatJSONDecoder)
            dd = pd.DataFrame(js)
            if 'contributors' in dd.columns:
                df = dd
            elif len(dd.columns) == 1:
                jsvect = np.vectorize(jsget)
                jsVects = jsvect(js)
                df = pd.concat(jsVects, axis=1).T
            else:
                arr = dd.values.flatten()[:, np.newaxis]
                df = pd.DataFrame(arr[np.where(arr, True, False)].tolist())
    try:
        df.dropna(subset=['coordinates', 'created_at'], how='all', inplace=True)  # drop rows with NA
        df.reset_index(inplace=True, drop=True)  # reset the index
        parsevec = np.vectorize(parse)
    except Exception:
        print(df.columns)
    try:
        dates = parsevec(df.created_at.values)
    except Exception:
        dates = df.created_at.values
    cols = []
    for c in ['favorited_count', 'retweet_count', 'reply_count', 'quote_count',
              'in_reply_to_screen_name', 'in_reply_to_user_id']:
        if c in df.columns:
            cols.append(c)
    namevect = np.vectorize(screens)
    names = pd.Series(namevect(df['user'].values))
    urlvect = np.vectorize(urls)
    urllist = urlvect(df['entities'].values)
    coordvect = np.vectorize(coordsgetter)
    try:
        coorddf = pd.DataFrame(np.concatenate(coordvect(df['coordinates'].values)),
                               columns=['longitude', 'latitude', 'type'])
    except Exception:
        coords = [coordsgetter(l) for l in df.coordinates.values]
        coorddf = pd.DataFrame(np.asarray(coords).reshape(len(coords), 3))
        coorddf.columns = ['longitude', 'latitude', 'type']
    mediavect = np.vectorize(media)
    try:
        mediaurls = mediavect(df['extended_entities'].values)
    except Exception:
        mediaurls = np.repeat([np.nan], len(df))
    linkedents = np.asarray([entitiesgetter(l) for l in df['entities'].values]).reshape(len(df), 2)
    dfsmall = pd.concat([pd.DataFrame({'screenname': names, 'urls': urllist, 'mediaurls': mediaurls}),
                         pd.DataFrame(linkedents, columns=['linkages', 'hashtags']), coorddf,
                         df[cols]], axis=1)  # .assign(tweetid=df['id'], text=df['text'])
    dfsmall.reset_index(drop=True, inplace=True)
    dfsmall = dfsmall.assign(text=df.reset_index()['text'],
                             tweetid=df.reset_index()['id'],
                             language=df.reset_index()['lang'])
    # set to datetime index
    dfsmall.set_index(dates, inplace=True)
    dfsmall.index.rename('UTC time', inplace=True)
    dfsmall = dfsmall[['longitude', 'latitude', 'language', 'screenname', 'tweetid', 'linkages',
                       'text', 'hashtags', 'mediaurls', 'urls', 'type'] + cols]
    # get rid of mixed data types; enables h5 write
    dfsmall = dfsmall.assign(linkages=[replace(l) for l in dfsmall['linkages'].values],
                             hashtags=[replace(l) for l in dfsmall['hashtags'].values])
    del df
    if anonymize:
        dfsmall.drop(['screenname', 'linkages'], axis=1, inplace=True)
    try:
        return dfsmall
    except Exception:
        print(filename)
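
# Usage sketch (assumptions: a file of raw tweets pulled from the Twitter API,
# one json object per tweet, saved at the hypothetical path './raw_tweets.json'):
#
#     tidy = reader('./raw_tweets.json', anonymize=True)
#     tidy[['longitude', 'latitude', 'language', 'text']].head()
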
# Author
# Linwood Creekmore III
# April 8 2017
# heavy input from http://socialmedia-class.org/twittertutorial.html
# [email protected]

from twitter import Twitter, OAuth, TwitterHTTPError, TwitterStream
import pandas as pd
import datetime
import warnings
import time
import os
try:
    import json
except ImportError:
    import simplejson as json
def twitterpull(accesstok, accesssec, consumkey, consumsec, limit=20, rowlimit=10, arrow=True):
    """
    Function to pull geolocated tweets and write files to disk. To get Twitter
    API credentials, use the instructions here:
    https://twittercommunity.com/t/how-to-get-my-api-key/7033

    Parameters
    ----------
    accesstok : str
        Access token for Twitter API
    accesssec : str
        Access secret for Twitter API
    consumkey : str
        Consumer key for Twitter API
    consumsec : str
        Consumer secret for Twitter API
    limit : int
        Maximum number of geolocated tweets to return in a session. The function stops either
        when this number is reached or when Twitter's API limit is reached.
    rowlimit : int
        Maximum number of tweets to write to one file on disk. This is arbitrary and depends on
        whether you want many small files of Twitter messages or a few large ones.
    arrow : bool
        Whether to use pyarrow, which provides an efficient columnar data storage format and can
        significantly reduce I/O overhead. When False, writes json to disk. If True and pyarrow
        is not installed, also falls back to json.

    Returns
    -------
    obj:
        Writes to disk
    """
    # switch to json and warn if pyarrow is not installed
    try:
        import pyarrow.parquet as pq
        import pyarrow as pa
    except ImportError:
        arrow = False
        warnings.warn('pyarrow not installed, output file written as json')

    #######################################################################
    # Pass Twitter App keys into the Twitter client for Python
    # Get your Twitter API key using these instructions:
    # https://twittercommunity.com/t/how-to-get-my-api-key/7033
    #######################################################################

    # Variables that contain the user credentials to access the Twitter API
    # If you have not followed the process above, these will be empty and error out
    ACCESS_TOKEN = accesstok
    ACCESS_SECRET = accesssec
    CONSUMER_KEY = consumkey
    CONSUMER_SECRET = consumsec

    try:
        oauth = OAuth(ACCESS_TOKEN, ACCESS_SECRET, CONSUMER_KEY, CONSUMER_SECRET)
    except ValueError:
        raise ValueError('Your Twitter API credentials were not entered correctly. Please check and try again')
    ##################################################
    # Pull the Twitter Data
    ##################################################

    # Initiate the connection to the Twitter Streaming API
    twitter_stream = TwitterStream(auth=oauth)

    # Filter the public data flowing through Twitter; the format is "long,lat,long,lat" with the southwest corner first
    iterator = twitter_stream.statuses.filter(locations='-180,-90,180,90')

    # Collect tweets from the stream, stopping once `limit` geolocated tweets
    # have been gathered. You don't have to stop; the stream can keep running
    # to collect data for days or even longer.
    count = 0
    filecount = 0
    tweets = []  # empty list to hold returned tweets

    for tweet in iterator:
        if count >= limit:
            break
        if 'coordinates' in tweet.keys() and tweet['coordinates'] is not None:
            tweets.append(tweet)
            count += 1
            filecount += 1
            if filecount >= rowlimit:
                # use date and time for the file name
                date = datetime.datetime.now().strftime("%d%b%Y_%H%M%S%f")
                # reader() is the tidy-tweet function defined above; it must be
                # in scope here (same module or imported)
                df = reader(tweets)
                if arrow:
                    table = pa.Table.from_pandas(df)
                    pq.write_table(table, './Tweets_{0}.parquet'.format(date))
                else:
                    df.to_json('./Tweets_{0}.txt'.format(date), orient='records')
                time.sleep(.05)
                filecount = 0
                tweets = []


if __name__ == "__main__":
    pass
Example use:
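A minimal sketch of calling twitterpull; the credential strings below are placeholders (assumptions), and the output file naming follows the script above:

# Placeholder credentials -- substitute your own Twitter API keys
ACCESS_TOKEN = 'your-access-token'
ACCESS_SECRET = 'your-access-secret'
CONSUMER_KEY = 'your-consumer-key'
CONSUMER_SECRET = 'your-consumer-secret'

# Collect 100 geolocated tweets, writing a Parquet file every 25 tweets
twitterpull(ACCESS_TOKEN, ACCESS_SECRET, CONSUMER_KEY, CONSUMER_SECRET,
            limit=100, rowlimit=25, arrow=True)

# Each batch lands in the working directory as Tweets_<date>.parquet and can be
# read back with pyarrow (substitute an actual file name for <date>):
#   import pyarrow.parquet as pq
#   df = pq.read_table('./Tweets_<date>.parquet').to_pandas()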