Engage comment GNL API Labeller
cachetools==4.1.1
certifi==2020.6.20
chardet==3.0.4
google-api-core==1.21.0
google-auth==1.19.2
google-auth-oauthlib==0.4.1
google-cloud-bigquery==1.25.0
google-cloud-core==1.3.0
google-cloud-language==1.3.0
google-resumable-media==0.5.1
googleapis-common-protos==1.52.0
grpcio==1.30.0
idna==2.10
numpy==1.19.0
oauthlib==3.1.0
pandas==1.0.5
pandas-gbq==0.13.2
protobuf==3.12.2
pyasn1==0.4.8
pyasn1-modules==0.2.8
pydata-google-auth==1.1.0
python-dateutil==2.8.1
pytz==2020.1
requests==2.24.0
requests-oauthlib==1.3.0
retrying==1.3.3
rsa==4.6
six==1.15.0
tqdm==4.48.0
urllib3==1.25.9
# -*- coding: utf-8 -*-
import pandas as pd
from google.cloud import language
from google.cloud.language import enums
from google.cloud.language import types
from google.api_core.exceptions import ResourceExhausted, InvalidArgument, InternalServerError
from retrying import retry
from tqdm import tqdm
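
# The NL API client picks up credentials from the environment (application default credentials)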
client = language.LanguageServiceClient()
def is_resource_exhausted(exception):
    return isinstance(exception, ResourceExhausted)
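
# Retry on quota errors: wait 10 seconds between attempts, give up after 10 tries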
@retry(retry_on_exception=is_resource_exhausted, wait_fixed=10000, stop_max_attempt_number=10)
def classify_text(text):
    document = types.Document(
        content=text,
        type=enums.Document.Type.PLAIN_TEXT)
    try:
        sentiment = client.analyze_sentiment(document=document).document_sentiment
    except InvalidArgument:  # most likely the language isn't supported
        return None, None
    except InternalServerError:  # unexpected server-side error
        print(f'Deserialization error on text {text}.')
        return None, None
    return sentiment.score, sentiment.magnitude

# Read comments that don't yet have NL API sentiment scores
query = "select * from dbt_buffer.engage_comments where id not in (select id from buffer_engage.comment_nl_api_sentiment)"
print('Reading comments.')
df = pd.read_gbq(query, project_id="buffer-data")
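
# Register tqdm with pandas so progress_apply shows a progress bar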
tqdm.pandas()
dfs = df # can sample here for testing

# Break the DataFrame into chunks of 500 rows and process each separately
# (copy each chunk so new columns can be added without SettingWithCopyWarning)
n = 500
list_dfs = [dfs.iloc[i:i+n].copy() for i in range(0, dfs.shape[0], n)]
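
# For each chunk: score the text, keep the relevant columns, and append the results to BigQuery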
for i, chunk in enumerate(list_dfs):
    print(f'Processing chunk {i+1} of {len(list_dfs)}.')

    # Score each comment and unzip the (score, magnitude) tuples into two columns
    chunk['sentiment_score'], chunk['sentiment_magnitude'] = zip(*chunk.text.progress_apply(classify_text))

    chunk = chunk[['id', 'text', 'sentiment_score', 'sentiment_magnitude']]
    chunk['sentiment_score'] = chunk['sentiment_score'].astype(float)
    chunk['sentiment_magnitude'] = chunk['sentiment_magnitude'].astype(float)

    print('Writing chunk to BigQuery.')
    chunk['created_at'] = pd.Timestamp.now()
    chunk.to_gbq('buffer_engage.comment_nl_api_sentiment', project_id='buffer-data', if_exists='append')