Last active
May 16, 2023 15:14
-
-
Save bluemyria/fd4851c82d866f5305418a6cb33a23b1 to your computer and use it in GitHub Desktop.
CGP Dev 5. Developing a Backend Service - Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from flask import Response | |
""" | |
Import shared GCP helper modules | |
""" | |
# Add pubsub to import list | |
from quiz.gcp import datastore, pubsub | |
def get_questions(quiz_name):
    """Return the questions of a quiz as a pretty-printed JSON response.

    - Ask the datastore helper for the entities of ``quiz_name``
    - Wrap them in a ``{'questions': [...]}`` payload
    - Serialize with 2-space indentation and sorted keys
    - Return a Flask ``Response`` with a JSON content type
    """
    entities = datastore.list_entities(quiz_name)
    body = json.dumps({'questions': list(entities)}, indent=2, sort_keys=True)
    reply = Response(body)
    reply.headers['Content-Type'] = 'application/json'
    return reply
def get_grade(quiz_name, answers):
    """Grade submitted answers and return the result as a JSON response.

    A question counts as correct when at least one submitted answer has the
    same ``'id'`` as the question and its ``'answer'``, converted with
    ``int``, equals the question's ``'correctAnswer'``.

    Args:
        quiz_name: Name of the quiz, passed to the datastore helper.
        answers: Iterable of submitted answers; each item is indexed by
            ``'id'`` and ``'answer'``.

    Returns:
        A Flask ``Response`` whose pretty-printed JSON body is
        ``{"correct": <score>, "total": <number of questions>}``.

    Raises:
        ValueError, TypeError: If the ``'answer'`` of an answer whose id
            matches a question cannot be converted by ``int``.
    """
    # Materialize both inputs once: the answers are scanned again for every
    # question and len() is taken of the questions, neither of which works
    # on a lazy iterable.
    questions = list(datastore.list_entities(quiz_name, False))
    answers = list(answers)

    def is_correct(question):
        # int() is only reached for answers whose id matches the question;
        # any() stops at the first matching correct answer.
        return any(
            answer['id'] == question['id']
            and int(answer['answer']) == question['correctAnswer']
            for answer in answers
        )

    score = sum(1 for question in questions if is_correct(question))

    payload = json.dumps(
        {'correct': score, 'total': len(questions)},
        indent=2,
        sort_keys=True,
    )
    response = Response(payload)
    response.headers['Content-Type'] = 'application/json'
    return response
def publish_feedback(feedback):
    """Publish feedback through the pubsub helper and report the outcome.

    - Hand the feedback to ``pubsub.publish_feedback``
    - Serialize whatever it returns as pretty-printed JSON
    - Return a Flask ``Response`` with a JSON content type
    """
    publish_result = pubsub.publish_feedback(feedback)
    body = json.dumps(publish_result, indent=2, sort_keys=True)
    reply = Response(body)
    reply.headers['Content-Type'] = 'application/json'
    return reply
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Create a Cloud Pub/Sub topic | |
# In the Cloud Platform Console, click Navigation menu > Pub/Sub > Topics. | |
# Create a topic, e.g. "feedback".
gcloud pubsub subscriptions create worker-subscription --topic feedback | |
#Publish a message to a Cloud Pub/Sub topic | |
gcloud pubsub topics publish feedback --message "Hello World" | |
#Retrieve a message from a Cloud Pub/Sub subscription | |
gcloud beta pubsub subscriptions pull worker-subscription --auto-ack | |
#OR | |
gcloud pubsub subscriptions pull worker-subscription --auto-ack | |
# Create a Cloud Spanner instance | |
# On the Instance Details page for quiz-instance => Create database (eg. "quiz-database") | |
# Define your database schema => Edit as text. | |
# DDL statements, type the following SQL statement: | |
CREATE TABLE Feedback ( | |
feedbackId STRING(100) NOT NULL, | |
email STRING(100), | |
quiz STRING(20), | |
feedback STRING(MAX), | |
rating INT64, | |
score FLOAT64, | |
timestamp INT64 | |
) | |
PRIMARY KEY (feedbackId); | |
# After running the quiz... | |
# Navigation menu > Spanner > Select quiz-instance > quiz-database > Query. | |
SELECT * FROM Feedback |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Import the language module | |
from google.cloud import language | |
# Import enums and types | |
from google.cloud.language import enums | |
from google.cloud.language import types | |
# Module-level Language API client used by analyze()
lang_client = language.LanguageServiceClient()


def analyze(text):
    """Return the sentiment score of ``text``.

    - Wrap the passed text in a plain-text Document
    - Run sentiment analysis through the Natural Language API client
    - Return the score of the document-level sentiment
    """
    document = types.Document(
        content=text,
        type=enums.Document.Type.PLAIN_TEXT,
    )
    result = lang_client.analyze_sentiment(document=document)
    return result.document_sentiment.score
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import logging | |
import os | |
# Load the Cloud Pub/Sub module
from google.cloud import pubsub_v1
from flask import current_app

# Project id comes from the GCLOUD_PROJECT environment variable
# (os.getenv yields None when the variable is not set)
project_id = os.getenv('GCLOUD_PROJECT')

# Pub/Sub clients: one for publishing, one for subscribing
publisher = pubsub_v1.PublisherClient()
sub_client = pubsub_v1.SubscriberClient()

# Path referencing the 'feedback' topic of the project
topic_path = publisher.topic_path(project_id, 'feedback')
# Path referencing the 'worker-subscription' subscription of the project
sub_path = sub_client.subscription_path(project_id, 'worker-subscription')
def publish_feedback(feedback):
    """Publish a feedback object to the feedback topic.

    - Serialize the feedback as pretty-printed JSON with sorted keys
    - Encode the JSON text as a UTF-8 bytestring
    - Publish the bytes to ``topic_path``
    - Wait on the returned future and return its result
    """
    message_text = json.dumps(feedback, indent=2, sort_keys=True)
    message_bytes = message_text.encode('utf-8')
    return publisher.publish(topic_path, data=message_bytes).result()
def pull_feedback(callback):
    """Start pulling messages from the worker subscription.

    Args:
        callback: Callable handed to the subscriber client; it is invoked
            with each message received on ``sub_path``.

    Returns:
        The object returned by ``sub_client.subscribe``. Returning it
        (instead of discarding it) lets the caller wait on, cancel, or
        inspect the pull; callers that ignore the return value behave
        exactly as before.
        NOTE(review): in current google-cloud-pubsub releases this is
        expected to be a streaming-pull future -- confirm against the
        installed client version.
    """
    return sub_client.subscribe(sub_path, callback=callback)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
# Import the spanner module | |
from google.cloud import spanner | |
# Spanner management objects used by this module
_INSTANCE_ID = 'quiz-instance'
_DATABASE_ID = 'quiz-database'

# Create a Spanner client
spanner_client = spanner.Client()
# Get a reference to the Cloud Spanner quiz-instance
instance = spanner_client.instance(_INSTANCE_ID)
# Get a reference to the Cloud Spanner quiz-database
database = instance.database(_DATABASE_ID)
def reverse_email(email):
    """Reverse the parts of an email address (to be used as primary key).

    The address is split on '@', '.' and '_'; the parts are put in reverse
    order and joined with '_', e.g. 'student@example.com' becomes
    'com_example_student'.
    """
    parts = re.split(r'[@._]', email)
    return '_'.join(parts[::-1])
def save_feedback(data):
    """Persist one feedback record into the Spanner 'feedback' table.

    - Build the primary key value from the reversed email, the quiz and
      the timestamp found in ``data``
    - Insert the record through a batch (even though it is a single row)
    """
    columns = (
        'feedbackId',
        'email',
        'quiz',
        'timestamp',
        'rating',
        'score',
        'feedback',
    )
    with database.batch() as batch:
        key_parts = (
            reverse_email(data['email']),
            data['quiz'],
            data['timestamp'],
        )
        feedback_id = '{}_{}_{}'.format(*key_parts)
        # The first column holds the generated key; every other column is
        # read from ``data`` under its own name, in column order.
        row = (feedback_id,) + tuple(data[name] for name in columns[1:])
        batch.insert(table='feedback', columns=columns, values=[row])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import sys | |
import time | |
import json | |
# Load the pubsub, languageapi and spanner modules from the quiz.gcp package | |
from quiz.gcp import pubsub, languageapi, spanner | |
# Configure logging: the root logger is used as `log`, and basicConfig
# directs output to stdout at INFO level
log = logging.getLogger()
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
def pubsub_callback(message):
    """Handle one pulled message: analyze its feedback and store it.

    - Acknowledge the message
    - Log receipt and contents
    - Parse the JSON payload
    - Score the feedback text with the languageapi helper and log the score
    - Attach the score and persist the record with the spanner helper
    - Log that the feedback was saved
    """
    # NOTE(review): the ack is sent before any processing, so a failure in
    # the steps below presumably loses the message -- confirm this is
    # intended.
    message.ack()
    log.info('Message received')
    log.info(message)

    feedback = json.loads(message.data)

    sentiment_score = languageapi.analyze(str(feedback['feedback']))
    log.info('Score: {}'.format(sentiment_score))

    feedback['score'] = sentiment_score
    spanner.save_feedback(feedback)
    log.info('Feedback saved')
def main():
    """Start the worker and keep the process alive.

    - Register ``pubsub_callback`` with the pubsub helper to initiate the pull
    - Sleep in one-minute intervals, forever
    """
    log.info('Worker starting...')
    pubsub.pull_feedback(pubsub_callback)

    one_minute = 60
    while True:
        time.sleep(one_minute)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment