Skip to content

Instantly share code, notes, and snippets.

@mieitza
Forked from bluemyria/GCP_Console.sh
Created May 16, 2023 15:14
Show Gist options
  • Save mieitza/9adefe1a99a6d3db3b0e80466f3ca43c to your computer and use it in GitHub Desktop.
Save mieitza/9adefe1a99a6d3db3b0e80466f3ca43c to your computer and use it in GitHub Desktop.
CGP Dev 5. Developing a Backend Service - Python
import json
from flask import Response
"""
Import shared GCP helper modules
"""
# Add pubsub to import list
from quiz.gcp import datastore, pubsub
"""
Gets list of questions from datastore
- Create query
- Filter on quiz
- Call the datastore helper to get back JSON
- Pretty print JSON
- Set header and return the response
"""
def get_questions(quiz_name):
questions = datastore.list_entities(quiz_name)
payload = {'questions': list(questions)}
payload = json.dumps(payload, indent=2, sort_keys=True)
response = Response(payload)
response.headers['Content-Type'] = 'application/json'
return response
"""
Grades submitted answers
- Get list of questions with correct answers from datastore
- Iterate through questions, find any submitted answers that match
- Count total number of questions for which there is >0 correct answers
- Compose and pretty print payload
- Compose and return response
"""
def get_grade(quiz_name, answers):
questions = datastore.list_entities(quiz_name, False)
score = len(list(filter(lambda x: x > 0,
list(map(lambda q:
len(list(filter(lambda answer:
answer['id'] == q['id'] and
int(answer['answer']) == q['correctAnswer'],
answers)))
, questions))
)))
payload = {'correct': score, 'total': len(questions)}
payload = json.dumps(payload, indent=2, sort_keys=True)
response = Response(payload)
response.headers['Content-Type'] = 'application/json'
return response
"""
Publish feedback
- Call pubsub helper
- Compose and return response
"""
def publish_feedback(feedback):
# Publish the feedback using your pubsub module, return the result
result = pubsub.publish_feedback(feedback)
response = Response(json.dumps(result, indent=2, sort_keys=True))
response.headers['Content-Type'] = 'application/json'
return response
# Create a Cloud Pub/Sub topic
# In the Cloud Platform Console, click Navigation menu > Pub/Sub > Topics.
# Create a topic., eg "feedback"
gcloud pubsub subscriptions create worker-subscription --topic feedback
#Publish a message to a Cloud Pub/Sub topic
gcloud pubsub topics publish feedback --message "Hello World"
#Retrieve a message from a Cloud Pub/Sub subscription
gcloud beta pubsub subscriptions pull worker-subscription --auto-ack
#OR
gcloud pubsub subscriptions pull worker-subscription --auto-ack
# Create a Cloud Spanner instance
# On the Instance Details page for quiz-instance => Create database (eg. "quiz-database")
# Define your database schema => Edit as text.
# DDL statements, type the following SQL statement:
CREATE TABLE Feedback (
feedbackId STRING(100) NOT NULL,
email STRING(100),
quiz STRING(20),
feedback STRING(MAX),
rating INT64,
score FLOAT64,
timestamp INT64
)
PRIMARY KEY (feedbackId);
# After running the quiz...
# Navigation menu > Spanner > Select quiz-instance > quiz-database > Query.
SELECT * FROM Feedback
# Import the language module
from google.cloud import language
# Import enums and types
from google.cloud.language import enums
from google.cloud.language import types
# Create the Language API client
lang_client = language.LanguageServiceClient()
"""
Returns sentiment analysis score
- create document from passed text
- do sentiment analysis using natural language applicable
- return the sentiment score
"""
def analyze(text):
# Create a Document object
doc = types.Document(content=text, type=enums.Document.Type.PLAIN_TEXT)
# Analyze the sentiment
sentiment = lang_client.analyze_sentiment(document=doc).document_sentiment
# Return the sentiment score
return sentiment.score
import json
import logging
import os
project_id = os.getenv('GCLOUD_PROJECT')
# Load the Cloud Pub/Sub module
from google.cloud import pubsub_v1
from flask import current_app
# Create a Pub/Sub Publisher Client
publisher = pubsub_v1.PublisherClient()
# Create a Pub/Sub Subscriber Client
sub_client = pubsub_v1.SubscriberClient()
# Create a Topic Object to reference the feedback topic
topic_path = publisher.topic_path(project_id, 'feedback')
# Create a Subscription object named worker-subscription
sub_path = sub_client.subscription_path(project_id, 'worker-subscription')
"""
Publishes feedback info
- jsonify feedback object
- encode as bytestring
- publish message
- return result
"""
def publish_feedback(feedback):
# Publish the feedback object to the feedback topic
payload = json.dumps(feedback, indent=2, sort_keys=True)
data = payload.encode('utf-8')
future = publisher.publish(topic_path, data=data)
return future.result()
"""pull_feedback
Starts pulling messages from subscription
- receive callback function from calling module
- initiate the pull providing the callback function
"""
def pull_feedback(callback):
# Subscriber to the worker-subscription,
# invoking the callback
sub_client.subscribe(sub_path, callback=callback)
import re
# Import the spanner module
from google.cloud import spanner
"""
Get spanner management objects
"""
# Create a spanner Client
spanner_client = spanner.Client()
# Get a reference to the Cloud Spanner quiz-instance
instance = spanner_client.instance('quiz-instance')
# Get a referent to the Cloud Spanner quiz-database
database = instance.database('quiz-database')
"""
Takes an email address and reverses it (to be used as primary key)
"""
def reverse_email(email):
return '_'.join(list(reversed(email.replace('@','_').
replace('.','_').
split('_'))))
"""
Persists feedback data into Spanner
- create primary key value
- do a batch insert (even though it's a single record)
"""
def save_feedback(data):
# Create a batch object for database operations
with database.batch() as batch:
# Create a key for the record
# from the email, quiz and timestamp
feedback_id = '{}_{}_{}'.format(reverse_email(data['email']),
data['quiz'],
data['timestamp'])
# Use the batch to insert a record
# into the feedback table
# This needs the columns and values
batch.insert(
table='feedback',
columns=(
'feedbackId',
'email',
'quiz',
'timestamp',
'rating',
'score',
'feedback'
),
values=[
(
feedback_id,
data['email'],
data['quiz'],
data['timestamp'],
data['rating'],
data['score'],
data['feedback']
)
]
)
import logging
import sys
import time
import json
# Load the pubsub, languageapi and spanner modules from the quiz.gcp package
from quiz.gcp import pubsub, languageapi, spanner
"""
Configure logging
"""
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
log = logging.getLogger()
"""
Receives pulled messages, analyzes and stores them
- Acknowledge the message
- Log receipt and contents
- convert json string
- call helper module to do sentiment analysis
- log sentiment score
- call helper module to persist to spanner
- log feedback saved
"""
def pubsub_callback(message):
# Acknowledge the message
message.ack()
log.info('Message received')
log.info(message)
data = json.loads(message.data)
# Use the languageapi module to analyze the sentiment
score = languageapi.analyze(str(data['feedback']))
# Log the sentiment score
log.info('Score: {}'.format(score))
# Assign the sentiment score to a new score property
data['score'] = score
# Use the spanner module to save the feedback
spanner.save_feedback(data)
# Log a message to say the feedback has been saved
log.info('Feedback saved')
"""
Pulls messages and loops forever while waiting
- initiate pull
- loop once a minute, forever
"""
def main():
log.info('Worker starting...')
# Register the callback
pubsub.pull_feedback(pubsub_callback)
while True:
time.sleep(60)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment