Skip to content

Instantly share code, notes, and snippets.

@ccall48
Created June 7, 2023 01:43
Show Gist options
  • Save ccall48/bb65abdbdabd249dc72414f25e49eb50 to your computer and use it in GitHub Desktop.
Save ccall48/bb65abdbdabd249dc72414f25e49eb50 to your computer and use it in GitHub Desktop.
ChirpStack gRPC Python example to get device & gateway statuses.
from itertools import chain
from datetime import datetime as dt, timedelta as td
import json
import grpc
from google.protobuf.json_format import MessageToJson
from chirpstack_api import api
# ChirpStack API key; sent as the Bearer token in the 'authorization' gRPC metadata.
api_token = '<CHIRPSTACK API_KEY>'
# host:port target handed to grpc.insecure_channel() for every request.
server = 'localhost:8080'
def online(timeObj, min):
    """Return True when a UTC timestamp falls within the last ``min`` minutes.

    Args:
        timeObj: RFC 3339 UTC timestamp string as produced by protobuf's JSON
            encoding, e.g. ``2023-06-07T01:43:00.123456Z``. The fractional
            part may be absent or carry up to nanosecond precision. ``None``
            or an empty string is treated as "never seen".
        min: size of the look-back window in minutes. (The name shadows the
            builtin but is kept so existing keyword callers keep working.)

    Returns:
        bool: True if the timestamp is newer than ``now - min`` minutes.

    Raises:
        ValueError: if the date/time portion does not match
            ``%Y-%m-%dT%H:%M:%S``.
    """
    # Local import keeps this block self-contained; the file-level import
    # only brings in datetime and timedelta.
    from datetime import timezone

    if not timeObj:
        return False

    stamp = timeObj[:-1] if timeObj.endswith('Z') else timeObj
    whole, _, fraction = stamp.partition('.')
    seen = dt.strptime(whole, '%Y-%m-%dT%H:%M:%S').replace(tzinfo=timezone.utc)
    if fraction:
        # strptime's %f accepts at most 6 digits, so parse the fraction by
        # hand: pad/truncate to microseconds (nanoseconds are dropped).
        seen += td(microseconds=int(fraction[:6].ljust(6, '0')))

    # Compare aware UTC values instead of shifting by a fixed +10h offset,
    # which was only correct on hosts whose local zone is UTC+10.
    cutoff = dt.now(timezone.utc) - td(minutes=min)
    return seen > cutoff
def get_gw_list():
    """Return the ListGateways response for the LNS as a JSON string.

    Opens an insecure gRPC channel to ``server``, authenticates with
    ``api_token`` and requests up to 1000 gateways in a single call.
    """
    # A limit has to be set for the response to carry gateway details
    # (per the original author's note).
    request = api.ListGatewaysRequest(limit=1000)
    # API key travels as per-call metadata.
    bearer = [('authorization', f'Bearer {api_token}')]
    with grpc.insecure_channel(server) as channel:
        reply = api.GatewayServiceStub(channel).List(request, metadata=bearer)
    return MessageToJson(reply)
def get_gw_status():
    """Return a ``{gateway name: state}`` mapping for all listed gateways.

    The JSON produced by ``MessageToJson`` leaves out empty repeated fields
    and enum fields holding their zero value, so both the ``result`` list and
    a gateway's ``state`` may be missing. An absent list yields an empty
    mapping; an absent state is reported as ``'NEVER_SEEN'``, the zero value
    of ChirpStack's ``GatewayState`` enum.

    Returns:
        dict: gateway name -> state string. Gateways sharing a name
        overwrite each other, as before.
    """
    gws = json.loads(get_gw_list())
    return {
        gw['name']: gw.get('state', 'NEVER_SEEN')
        for gw in gws.get('result', [])
    }
def get_tenants_list():
    """Return the ListTenants response as a dict parsed from its JSON form.

    Opens an insecure gRPC channel to ``server``, authenticates with
    ``api_token`` and requests up to 1000 tenants in a single call.
    """
    # A limit has to be set for the response to carry tenant details
    # (per the original author's note).
    request = api.ListTenantsRequest(limit=1000)
    # API key travels as per-call metadata.
    bearer = [('authorization', f'Bearer {api_token}')]
    with grpc.insecure_channel(server) as channel:
        reply = api.TenantServiceStub(channel).List(request, metadata=bearer)
    return json.loads(MessageToJson(reply))
def get_apps_list(tenant_id):
    """Return the ids of all applications belonging to a tenant.

    Args:
        tenant_id: id of the tenant whose applications are listed.

    Returns:
        list: application ids (at most 1000); empty when the tenant has no
        applications.
    """
    with grpc.insecure_channel(server) as channel:
        client = api.ApplicationServiceStub(channel)
        # Define the API key meta-data.
        auth_token = [('authorization', f'Bearer {api_token}')]
        # Construct request.
        req = api.ListApplicationsRequest()
        req.limit = 1000  # mandatory if you want details.
        req.tenant_id = tenant_id
        resp = client.List(req, metadata=auth_token)
    data = json.loads(MessageToJson(resp))
    # MessageToJson omits empty repeated fields, so a tenant without
    # applications has no 'result' key at all.
    return [app['id'] for app in data.get('result', [])]
def get_devices_list(application_id: str, heartbeat: int):
    """Map every device name in an application to an online flag.

    Args:
        application_id: id of the application whose devices are listed.
        heartbeat: look-back window in minutes passed on to ``online``.

    Returns:
        dict: device name -> bool. A device is True when ``online`` accepts
        its ``lastSeenAt`` timestamp, and False when it has no ``lastSeenAt``
        at all. Empty when the application has no devices.
    """
    with grpc.insecure_channel(server) as channel:
        client = api.DeviceServiceStub(channel)
        # Define the API key meta-data.
        auth_token = [('authorization', f'Bearer {api_token}')]
        # Construct request.
        req = api.ListDevicesRequest()
        req.limit = 1000  # mandatory if you want details.
        req.application_id = application_id
        resp = client.List(req, metadata=auth_token)
    devices = json.loads(MessageToJson(resp))

    statuses = {}
    # 'result' is absent from the JSON when the application has no devices.
    for device in devices.get('result', []):
        # Unset message fields are omitted from the JSON, so a device that
        # never reported has no 'lastSeenAt'; count it as offline instead of
        # raising KeyError.
        last_seen = device.get('lastSeenAt')
        statuses[device['name']] = bool(last_seen) and online(last_seen, heartbeat)
    return statuses
def main(heartbeat: int):
    """Collect device online statuses for every application of every tenant.

    Args:
        heartbeat: look-back window in minutes; forwarded to
            ``get_devices_list`` for each application.

    Returns:
        list: one ``{device name: bool}`` dict per application, in the order
        the applications were listed. Empty when there are no tenants.
    """
    # 'result' is missing from the parsed response when no tenants exist.
    tenant_ids = [tenant['id'] for tenant in get_tenants_list().get('result', [])]
    # Flatten the per-tenant application id lists into one sequence.
    app_ids = chain.from_iterable(get_apps_list(tenant_id) for tenant_id in tenant_ids)
    return [get_devices_list(app_id, heartbeat) for app_id in app_ids]
if __name__ == '__main__':
    # Look-back window, in minutes, handed to main().
    minutes = 60
    per_app_statuses = main(minutes)

    print('==========[APPLICATION STATUSES]==========')
    # Walk every (device, flag) pair across all applications in order.
    for name, active in chain.from_iterable(s.items() for s in per_app_statuses):
        print(active, name)

    print('\n==========[GATEWAY STATUSES]==========')
    for gw_name, gw_state in get_gw_status().items():
        print(gw_state, gw_name)
'''
# example to create python virtual environment.
python3 -m venv somefolder && cd somefolder
source bin/activate
# install the minimal requirements needed to run this script.
pip install grpcio grpcio-tools chirpstack-api
'''
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment