Skip to content

Instantly share code, notes, and snippets.

@soaxelbrooke
Last active April 4, 2018 06:28
Show Gist options
  • Save soaxelbrooke/b5cb83cbdd37bff829ab74f85bba3a78 to your computer and use it in GitHub Desktop.
Save soaxelbrooke/b5cb83cbdd37bff829ab74f85bba3a78 to your computer and use it in GitHub Desktop.
Example of using a ThreadPoolExecutor with Google's PubSub Python library
def ensure_subscription(subscription_name: str, subscriber: Optional[pubsub_v1.SubscriberClient]=None):
subscriber = subscriber or pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path('MY_GCP_PROJECT', subscription_name)
try:
return subscriber.get_subscription(subscription_path)
except GoogleAPICallError:
logging.warning(f'No subscription {subscription_path} found, creating...')
# ensure topic exists
topic_path = get_pubsub_topic_path(get_publisher('my-topic-name'))
return subscriber.create_subscription(subscription_path, topic_path)
def subscribe_to(subscription_name: str, callback: callable, executor: ThreadPoolExecutor=None) -> Future:
""" Convenience function for consuming a pubsub subscription """
policy_factory = functools.partial(thread.Policy, executor=executor)
subscriber = pubsub_v1.SubscriberClient(policy_class=policy_factory)
subscription = ensure_subscription(subscription_name, subscriber)
return subscriber.subscribe(subscription.name, callback).future
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment