Last active
April 11, 2017 11:12
-
-
Save linwoodc3/14e94fb3c9e3b152dd204a42da01ae90 to your computer and use it in GitHub Desktop.
Utility functions for District Data Labs blog on geolocated social media. These functions clean and prepare Twitter data for geospatial analysis, in addition to removing personally identifying information.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Author: | |
# Linwood Creekmore III | |
# email: [email protected] | |
# Custom functions for Twitter Geospatial Analysis Blog | |
################################# | |
# Class for corrupt json | |
################################# | |
import pandas as pd | |
import io, json, re, itertools | |
from tzwhere import tzwhere | |
from shapely.geometry import Point | |
import pytz | |
# Initialize the timezone lookup engine once at import time.
# Prefer the shapely-accelerated engine (tz1); if shapely (or the
# shapely-backed initializer) is unavailable, fall back to the plain
# engine (tz2). NOTE(review): only one of tz1/tz2 will be defined
# afterwards — timenormalize() probes for each in turn.
try:
    from shapely.geometry import Point
    tz1 = tzwhere.tzwhere(shapely=True, forceTZ=True)
except Exception:  # bug fix: narrowed from bare except (don't swallow SystemExit/KeyboardInterrupt)
    tz2 = tzwhere.tzwhere()
# shameless copy paste from json/decoder.py
FLAGS = re.VERBOSE | re.MULTILINE | re.DOTALL
WHITESPACE = re.compile(r'[ \t\n\r]*', FLAGS)


class ConcatJSONDecoder(json.JSONDecoder):
    """JSON decoder for streams of concatenated JSON documents.

    ``json.load(fp, cls=ConcatJSONDecoder)`` returns a list with one
    entry per top-level JSON value in the input, instead of failing
    with a "trailing data" error.
    """

    def decode(self, s, _w=WHITESPACE.match):
        """Decode every JSON document in ``s`` and return them as a list.

        Bug fix: whitespace-only input no longer raises
        ``JSONDecodeError`` — leading whitespace is consumed before the
        loop, so ``end`` can reach ``len(s)`` without a decode attempt.
        """
        s_len = len(s)
        objs = []
        end = _w(s, 0).end()  # skip leading whitespace up front
        while end != s_len:
            obj, end = self.raw_decode(s, idx=end)
            end = _w(s, end).end()  # consume inter-document whitespace
            objs.append(obj)
        return objs
################################# | |
# Converts tweet json into tidy data | |
################################# | |
def reader(filename, anonymize=True):
    """Transform tweet json data into tidy format.

    This has been tested on tweets retrieved from the Twitter API.
    *Requires pandas library.*

    Bug fix: the docstring is now the function's first statement — the
    original placed it after an ``import``, so it was a dead string
    expression and never became ``__doc__``.

    Parameters
    ----------
    filename : str
        Path to Twitter data in json format.
    anonymize : bool, optional
        When True (default), drop personally identifying columns
        (screen name, tweet text, tweet id, urls).

    Returns
    -------
    pandas.DataFrame
        Tidy dataframe indexed by tweet creation time, with language,
        latitude, longitude and geometry-type columns (plus identifying
        columns when ``anonymize`` is False).
    """
    import pandas as pd

    try:
        # Well-formed json file: read directly.
        df = pd.read_json(filename, convert_dates=True)
        # NOTE(review): this membership test is a no-op (its result is
        # discarded and it never raises), so it cannot trigger the
        # fallback below. Kept as-is to preserve behavior — presumably
        # an assert was intended.
        "contributers" in df.columns
    except Exception:
        # Fall back for jsons with "trailing data" (concatenated docs).
        with open(filename, 'rb') as f:
            g = io.StringIO(f.read().decode('utf-8'))
            h = json.load(g, cls=ConcatJSONDecoder)
        if 1 < len(h) < 1000:
            try:
                # Lists of lists of tweet dicts → flatten first.
                df = pd.DataFrame(list(itertools.chain(*h)))
                df = df.assign(created_at=pd.to_datetime(df.created_at))
            except Exception:
                # Already a flat list of tweet dicts.
                df = pd.DataFrame(h)
                df = df.assign(created_at=pd.to_datetime(df.created_at))
        else:
            # Jsons that return as two lists of dicts.
            df = pd.read_json(io.StringIO(json.dumps(h)), convert_dates=True)
            try:
                # Reading another dirty version: rows of json strings.
                df = df.apply(lambda x: pd.Series(json.loads(x[0])), axis=1)
            except Exception:
                pass  # keep the frame as-is if nothing works
    # Drop rows missing both coordinates and created_at, then index by time.
    df.dropna(subset=['coordinates', 'created_at'], how='all', inplace=True)
    df.reset_index(inplace=True, drop=True)
    df.set_index('created_at', drop=True, inplace=True)
    dfsmall = df[[u'coordinates', u'lang', u'text']].assign(
        screenname=df.apply(
            lambda x: x['user']['screen_name'], axis=1),
        urls=df.apply(
            lambda x: x['entities']['urls'][0]['expanded_url']
            if len(x['entities']['urls']) != 0 else None,
            axis=1),
        tweetid=df.apply(lambda x: x['id'], axis=1),
        latitude=df.apply(
            lambda x: (x['coordinates']['coordinates'][1]
                       if isinstance(x['coordinates'], dict) else None),
            axis=1),
        longitude=df.apply(
            lambda x: (x['coordinates']['coordinates'][0]
                       if isinstance(x['coordinates'], dict) else None),
            axis=1),
        type=df.apply(
            lambda x: (x['coordinates']['type']
                       if isinstance(x['coordinates'], dict) else None),
            axis=1)).drop(
        labels='coordinates', axis=1)
    del df  # release the full frame before returning the small one
    if anonymize:
        # Strip personally identifying information.
        dfsmall.drop(['screenname', 'text', 'tweetid', 'urls'], axis=1, inplace=True)
    return dfsmall
################################# | |
# Highlights dataframe values | |
################################# | |
def highlight_max(s):
    """Highlight the maximum in a Series yellow.

    Intended for use with ``pandas.DataFrame.style.apply``.

    Parameters
    ----------
    s : pandas.Series
        Column (or row) of values to scan.

    Returns
    -------
    list of str
        'background-color: yellow' for the max cell(s), '' otherwise.
    """
    # Bug fix: removed an unused hard-coded `top10` country list that
    # was dead code in the original.
    is_max = s == s.max()
    return ['background-color: yellow' if v else '' for v in is_max]
from collections import OrderedDict | |
################################# | |
# Identifies primary language | |
# using custom rules/stats | |
################################# | |
def languageweighter(row):
    """Normalize language counts to identify the true primary language.

    Weighting function to identify the true primary language of a
    region. If the highest-proportion language is English and a second
    language contributes 10% or more, the second language is promoted
    to primary with a 25/75 split of their combined share.

    Parameters
    ----------
    row : pandas.Series
        Language counts for one region.

    Returns
    -------
    pandas.Series
        Weighted/transformed language proportions, sorted descending.
    """
    ordered = row.sort_values(ascending=False)
    ratio = ordered / ordered.sum()
    try:
        # Bug fix: moved inside the try — rows with fewer than three
        # languages used to raise IndexError here; now they fall
        # through and return the plain proportions.
        old0, old1, old2 = ratio.iloc[0], ratio.iloc[1], ratio.iloc[2]
        if ratio.index[0] == 'en' and ratio.iloc[1] >= 0.10:
            ratio.iloc[0] = (old0 + old1) * .25
            ratio.iloc[1] = (old0 + old1) * .75
        # Bug fix: original used `&` (bitwise) where `and` was intended,
        # which raised a TypeError silently swallowed by the bare except.
        # NOTE(review): this branch is unreachable because the first
        # condition already captures ratio[1] >= 0.10 — kept to
        # preserve the documented intent.
        elif ratio.index[0] == 'en' and ratio.iloc[1] > 0.10 and ratio.iloc[2] > 0.03:
            ratio.iloc[0] = (old0 + old1) * .05
            ratio.iloc[1] = (old0 + old1) * .65
            ratio.iloc[2] = (old0 + old1) * .25
    except Exception:
        # On any failure, return the unweighted proportions.
        pass
    return ratio
################################# | |
# Creates shapely points | |
################################# | |
def shaper(row):
    """Build a shapely Point from a row's longitude/latitude pair.

    Parallel-friendly helper: apply over rows of a pandas or dask
    dataframe that carry 'latitude' and 'longitude' fields.

    Parameters
    ----------
    row : pandas or dask dataframe row
        Row containing latitude and longitude values.

    Returns
    -------
    shapely.geometry.Point
        Spatial object for geoprocessing in Python.
    """
    return Point(row['longitude'], row['latitude'])
def show_function(function):
    """Load a function's source into the next Jupyter input cell.

    Utility for reviewing an imported function inside a notebook.

    Parameters
    ----------
    function : callable
        An active/imported function; must be importable by ``inspect``.

    Returns
    -------
    None
        Side effect only: sets the next IPython input cell to the
        function's full source.
    """
    import inspect
    source_lines, _ = inspect.getsourcelines(function)
    get_ipython().set_next_input("".join(source_lines))
def timenormalize(frame, date_column='created_at'):
    """Normalize a row's UTC timestamp to local time at its coordinates.

    Parameters
    ----------
    frame : pandas row / Series
        Row with a datetime value plus 'latitude' and 'longitude'
        fields. Assumes the timestamp is UTC — TODO confirm upstream.
    date_column : str, optional
        Name of the field holding the datetime (default 'created_at').

    Returns
    -------
    datetime.datetime
        Timezone-aware datetime converted to the local zone.

    Raises
    ------
    RuntimeError
        If neither timezone lookup engine (tz1/tz2) is usable.
    """
    # Convert to a datetime object tagged as UTC.
    now_aware = pytz.utc.localize(frame[date_column].to_pydatetime())
    # Resolve the timezone name from the lat/lon pair, preferring the
    # shapely-backed engine (tz1) and falling back to the plain one (tz2).
    try:
        timezone_str = tz1.tzNameAt(frame['latitude'], frame['longitude'], forceTZ=True)
    except Exception:
        try:
            timezone_str = tz2.tzNameAt(frame['latitude'], frame['longitude'], forceTZ=True)
        except Exception as e:
            # Bug fix: the original did `raise('...')`, which raises a
            # TypeError in Python 3 (strings are not exceptions) and
            # loses the intended message.
            raise RuntimeError(
                'You need to install the pytzwhere module. {0}'.format(e)) from e
    # Get the time offset and convert UTC to the computed local time.
    timezone = pytz.timezone(timezone_str)
    aware = now_aware.astimezone(timezone)
    return aware
def call_apply_fn(df, function=None):
    """Apply ``function`` row-wise to ``df``.

    Small utility to vectorize a row function over a dataframe.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose rows are passed to ``function``.
    function : callable, optional
        Row-wise function to apply; required in practice.

    Returns
    -------
    pandas.Series or pandas.DataFrame
        Result of ``df.apply(function, axis=1)``.

    Raises
    ------
    ValueError
        If no function is supplied. (Bug fix: the original executed
        ``raise('...')``, which raises a TypeError in Python 3 because
        strings are not exceptions.)
    """
    if not function:
        raise ValueError('You need to enter a function to vectorize')
    return df.apply(function, axis=1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment