Created
January 31, 2020 15:28
-
-
Save AndreiD/233d988852a58c9736854ee0f9d9cb4a to your computer and use it in GitHub Desktop.
google_pub_sub_python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import logging | |
import os | |
import apache_beam as beam | |
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions | |
from google.cloud import pubsub_v1 | |
# imports the credential file | |
path_service_account = "read_stream_key.json" | |
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = path_service_account | |
# set the subscription id | |
subscriptionID = 'projects/beamplay/subscriptions/highTechHospitalSubX0001' | |
subscriber = pubsub_v1.SubscriberClient() | |
class FormatStreamData(beam.DoFn): | |
def process(self, element): | |
data = json.loads(element) | |
print("{} {} has {} beats per minute".format(data["patient"]["last_name"], data["patient"]["first_name"], | |
data["bpm"])) | |
yield data | |
def run(): | |
options = PipelineOptions() | |
options.view_as(StandardOptions).streaming = True | |
options.view_as(StandardOptions).runner = 'DirectRunner' | |
with beam.Pipeline(options=options) as p: | |
( | |
p | |
| 'Read from pub sub' >> beam.io.ReadFromPubSub(subscription=subscriptionID) | |
| 'Log results' >> beam.ParDo(FormatStreamData()) | |
) | |
if __name__ == '__main__': | |
logging.getLogger().setLevel(logging.INFO) | |
run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment