Last active
March 9, 2017 23:09
-
-
Save naoko/f992055d38c4707137c31a49e991fe50 to your computer and use it in GitHub Desktop.
Example of how to use Google Pub/Sub
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from google.cloud import pubsub | |
logger = logging.getLogger(__name__) | |
def create_topic(topic_name): | |
client = pubsub.Client() | |
topic = client.topic(topic_name) | |
if not topic.exists(): | |
logger.info("Creating topic {}".format(topic_name)) | |
topic.create(topic_name) | |
def create_subscription(topic_name, subscription_name): | |
client = pubsub.Client() | |
topic = client.topic(topic_name) | |
subscription = topic.subscription(subscription_name) | |
if not subscription.exists(): | |
subscription.create() | |
def publish_message(topic_name, data): | |
"""Publishes a message to a Pub/Sub topic with the given data.""" | |
client = pubsub.Client() | |
topic = client.topic(topic_name) | |
# Data must be a bytestring | |
data = data.encode('utf-8') | |
message_id = topic.publish(data) | |
logger.info('Message {} published.'.format(message_id)) | |
def receive_message(topic_name, subscription_name): | |
"""Receives a message from a pull subscription. | |
`subscription.pull` will return generator of tuples(ack_id, message) | |
message.message_id: An ID assigned to the message by the API | |
message.data: The payload of the message. | |
message.attributes: Extra metadata associated by the publisher with the message. | |
:param topic_name: Topic name defined in Google Pub/Sub | |
:param subscription_name: Subscription name | |
:return: subscription_obj, List(Tuple(ack_id, message)) | |
""" | |
client = pubsub.Client() | |
topic = client.topic(topic_name) | |
# maximum time after a subscriber receives a message before the subscriber should acknowledge the message | |
# default is 10 seconds | |
# The maximum custom deadline you can specify is 600 seconds (10 minutes) | |
subscription = topic.subscription(subscription_name, ack_deadline=600) | |
# Change return_immediately=False to block until messages are received. | |
results = subscription.pull(return_immediately=False, max_messages=10) | |
logging.info('Received {} messages.'.format(len(results))) | |
return subscription, results |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment