Skip to content

Instantly share code, notes, and snippets.

@austinfromboston
Last active November 21, 2020 17:13
Show Gist options
  • Save austinfromboston/6f2f6a8dd0a19653c35b109b63813075 to your computer and use it in GitHub Desktop.
Save austinfromboston/6f2f6a8dd0a19653c35b109b63813075 to your computer and use it in GitHub Desktop.
validator check client
import base64
import requests
import os
from twilio.rest import Client
CHAIN_API="https://medalla.beaconcha.in"
COUNTRY_CODE="+1"
FAILURE_THRESHOLD=100
def check_validator(event, context):
"""Triggered from a message on a Cloud Pub/Sub topic.
Args:
event (dict): Event payload.
context (google.cloud.functions.Context): Metadata for the event.
"""
pubsub_message = base64.b64decode(event['data']).decode('utf-8')
print(pubsub_message)
# Find out when the validator last reported
validator_key = os.getenv("VALIDATOR_KEY")
check = requests.get("{}/api/v1/validator/{}".format(CHAIN_API, validator_key))
if check.status_code != 200:
exit()
check_slot = check.json()["data"]["lastattestationslot"]
print("my last slot:", check_slot)
# Find the current slot for the network
network = requests.get("{}/api/v1/block/latest".format(CHAIN_API))
if network.status_code != 200:
exit()
network_slot = network.json()["data"]["slot"]
print("network slot:", network_slot)
# Prepare Twilio client
account_sid = os.environ['TWILIO_ACCOUNT_SID']
auth_token = os.environ['TWILIO_AUTH_TOKEN']
target_phone=os.getenv('TARGET_PHONE')
source_phone=os.getenv("SOURCE_PHONE")
client = Client(account_sid, auth_token)
# If the slots are too far apart, send an alert
slot_drift = network_slot - check_slot
if slot_drift > FAILURE_THRESHOLD:
client.api.account.messages.create(
to="{}{}".format(COUNTRY_CODE, target_phone),
from_="{}{}".format(COUNTRY_CODE, source_phone),
body="validator {} was last seen at {}, is now missing {} slots".format(os.getenv("VALIDATOR_NICKNAME"), check_slot, slot_drift))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment