Created
June 7, 2023 01:43
-
-
Save ccall48/bb65abdbdabd249dc72414f25e49eb50 to your computer and use it in GitHub Desktop.
chirpstack grpc python example to get device & gateway statuses.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from itertools import chain | |
from datetime import datetime as dt, timedelta as td | |
import json | |
import grpc | |
from google.protobuf.json_format import MessageToJson | |
from chirpstack_api import api | |
import os

# Connection settings for the ChirpStack gRPC API.
# Both values can be overridden through the environment so the API key does
# not have to be committed alongside the script; when the variables are not
# set the original hard-coded defaults are used unchanged.
api_token = os.environ.get('CHIRPSTACK_API_TOKEN', '<CHIRPSTACK API_KEY>')
server = os.environ.get('CHIRPSTACK_SERVER', 'localhost:8080')
def online(timeObj, min):
    """Return True when a UTC timestamp falls within the last ``min`` minutes.

    Args:
        timeObj: Timestamp string of the form ``YYYY-MM-DDTHH:MM:SS[.fff...]Z``.
            The fractional part is optional and may carry more than six
            digits; anything beyond microsecond precision is truncated.
        min: Size of the look-back window in minutes. (The name shadows the
            builtin but is kept so existing keyword callers keep working.)

    Returns:
        bool: True if the timestamp is newer than ``now - min`` minutes.

    Raises:
        ValueError: If the date/time portion does not match the expected
            format.
    """
    # Local import: the file-level import only brings in datetime/timedelta.
    from datetime import timezone

    # Drop the trailing 'Z' (UTC designator) and split off the fraction so
    # that timestamps without a fraction, or with nanosecond precision, parse.
    stamp = timeObj[:-1] if timeObj.endswith('Z') else timeObj
    whole, _, fraction = stamp.partition('.')
    micros = int(fraction[:6].ljust(6, '0')) if fraction else 0

    seen_at = dt.strptime(whole, '%Y-%m-%dT%H:%M:%S').replace(
        microsecond=micros, tzinfo=timezone.utc
    )

    # Compare in UTC on both sides instead of shifting the timestamp by a
    # fixed +10 hours, which was only correct on a host running at UTC+10.
    cutoff = dt.now(timezone.utc) - td(minutes=min)
    return seen_at > cutoff
def get_gw_list():
    """Fetch the gateway listing from the LNS and return it as a JSON string.

    Issues a single ``GatewayService.List`` call over an insecure gRPC
    channel to ``server``, authenticated with ``api_token``, and serialises
    the response with ``MessageToJson``.
    """
    # The API key is sent as a bearer token in the call metadata.
    metadata = [('authorization', f'Bearer {api_token}')]

    request = api.ListGatewaysRequest()
    # Per the original author's note a limit is mandatory to get item
    # details back; 1000 is the fixed page size used throughout this script.
    request.limit = 1000

    with grpc.insecure_channel(server) as channel:
        stub = api.GatewayServiceStub(channel)
        response = stub.List(request, metadata=metadata)
        return MessageToJson(response)
def get_gw_status():
    """Return a mapping of gateway name to its reported state.

    The listing comes from ``get_gw_list()`` as a JSON string. Protobuf's
    JSON serialisation leaves out fields that hold their default value, so
    two keys may legitimately be absent and are handled here instead of
    raising ``KeyError``:

    * ``result`` is missing when the server has no gateways at all.
    * ``state`` is missing when it equals the enum's zero value, which is
      presumably ``NEVER_SEEN`` in the ChirpStack API - confirm against the
      installed ``chirpstack_api`` version.

    Returns:
        dict: ``{gateway_name: state}``; empty when there are no gateways.
        Gateways sharing a name overwrite one another, as before.
    """
    listing = json.loads(get_gw_list())
    return {
        gw.get('name', ''): gw.get('state', 'NEVER_SEEN')
        for gw in listing.get('result', [])
    }
def get_tenants_list():
    """Fetch the tenant listing from the LNS and return it as a dict.

    Issues a single ``TenantService.List`` call over an insecure gRPC channel
    to ``server``, authenticated with ``api_token``. The response is
    serialised with ``MessageToJson`` and parsed back into Python objects.
    """
    # The API key is sent as a bearer token in the call metadata.
    metadata = [('authorization', f'Bearer {api_token}')]

    request = api.ListTenantsRequest()
    # Per the original author's note a limit is mandatory to get item
    # details back; 1000 is the fixed page size used throughout this script.
    request.limit = 1000

    with grpc.insecure_channel(server) as channel:
        stub = api.TenantServiceStub(channel)
        response = stub.List(request, metadata=metadata)
        as_json = MessageToJson(response)

    return json.loads(as_json)
def get_apps_list(tenant_id):
    """Return the ids of all applications that belong to ``tenant_id``.

    Args:
        tenant_id: Id of the tenant whose applications are listed.

    Returns:
        list: Application ids; an empty list when the tenant has no
        applications.
    """
    with grpc.insecure_channel(server) as channel:
        client = api.ApplicationServiceStub(channel)

        # The API key is sent as a bearer token in the call metadata.
        auth_token = [('authorization', f'Bearer {api_token}')]

        req = api.ListApplicationsRequest()
        # Per the original author's note a limit is mandatory to get item
        # details back. Only this single page of up to 1000 is requested.
        req.limit = 1000
        req.tenant_id = tenant_id

        resp = client.List(req, metadata=auth_token)

    data = json.loads(MessageToJson(resp))
    # The JSON serialisation drops empty repeated fields, so 'result' is
    # absent for a tenant without applications; treat that as "no apps"
    # rather than raising KeyError.
    return [app['id'] for app in data.get('result', [])]
def get_devices_list(application_id: str, heartbeat: int):
    """Return the online status of every device in an application.

    Args:
        application_id: Id of the application whose devices are listed.
        heartbeat: Look-back window in minutes passed to ``online()``; a
            device seen within that window is reported as online.

    Returns:
        dict: ``{device_name: bool}``; empty when the application has no
        devices. Devices sharing a name overwrite one another, as before.
    """
    with grpc.insecure_channel(server) as channel:
        client = api.DeviceServiceStub(channel)

        # The API key is sent as a bearer token in the call metadata.
        auth_token = [('authorization', f'Bearer {api_token}')]

        req = api.ListDevicesRequest()
        # Per the original author's note a limit is mandatory to get item
        # details back. Only this single page of up to 1000 is requested.
        req.limit = 1000
        req.application_id = application_id

        resp = client.List(req, metadata=auth_token)

    devices = json.loads(MessageToJson(resp))
    # 'result' is absent when the application has no devices, and
    # 'lastSeenAt' is absent for a device that has not reported yet (unset
    # fields are left out of the JSON). Such a device is reported as offline
    # instead of aborting the whole listing with KeyError.
    return {
        device['name']: (
            'lastSeenAt' in device
            and online(device['lastSeenAt'], heartbeat)
        )
        for device in devices.get('result', [])
    }
def main(heartbeat: int):
    """Collect the online status of every device across all tenants.

    Args:
        heartbeat: Look-back window in minutes; a device counts as online if
            it was seen within that window.

    Returns:
        list: One ``{device_name: bool}`` dict per application, in the order
        the applications were discovered. Empty when there are no tenants or
        no applications.
    """
    # 'result' is absent from the tenant listing when no tenants exist.
    tenant_ids = [
        tenant['id'] for tenant in get_tenants_list().get('result', [])
    ]

    # Flatten the per-tenant application id lists into a single list. It is
    # materialised up front so all application lookups complete before any
    # device lookups start, matching the original call order.
    app_ids = list(
        chain.from_iterable(get_apps_list(tenant_id) for tenant_id in tenant_ids)
    )

    return [get_devices_list(app_id, heartbeat) for app_id in app_ids]
if __name__ == '__main__':
    # Devices seen within this many minutes are reported as active.
    heartbeat_minutes = 60

    # Gather everything first so a failing lookup aborts before any output.
    per_app_statuses = main(heartbeat_minutes)

    print('==========[APPLICATION STATUSES]==========')
    for device_statuses in per_app_statuses:
        for device_name, seen_recently in device_statuses.items():
            print(seen_recently, device_name)

    print('\n==========[GATEWAY STATUSES]==========')
    for gateway_name, gateway_state in get_gw_status().items():
        print(gateway_state, gateway_name)
''' | |
# example to create python virtual environment. | |
python3 -m venv somefolder && cd somefolder | |
source bin/activate | |
# pip install minimal requirements to run script below. | |
pip install grpcio grpcio-tools chirpstack-api | |
''' |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment