Monitoring End-to-End Message Latency with Stackdriver

Enable the PubSub, Stackdriver, and Kubernetes Engine APIs in the Cloud Console. You will need to enable Stackdriver Premium to use custom metrics.
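If you prefer the command line to the Cloud Console, the same APIs can be enabled with `gcloud services enable`. The sketch below builds and optionally runs that command from Python. It assumes that the service names `pubsub.googleapis.com`, `monitoring.googleapis.com`, `logging.googleapis.com`, and `container.googleapis.com` cover the APIs listed above; `REQUIRED_SERVICES`, `enable_command`, and `enable_services` are hypothetical names, not part of the gist.

```python
import shlex
import subprocess

# Assumed service names for the PubSub, Stackdriver (Monitoring and Logging),
# and Kubernetes Engine APIs.
REQUIRED_SERVICES = (
    "pubsub.googleapis.com",
    "monitoring.googleapis.com",
    "logging.googleapis.com",
    "container.googleapis.com",
)


def enable_command(project_id, services=REQUIRED_SERVICES):
    """Return the gcloud argument list that enables the given services."""
    return ["gcloud", "services", "enable", *services, "--project", project_id]


def enable_services(project_id):
    """Run the command; requires an installed and authenticated gcloud."""
    cmd = enable_command(project_id)
    print("Running:", shlex.join(cmd))
    subprocess.run(cmd, check=True)
```

For example, `enable_services(os.environ["PROJECT_ID"])` once `PROJECT_ID` has been exported as shown below.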

Export your project ID to the shell environment and set the project and zone in the gcloud configuration:

export PROJECT_ID=[YOUR PROJECT ID]
gcloud config set project $PROJECT_ID
ZONE=us-west1-a
gcloud config set compute/zone $ZONE

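Create a service account in the Cloud Console, assign it the Project > Owner role, and download its JSON key file into the directory that you run these commands from. The Dockerfiles copy every .json file in that directory into the images. Then point GOOGLE_APPLICATION_CREDENTIALS at the key file: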

export GOOGLE_APPLICATION_CREDENTIALS=???.json
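Every program in this walkthrough reads `PROJECT_ID` and `GOOGLE_APPLICATION_CREDENTIALS` from its environment, and the key path above is relative to the directory the program runs in. A small pre-flight check such as the hypothetical `check_environment` below (a sketch, not part of the gist) can catch a missing variable or key file before a container starts.

```python
import os


def check_environment(environ=os.environ):
    """Return a list of problems with the probe's environment (empty if OK)."""
    problems = []
    if not environ.get("PROJECT_ID"):
        problems.append("PROJECT_ID is not set")
    key_path = environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not key_path:
        problems.append("GOOGLE_APPLICATION_CREDENTIALS is not set")
    elif not os.path.isfile(key_path):
        # Relative paths are resolved against the current working directory
        problems.append("key file not found: {}".format(key_path))
    return problems


if __name__ == "__main__":
    for problem in check_environment():
        print(problem)
```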

To create the custom metric, build and run the setup program in a container:

docker build -f Dockerfile-setup -t monitoring_setup .
docker run -it --env PROJECT_ID=$PROJECT_ID \
  --env GOOGLE_APPLICATION_CREDENTIALS=$GOOGLE_APPLICATION_CREDENTIALS \
  monitoring_setup

Alternatively, define the custom metric by running the setup script directly:

python monitoring_setup.py

Build and run the sender in a container locally:

docker build -f Dockerfile-sender -t pubsubsender .
docker run -it --env PROJECT_ID=$PROJECT_ID \
  --env GOOGLE_APPLICATION_CREDENTIALS=$GOOGLE_APPLICATION_CREDENTIALS \
  pubsubsender
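The sender's payload is only a sequence number; the measurement travels in a message attribute named `time_sent`, which holds the send time as a string of epoch seconds (Pub/Sub attribute values are strings). The pure-Python sketch below mirrors that encoding without calling Pub/Sub; `build_probe_message` is a hypothetical helper, not part of the gist.

```python
import time


def build_probe_message(sequence_no, now=None):
    """Return (data, attributes) the way pubsubsender.py builds a probe.

    The data is the sequence number as UTF-8 bytes and is ignored by the
    receiver; the send time travels in the string attribute 'time_sent'.
    """
    if now is None:
        now = time.time()
    data = "{}".format(sequence_no).encode("utf-8")
    attributes = {"time_sent": "{}".format(now)}
    return data, attributes
```

With a `PublisherClient`, the result would be sent as `publisher.publish(topic_path, data=data, **attributes)`, since `publish` takes message attributes as keyword arguments.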

Upload the sender image to Google Container Registry:

gcloud auth configure-docker
TAG=v1
docker tag pubsubsender gcr.io/$PROJECT_ID/pubsubsender:$TAG
docker push gcr.io/$PROJECT_ID/pubsubsender:$TAG

Start up a GKE cluster

gcloud components install kubectl
CLUSTER_NAME=monitoring-$ZONE
gcloud container clusters create $CLUSTER_NAME --zone $ZONE --num-nodes 2
gcloud container clusters get-credentials $CLUSTER_NAME

Deploy the sender to the cluster:

kubectl run pubsubsender --image gcr.io/$PROJECT_ID/pubsubsender:$TAG \
  --env="PROJECT_ID=$PROJECT_ID" \
  --env "GOOGLE_APPLICATION_CREDENTIALS=$GOOGLE_APPLICATION_CREDENTIALS"

Build and run the receiver container locally:

docker build -f Dockerfile-receiver -t pubsubreceiver .
docker run -it --env PROJECT_ID=$PROJECT_ID \
  --env GOOGLE_APPLICATION_CREDENTIALS=$GOOGLE_APPLICATION_CREDENTIALS \
  pubsubreceiver
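The receiver computes latency as its own clock time minus the sender's `time_sent` attribute, so the measurement also absorbs any clock skew between the two containers; a negative value is logged and not reported. It then splits the current time into whole seconds and nanoseconds for the point's end time. The sketch below isolates that arithmetic from the Pub/Sub and Monitoring clients; `compute_latency` and `split_timestamp` are hypothetical names, not functions in the gist.

```python
def compute_latency(attributes, now):
    """Return end-to-end latency in seconds, or None if it is not reported.

    A message without a 'time_sent' attribute is skipped, and a negative
    latency (sender clock ahead of receiver clock) is not reported.
    """
    if not attributes or "time_sent" not in attributes:
        return None
    latency = now - float(attributes["time_sent"])
    return latency if latency >= 0.0 else None


def split_timestamp(t):
    """Split epoch seconds into (seconds, nanos) for a point's end time."""
    seconds = int(t)
    nanos = int((t - seconds) * 10**9)
    return seconds, nanos
```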

Push the receiver image to Container Registry and deploy it to the cluster:

docker tag pubsubreceiver gcr.io/$PROJECT_ID/pubsubreceiver:$TAG
docker push gcr.io/$PROJECT_ID/pubsubreceiver:$TAG
kubectl run pubsubreceiver --image gcr.io/$PROJECT_ID/pubsubreceiver:$TAG \
  --env="PROJECT_ID=$PROJECT_ID" \
  --env "GOOGLE_APPLICATION_CREDENTIALS=$GOOGLE_APPLICATION_CREDENTIALS"

Troubleshooting

The sender creates a PubSub topic called 'e2eprobe'. To verify that the sender started successfully, check that the topic exists:

gcloud pubsub topics list

To check on the receiver, list the subscriptions to the topic:

gcloud pubsub topics list-subscriptions e2eprobe
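The receiver names each subscription `e2eprobe-` followed by a random number from 1 to 9999 and never deletes it, so receivers that have since stopped can still appear in that list. To pick the probe subscriptions out of full subscription names in a script, a hypothetical helper like the one below (a sketch, not part of the gist) could match that naming pattern.

```python
import re

# Matches subscription names created by pubsubreceiver.py:
# projects/<project>/subscriptions/e2eprobe-<1..9999>
_PROBE_SUB = re.compile(r"^projects/([^/]+)/subscriptions/e2eprobe-(\d{1,4})$")


def probe_subscriptions(names, project_id):
    """Return the probe subscription IDs in `names` that belong to project_id."""
    found = []
    for name in names:
        match = _PROBE_SUB.match(name.strip())
        if match and match.group(1) == project_id:
            found.append("e2eprobe-" + match.group(2))
    return found
```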

To check that the probes are running in Kubernetes, list the pods:

$ kubectl get pods
NAME                              READY     STATUS    RESTARTS   AGE
pubsubreceiver-66f7bf69c7-69d8b   1/1       Running   0          2m
pubsubsender-7d7c8fc757-qnfs9     1/1       Running   0          20m
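To script that check, the hypothetical function below parses the plain-text table printed by `kubectl get pods` and reports, for each probe, whether one of its pods is in the `Running` state. It is a sketch that assumes the whitespace-separated column layout shown above; `kubectl get pods -o json` would be more robust for anything beyond a quick check.

```python
def running_probes(kubectl_output, prefixes=("pubsubsender", "pubsubreceiver")):
    """Map each probe name to True if one of its pods has STATUS Running."""
    status = {prefix: False for prefix in prefixes}
    lines = kubectl_output.strip().splitlines()
    if not lines:
        return status
    header = lines[0].split()
    name_col = header.index("NAME")
    status_col = header.index("STATUS")
    for line in lines[1:]:
        fields = line.split()
        if len(fields) <= max(name_col, status_col):
            continue  # skip malformed lines
        name = fields[name_col]
        for prefix in prefixes:
            # Matches a pod named exactly after the probe or a generated
            # name with a suffix, such as pubsubsender-7d7c8fc757-qnfs9.
            if ((name == prefix or name.startswith(prefix + "-"))
                    and fields[status_col] == "Running"):
                status[prefix] = True
    return status
```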

In the Logs Viewer of the GCP Console, you should see the log entries printed by pubsubsender.py and pubsubreceiver.py.

License

Apache 2.0 Copyright 2018 Google. All rights reserved.

# Dockerfile-receiver
FROM google/cloud-sdk:latest
WORKDIR /
COPY pubsubreceiver.py *.json /
RUN pip install python-dateutil && \
    pip install --upgrade google-cloud-pubsub google-cloud-monitoring
# PROJECT_ID and GOOGLE_APPLICATION_CREDENTIALS are set with --env at run time
# and read by the script itself. Exec-form CMD does not expand $VARIABLES.
CMD [ "python", "./pubsubreceiver.py" ]
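# Dockerfile-sender
FROM google/cloud-sdk:latest
WORKDIR /
COPY pubsubsender.py *.json /
RUN pip install --upgrade google-cloud-pubsub
# PROJECT_ID and GOOGLE_APPLICATION_CREDENTIALS are set with --env at run time
# and read by the script itself. Exec-form CMD does not expand $VARIABLES.
CMD [ "python", "./pubsubsender.py" ]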
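# Dockerfile-setup
FROM google/cloud-sdk:latest
WORKDIR /
COPY monitoring_setup.py *.json /
RUN pip install --upgrade google-cloud-monitoring
# PROJECT_ID and GOOGLE_APPLICATION_CREDENTIALS are set with --env at run time
# and read by the script itself. Exec-form CMD does not expand $VARIABLES.
CMD [ "python", "./monitoring_setup.py" ]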
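# monitoring_setup.py
# Creates the custom metric descriptor that the receiver writes to
import os

from google.api import metric_pb2 as ga_metric
from google.cloud import monitoring_v3

project = os.getenv('PROJECT_ID')
client = monitoring_v3.MetricServiceClient()
project_name = 'projects/{}'.format(project)

descriptor = ga_metric.MetricDescriptor()
descriptor.type = 'custom.googleapis.com/pubsube2e'
descriptor.description = 'PubSub end-to-end delivery'
descriptor.metric_kind = ga_metric.MetricDescriptor.MetricKind.GAUGE
descriptor.value_type = ga_metric.MetricDescriptor.ValueType.DOUBLE
descriptor = client.create_metric_descriptor(
    name=project_name, metric_descriptor=descriptor)
print('Created {}'.format(descriptor.name))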
# pubsubreceiver.py
# Program to read PubSub messages, measure end-to-end delivery time, and
# report to Stackdriver
import os
import random
import time

from google.cloud import monitoring_v3
from google.cloud import pubsub_v1

METRIC_TYPE = 'custom.googleapis.com/pubsube2e'


def read_messages():
    project = os.getenv('PROJECT_ID')
    print('Reading messages - {}'.format(project))
    client = monitoring_v3.MetricServiceClient()
    project_name = 'projects/{}'.format(project)
    subscriber = pubsub_v1.SubscriberClient()
    # Each receiver creates its own, randomly named subscription
    sub_name = 'e2eprobe-{}'.format(random.randint(1, 9999))
    subscription_path = subscriber.subscription_path(project, sub_name)
    topic = 'projects/{}/topics/e2eprobe'.format(project)
    subscriber.create_subscription(name=subscription_path, topic=topic)

    def callback(message):
        print('Received message: {}'.format(message))
        message.ack()
        if 'time_sent' not in message.attributes:
            print('message had no time_sent attribute')
            return
        time_sent = float(message.attributes['time_sent'])
        now = time.time()
        latency = now - time_sent
        print('latency {}'.format(latency))
        if latency >= 0.0:
            write_metric(latency, client, project_name)
        else:
            # A negative value means the sender's clock is ahead of ours
            print('Latency is negative: {0}, sent: {1}, now: {2}'.format(
                latency, time_sent, now))

    future = subscriber.subscribe(subscription_path, callback=callback)
    print('Listening for messages on {}'.format(subscription_path))
    # Block the main thread; messages are handled on background threads
    future.result()


def write_metric(latency, client, project_name):
    print('Writing latency - {}'.format(latency))
    series = monitoring_v3.TimeSeries()
    series.metric.type = METRIC_TYPE
    series.resource.type = 'global'
    # The point's end time is now, split into whole seconds and nanoseconds
    now = time.time()
    seconds = int(now)
    nanos = int((now - seconds) * 10**9)
    interval = monitoring_v3.TimeInterval(
        {'end_time': {'seconds': seconds, 'nanos': nanos}})
    point = monitoring_v3.Point(
        {'interval': interval, 'value': {'double_value': latency}})
    series.points = [point]
    client.create_time_series(name=project_name, time_series=[series])
    print('Wrote latency - {}'.format(latency))


def main():
    print('Starting up')
    read_messages()


if __name__ == '__main__':
    main()
# pubsubsender.py
# Program to send PubSub messages and measure end-to-end delivery time
import os
import time

from google.api_core import exceptions
from google.cloud import pubsub_v1

TOPIC = 'e2eprobe'


def send_message(publisher, topic_path, sequence_no):
    print('Sending message - {0}'.format(topic_path))
    # The content will be ignored; the receiver only uses time_sent
    data = '{}'.format(sequence_no).encode('utf-8')
    time_sent = '{}'.format(time.time())
    future = publisher.publish(topic_path, data=data, time_sent=time_sent)
    future.result()  # wait until the server has accepted the message
    print('Message sent: {0}'.format(time_sent))


def setup_topic(publisher, topic_path):
    try:
        topic = publisher.create_topic(name=topic_path)
        print('Topic created: {}'.format(topic))
    except exceptions.AlreadyExists:
        print('Topic already exists - no problem')


def poll(publisher, topic_path):
    # Send one probe per minute, forever
    i = 0
    while True:
        i += 1
        send_message(publisher, topic_path, i)
        time.sleep(60)


def main():
    print('Starting up')
    project = os.getenv('PROJECT_ID')
    # Reuse a single client instead of creating one per message
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project, TOPIC)
    setup_topic(publisher, topic_path)
    poll(publisher, topic_path)


if __name__ == '__main__':
    main()