Skip to content

Instantly share code, notes, and snippets.

@huangsam
Last active January 16, 2019 08:46
Show Gist options
  • Save huangsam/202ef7322a0157ea465ef4d1871b5ec6 to your computer and use it in GitHub Desktop.
Save huangsam/202ef7322a0157ea465ef4d1871b5ec6 to your computer and use it in GitHub Desktop.
Testing out Pub/Sub service on GCP
#!/bin/bash
# Environment for the Pub/Sub publisher and subscriber scripts.
# NOTE: when run as a separate process these exports do not reach the calling
# shell; source the file instead so the variables stay set.

# Project ID; the Python scripts read it to build topic/subscription paths.
GOOGLE_CLOUD_PROJECT="<Project ID>"
# Path to the credentials file used for authentication.
GOOGLE_APPLICATION_CREDENTIALS="<Credentials File Path>"

export GOOGLE_CLOUD_PROJECT GOOGLE_APPLICATION_CREDENTIALS
import os
import time
import random
from google.cloud import pubsub_v1
# Client used to send messages to Cloud Pub/Sub.
publisher = pubsub_v1.PublisherClient()

# Topic path in the form "projects/<project>/topics/<topic>"; the project ID
# is taken from the GOOGLE_CLOUD_PROJECT environment variable.
topic_name = 'projects/{project_id}/topics/{topic}'.format(
    topic='thermometer',  # Set this to something appropriate.
    project_id=os.environ.get('GOOGLE_CLOUD_PROJECT'),
)

# Optional setup step, left disabled:
# publisher.create_topic(topic_name)
def truthy(flag, good, bad):
    """Return ``good`` when ``flag`` is truthy, otherwise ``bad``.

    Args:
        flag: Value tested for truthiness.
        good: Value returned when ``flag`` is truthy.
        bad: Value returned when ``flag`` is falsy.

    Returns:
        Either ``good`` or ``bad``, unchanged.
    """
    return good if flag else bad
def callback(future):
    """Report the outcome of a publish request once its future completes.

    Args:
        future: Completed future returned by ``publisher.publish``. Its
            ``result()`` returns the message ID, or raises the error that
            made the publish fail.
    """
    try:
        message_id = future.result()
    except Exception as exc:  # callback boundary: report instead of raising
        print(f'Publish failed: {exc!r}')
        return
    print(f'{message_id} was sent...')
# Publish a simulated thermometer reading, pausing 0.4 seconds between
# messages, until interrupted with Ctrl-C.
try:
    while True:
        current_temp = str(random.uniform(10, 100))
        current_unit = 'F'
        # Roughly 5% of readings are reported as failures.
        success_flag = random.uniform(0, 1) > 0.05
        message = 'Time measure: ' + truthy(success_flag, 'SUCCESS', 'FAILURE')
        # Everything besides 'topic' and 'data' is sent as a message
        # attribute; failed readings carry 'N/A' for both attributes.
        params = {
            'topic': topic_name,
            'data': message.encode('utf-8'),
            'cur_t': truthy(success_flag, current_temp, 'N/A'),
            'cur_u': truthy(success_flag, current_unit, 'N/A'),
        }
        future = publisher.publish(**params)
        future.add_done_callback(callback)
        time.sleep(0.4)
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop this script; exit without a traceback.
    print('Publisher stopped.')
google-cloud-pubsub==0.39.1
import os
from google.cloud import pubsub_v1
# Client used to receive messages from Cloud Pub/Sub.
subscriber = pubsub_v1.SubscriberClient()

# Resource paths; the project ID is taken from the GOOGLE_CLOUD_PROJECT
# environment variable.
topic_name = 'projects/{project_id}/topics/{topic}'.format(
    topic='thermometer',  # Set this to something appropriate.
    project_id=os.environ.get('GOOGLE_CLOUD_PROJECT'),
)
subscription_name = 'projects/{project_id}/subscriptions/{sub}'.format(
    sub='measure',  # Set this to something appropriate.
    project_id=os.environ.get('GOOGLE_CLOUD_PROJECT'),
)

# Optional setup step, left disabled:
# subscriber.create_subscription(
#     name=subscription_name, topic=topic_name)
def _invert_temperature(raw_temp, unit):
    """Convert a reading to the opposite scale, rounded to 2 decimals.

    Returns ``('Unknown', 'N/A')`` when ``unit`` is neither 'F' nor 'C', or
    when ``raw_temp`` is missing or not numeric.
    """
    try:
        if unit == 'F':
            return round((float(raw_temp) - 32) * 5 / 9, 2), 'C'
        if unit == 'C':
            return round((float(raw_temp) * 9 / 5) + 32, 2), 'F'
    except (TypeError, ValueError):
        # None or non-numeric text: fall through to the "unknown" result so
        # the caller can still print and acknowledge the message.
        pass
    return 'Unknown', 'N/A'


def callback(message):
    """Print one received message with its temperature converted, then ack.

    The 'cur_t' and 'cur_u' attributes hold the reading and its unit. The
    printed line joins the message ID, publish time, decoded payload, and
    the converted reading with ' | '.

    Args:
        message: Received Pub/Sub message; must provide ``attributes``,
            ``message_id``, ``publish_time``, ``data`` and ``ack()``.
    """
    attributes = message.attributes
    invert_temp, invert_unit = _invert_temperature(
        attributes.get('cur_t'),
        attributes.get('cur_u'),
    )
    content = ' | '.join(map(str, [
        message.message_id,
        message.publish_time,
        message.data.decode('utf-8'),
        f'{invert_temp} measured in {invert_unit}',
    ]))
    print(content)
    # Acknowledge last, after the message has been fully handled.
    message.ack()
# Start receiving messages; `callback` is passed as the message handler.
future = subscriber.subscribe(subscription_name, callback)
try:
    # Block the main thread on the subscription's future.
    future.result()
except KeyboardInterrupt:
    # Ctrl-C: cancel the subscription's future before exiting.
    future.cancel()
@huangsam
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment