@linwoodc3
Last active January 19, 2021 22:58
Python script that uses the Python Twitter client (https://github.com/sixohsix/twitter) to pull tweets that are geolocated. Optionally stores the tweets in an efficient columnar Parquet data store with configurable file sizes. Took 13 secs to download 100 geolocated tweets on macOS 10.12 with 16 GB RAM on an 82 Mb/s connection.
# Author
# Linwood Creekmore III
# April 8 2017
# heavy input from http://socialmedia-class.org/twittertutorial.html
# [email protected]

import re
import copy
import json

import numpy as np
import pandas as pd

try:
    import ujson
except ImportError:
    import json as ujson

from dateutil.parser import parse
from json import JSONEncoder

# shameless copy/paste from json/decoder.py
FLAGS = re.VERBOSE | re.MULTILINE | re.DOTALL
WHITESPACE = re.compile(r'[ \t\n\r]*', FLAGS)


class MyEncoder(JSONEncoder):
    """JSON encoder that falls back to an object's __dict__."""

    def default(self, o):
        return o.__dict__


class ConcatJSONDecoder(json.JSONDecoder):
    """Decode a string of concatenated JSON objects into a list of objects."""

    def decode(self, s, _w=WHITESPACE.match):
        s_len = len(s)
        objs = []
        end = 0
        while end != s_len:
            obj, end = self.raw_decode(s, idx=_w(s, end).end())
            end = _w(s, end).end()
            objs.append(obj)
        return objs
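
# A quick, hypothetical illustration of what ConcatJSONDecoder handles: a string
# of back-to-back JSON objects with no enclosing array, as produced by some
# streaming dumps. Passing it through json.loads yields a list of dicts:
#
#   json.loads('{"a": 1}\n{"b": 2}', cls=ConcatJSONDecoder)  # -> [{'a': 1}, {'b': 2}]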

def screens(x):
    """Return the screen name from a tweet's user object."""
    return x['screen_name']


# get urls
def urls(x):
    if len(x['urls']) != 0:
        return x['urls'][0]['expanded_url']
    else:
        return None


def media(x):
    """Media urls"""
    if isinstance(x, dict):
        return x['media'][0]['media_url']
    else:
        return None


# get coordinates
def coordsgetter(x):
    if x:
        longitude = float(x['coordinates'][0])
        latitude = float(x['coordinates'][1])
        tp = x['type']
    else:
        return np.asarray([None, None, None]).reshape(1, 3)
    return np.asarray([longitude, latitude, tp]).reshape(1, 3)

def entitiesgetter(x):
    """Pull linked accounts and hashtags out of a tweet's entities object."""
    if x:
        if len(x['user_mentions']) > 0:
            temp = x['user_mentions']
            links = []
            valid_user = ['id', 'screen_name', 'name']
            for l in temp:
                holder = copy.deepcopy(l)
                holder.pop('id_str')
                holder.pop('indices')
                links.append(holder)
            # use stdlib json here; ujson.dumps does not accept a custom encoder class
            links = json.dumps({'linkedAccounts': list(links)}, cls=MyEncoder)
        else:
            links = np.asarray([None])
        if len(x['hashtags']) > 0:
            hashes = []
            for l in x['hashtags']:
                hashes.append("#" + l['text'])
            hashes = json.dumps({'hashtags': hashes}, cls=MyEncoder)
        else:
            hashes = np.asarray([None])
        try:
            return np.asarray([links, hashes]).reshape(1, 2)
        except Exception:
            return np.asarray([None, None]).reshape(1, 2)

def jsget(x):
    """Parse a single json string of a tweet into a pandas Series."""
    js = ujson.loads(x)
    return pd.Series(js)


def replace(x):
    """Replace array/None placeholders with NaN so columns keep a single dtype."""
    if isinstance(x, np.ndarray) or x is None:
        return np.nan
    else:
        return x

#################################
# Converts tweet json into tidy data
#################################
def reader(filename, anonymize=True):
    """Transform tweet json data into tidy format.

    This function transforms tweets into tidy format. It has been tested on,
    and assumes you are retrieving, tweets from the Twitter API.
    *Requires the pandas library.*

    Parameters
    ----------
    filename : str
        string representing path to twitter data in json format.

    Returns
    -------
    dataframe
        Pandas dataframe in tidy format.
    """
    try:
        df = pd.DataFrame(filename)
    except Exception:
        try:
            df = pd.read_json(filename)  # try to read in normally
            if len(df.columns) < 10:  # look for column
                raise ValueError('Not here')
        except Exception:
            js = json.load(open(filename), cls=ConcatJSONDecoder)
            dd = pd.DataFrame(js)
            if 'contributors' in dd.columns:
                df = dd
            elif len(dd.columns) == 1:
                jsvect = np.vectorize(jsget)
                jsVects = jsvect(js)
                df = pd.concat(jsVects, axis=1).T
            else:
                arr = dd.values.flatten()[:, np.newaxis]
                df = pd.DataFrame(arr[np.where(arr, True, False)].tolist())
    try:
        df.dropna(subset=['coordinates', 'created_at'], how='all', inplace=True)  # drop rows with NA
        df.reset_index(inplace=True, drop=True)  # reset the index
        parsevec = np.vectorize(parse)
    except Exception:
        print(df.columns)
    try:
        dates = parsevec(df.created_at.values)
    except Exception:
        dates = df.created_at.values
    cols = []
    for c in ['favorited_count', 'retweet_count', 'reply_count', 'quote_count',
              'in_reply_to_screen_name', 'in_reply_to_user_id']:
        if c in df.columns:
            cols.append(c)
    namevect = np.vectorize(screens)
    names = pd.Series(namevect(df['user'].values))
    urlvect = np.vectorize(urls)
    urllist = urlvect(df['entities'].values)
    coordvect = np.vectorize(coordsgetter)
    try:
        coorddf = pd.DataFrame(np.concatenate(coordvect(df['coordinates'].values)),
                               columns=['longitude', 'latitude', 'type'])
    except Exception:
        coords = [coordsgetter(l) for l in df.coordinates.values]
        coorddf = pd.DataFrame(np.asarray(coords).reshape(len(coords), 3))
        coorddf.columns = ['longitude', 'latitude', 'type']
    mediavect = np.vectorize(media)
    try:
        mediaurls = mediavect(df['extended_entities'].values)
    except Exception:
        mediaurls = np.repeat([np.nan], len(df))
    linkedents = np.asarray([entitiesgetter(l) for l in df['entities'].values]).reshape(len(df), 2)
    dfsmall = pd.concat([pd.DataFrame({'screenname': names, 'urls': urllist, 'mediaurls': mediaurls}),
                         pd.DataFrame(linkedents, columns=['linkages', 'hashtags']),
                         coorddf,
                         df[cols]], axis=1)
    dfsmall.reset_index(drop=True, inplace=True)
    dfsmall = dfsmall.assign(text=df.reset_index()['text'],
                             tweetid=df.reset_index()['id'],
                             language=df.reset_index()['lang'])
    # set to datetime index
    dfsmall.set_index(dates, inplace=True)
    dfsmall.index.rename('UTC time', inplace=True)
    dfsmall = dfsmall[['longitude', 'latitude', 'language', 'screenname', 'tweetid', 'linkages',
                       'text', 'hashtags', 'mediaurls', 'urls', 'type'] + cols]
    # get rid of mixed data types; enables h5 write
    dfsmall = dfsmall.assign(linkages=[replace(l) for l in dfsmall['linkages'].values],
                             hashtags=[replace(l) for l in dfsmall['hashtags'].values])
    del df
    if anonymize:
        dfsmall.drop(['screenname', 'linkages'], axis=1, inplace=True)
    try:
        return dfsmall
    except Exception:
        print(filename)
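
# A minimal sketch of calling reader() on its own (the path and variable names
# below are hypothetical):
#
#   tidy = reader('geotweets.json')          # path to raw tweet JSON on disk
#   tidy = reader(list_of_tweet_dicts)       # or a list of tweet dicts from the API
#   tidy[['longitude', 'latitude', 'text']].head()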

# Author
# Linwood Creekmore III
# April 8 2017
# heavy input from http://socialmedia-class.org/twittertutorial.html
# [email protected]

import os
import time
import datetime
import warnings

import pandas as pd
from twitter import Twitter, OAuth, TwitterHTTPError, TwitterStream

try:
    import json
except ImportError:
    import simplejson as json

def twitterpull(accesstok, accesssec, consumkey, consumsec, limit=20, rowlimit=10, arrow=True):
    """
    Pull geolocated tweets and write the results to disk. To get Twitter API
    credentials, use the instructions here:
    https://twittercommunity.com/t/how-to-get-my-api-key/7033

    Parameters
    ----------
    accesstok : str
        Access token for Twitter API
    accesssec : str
        Access secret for Twitter API
    consumkey : str
        Consumer key for Twitter API
    consumsec : str
        Consumer secret for Twitter API
    limit : int
        Maximum number of geolocated tweets to return in a session. The function stops either
        when this number is reached or when Twitter's API limit is reached.
    rowlimit : int
        Maximum number of tweets to write to one file on disk. This is arbitrary and depends
        on whether you want many small files of Twitter messages or a few large ones.
    arrow : bool
        Whether to use pyarrow, which provides an efficient columnar data storage format and
        can significantly reduce I/O overhead. When False, JSON is written to disk instead;
        if True and pyarrow is not installed, the function also falls back to JSON.

    Returns
    -------
    obj :
        Writes to disk
    """
    # switch to json and warn if pyarrow is not installed
    try:
        import pyarrow.parquet as pq
        import pyarrow as pa
    except ImportError:
        arrow = False
        warnings.warn('pyarrow not installed, output file written as json')

    #######################################################################
    # Pass Twitter App keys into the Twitter client for Python
    # Get your Twitter API key using these instructions:
    # https://twittercommunity.com/t/how-to-get-my-api-key/7033
    #######################################################################

    # Variables that contain the user credentials to access the Twitter API.
    # If you have not followed the process above, these will be empty and error out.
    ACCESS_TOKEN = accesstok
    ACCESS_SECRET = accesssec
    CONSUMER_KEY = consumkey
    CONSUMER_SECRET = consumsec

    try:
        oauth = OAuth(ACCESS_TOKEN, ACCESS_SECRET, CONSUMER_KEY, CONSUMER_SECRET)
    except ValueError:
        raise ValueError('Your Twitter API credentials were not entered correctly. Please check and try again')

    ##################################################
    # Pull the Twitter Data
    ##################################################

    # Initiate the connection to the Twitter Streaming API
    twitter_stream = TwitterStream(auth=oauth)

    # Filter the public data flowing through Twitter; the format is "long,lat, long,lat"
    # with the southwest corner first
    iterator = twitter_stream.statuses.filter(locations='-180,-90,180,90')
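    # A narrower bounding box works the same way (illustrative only; these are
    # rough coordinates for the continental United States):
    #   iterator = twitter_stream.statuses.filter(locations='-125,24,-66,50')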

    # Collect each geolocated tweet from the stream.
    # Here we stop after collecting `limit` tweets; you don't have to set it to stop,
    # and can keep the Twitter API running to collect data for days or even longer.
    count = 0
    filecount = 0
    tweets = []  # empty list to hold returned tweets

    # use date and time for the file name
    for tweet in iterator:
        if count >= limit:
            break
        else:
            if 'coordinates' in tweet.keys():
                if tweet['coordinates'] is not None:
                    tweets.append(tweet)
                    count += 1
                    filecount += 1
                    if filecount >= rowlimit:
                        date = datetime.datetime.now().strftime("%d%b%Y_%H%M%S%f")
                        df = reader(tweets)
                        if arrow:
                            table = pa.Table.from_pandas(df)
                            pq.write_table(table, './Tweets_{0}.parquet'.format(date))
                        else:
                            df.to_json('./Tweets_{0}.txt'.format(date), orient='records')
                        time.sleep(.05)
                        filecount = 0
                        tweets = []
            else:
                pass


if __name__ == "__main__":
    pass
@linwoodc3 (Author)

Example use:

from cleantweets import reader,twitterpull
import json

# stored my keys in a json on disk
a = json.load(open('/Users/linwood/Desktop/keysforapps/apikeys.txt'))

#twitter credentials
tok = a['twitter']['accesstoken']
sec = a['twitter']['accesssecret']
key = a['twitter']['consumerkey']
csec = a['twitter']['consumersecret']

# run function
twitterpull(accesstok=tok,
            accesssec=sec,
            consumkey=key,
            consumsec=csec,
            limit=5,
            rowlimit=2,
            arrow=True)
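
To read the Parquet files written by twitterpull back into pandas, something like the following should work (a minimal sketch; the filename below is hypothetical and pyarrow must be installed):

import pandas as pd

# load one of the files written by twitterpull
df = pd.read_parquet('./Tweets_08Apr2017_120000000000.parquet')
print(df.head())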
