Created
September 10, 2019 13:43
-
-
Save JoelBender/c945660e70b0ec2af02786928cf31a20 to your computer and use it in GitHub Desktop.
Who-Is and Device Information
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
This application generates a Who-Is request at startup and collects the I-Am | |
responses into a list, then reads various properties of the device objects, | |
then outputs the content as a JSON document. | |
""" | |
import sys | |
import json | |
from bacpypes.debugging import bacpypes_debugging, ModuleLogger | |
from bacpypes.consolelogging import ConfigArgumentParser | |
from bacpypes.core import run, stop, deferred | |
from bacpypes.task import FunctionTask | |
from bacpypes.iocb import IOCB | |
from bacpypes.pdu import Address, GlobalBroadcast | |
from bacpypes.apdu import WhoIsRequest, ReadPropertyRequest | |
from bacpypes.primitivedata import Unsigned | |
from bacpypes.constructeddata import Array | |
from bacpypes.errors import ParameterOutOfRange | |
from bacpypes.object import get_datatype | |
from bacpypes.app import BIPSimpleApplication | |
from bacpypes.local.device import LocalDeviceObject | |
# some debugging | |
_debug = 0 | |
_log = ModuleLogger(globals()) | |
# settings | |
WHOIS_TIMEOUT = 3.0 | |
# globals | |
args = None | |
this_device = None | |
this_application = None | |
device_info = {} | |
@bacpypes_debugging | |
class SampleApplication(BIPSimpleApplication): | |
def __init__(self, *args): | |
if _debug: | |
SampleApplication._debug("__init__ %r", args) | |
BIPSimpleApplication.__init__(self, *args) | |
# keep track of request to line up responses | |
self._who_is_request = None | |
# other things to read | |
self.point_queue = [] | |
def who_is(self): | |
if _debug: | |
SampleApplication._debug("who_is") | |
global args | |
# check for an existing request | |
if self._who_is_request: | |
raise RuntimeError("one request at a time") | |
# build a request | |
who_is = WhoIsRequest( | |
deviceInstanceRangeLowLimit=args.lolimit, | |
deviceInstanceRangeHighLimit=args.hilimit, | |
destination=args.address, | |
) | |
if _debug: | |
SampleApplication._debug(" - who_is: %r", who_is) | |
# save the request to line up responses | |
self._who_is_request = who_is | |
# set a timeout | |
timeout_task = FunctionTask(self.next_request) | |
timeout_task.install_task(delta=args.timeout) | |
# call it soon | |
self.request(who_is) | |
def do_IAmRequest(self, apdu): | |
"""Process an I-Am request.""" | |
if _debug: | |
SampleApplication._debug("do_IAmRequest %r", apdu) | |
global device_info | |
# no request issued yet, or too late | |
if not self._who_is_request: | |
return | |
# extract the device instance number | |
device_instance = apdu.iAmDeviceIdentifier[1] | |
if _debug: | |
SampleApplication._debug(" - device_instance: %r", device_instance) | |
# extract the source address | |
device_address = apdu.pduSource | |
if _debug: | |
SampleApplication._debug(" - device_address: %r", device_address) | |
# check to see if the application is waiting for this one | |
if self._who_is_request.deviceInstanceRangeLowLimit and ( | |
device_instance < self._who_is_request.deviceInstanceRangeLowLimit | |
): | |
pass | |
elif self._who_is_request.deviceInstanceRangeHighLimit and ( | |
device_instance > self._who_is_request.deviceInstanceRangeHighLimit | |
): | |
pass | |
else: | |
# save the device information | |
device_info[device_instance] = { | |
"_device_instance": device_instance, | |
"maxAPDULengthAccepted": apdu.maxAPDULengthAccepted, | |
"segmentationSupported": apdu.segmentationSupported, | |
"vendorID": apdu.vendorID, | |
} | |
# queue up some reads | |
self.point_queue.append( | |
(device_address, apdu.iAmDeviceIdentifier, "objectName") | |
) | |
self.point_queue.append( | |
(device_address, apdu.iAmDeviceIdentifier, "vendorName") | |
) | |
self.point_queue.append( | |
(device_address, apdu.iAmDeviceIdentifier, "modelName") | |
) | |
self.point_queue.append( | |
(device_address, apdu.iAmDeviceIdentifier, "firmwareRevision") | |
) | |
def next_request(self): | |
if _debug: | |
SampleApplication._debug("next_request") | |
# check to see if we're done | |
if not self.point_queue: | |
if _debug: | |
SampleApplication._debug(" - done") | |
stop() | |
return | |
# get the next request | |
addr, obj_id, prop_id = self.point_queue.pop(0) | |
# build a request | |
request = ReadPropertyRequest( | |
destination=addr, objectIdentifier=obj_id, propertyIdentifier=prop_id | |
) | |
if _debug: | |
SampleApplication._debug(" - request: %r", request) | |
# make an IOCB | |
iocb = IOCB(request) | |
# set a callback for the response | |
iocb.add_callback(self.complete_request) | |
if _debug: | |
SampleApplication._debug(" - iocb: %r", iocb) | |
# send the request | |
this_application.request_io(iocb) | |
def complete_request(self, iocb): | |
if _debug: | |
SampleApplication._debug("complete_request %r", iocb) | |
if iocb.ioResponse: | |
apdu = iocb.ioResponse | |
# extract the device instance number | |
device_instance = apdu.objectIdentifier[1] | |
if _debug: | |
SampleApplication._debug(" - device_instance: %r", device_instance) | |
# find the datatype | |
datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier) | |
if _debug: | |
SampleApplication._debug(" - datatype: %r", datatype) | |
if not datatype: | |
raise TypeError("unknown datatype") | |
# special case for array parts, others are managed by cast_out | |
if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None): | |
if apdu.propertyArrayIndex == 0: | |
value = apdu.propertyValue.cast_out(Unsigned) | |
else: | |
value = apdu.propertyValue.cast_out(datatype.subtype) | |
else: | |
value = apdu.propertyValue.cast_out(datatype) | |
if _debug: | |
SampleApplication._debug(" - value: %r", value) | |
# save the value | |
device_info[device_instance][apdu.propertyIdentifier] = value | |
if iocb.ioError: | |
if _debug: | |
SampleApplication._debug(" - error: %r", iocb.ioError) | |
sys.stderr.write( | |
"error reading {} from {}: {}\n".format( | |
iocb.args[0].propertyIdentifier, | |
iocb.args[0].objectIdentifier, | |
iocb.ioError, | |
) | |
) | |
# fire off another request | |
deferred(self.next_request) | |
def main(): | |
global args, this_device, this_application | |
# parse the command line arguments | |
parser = ConfigArgumentParser(description=__doc__) | |
parser.add_argument("lolimit", type=int, help="device instance range low limit") | |
parser.add_argument("hilimit", type=int, help="device instance range high limit") | |
parser.add_argument( | |
"address", | |
nargs="?", | |
type=Address, | |
help="destination address", | |
default=GlobalBroadcast(), | |
) | |
parser.add_argument( | |
"--timeout", type=float, help="who-is timeout", default=WHOIS_TIMEOUT | |
) | |
args = parser.parse_args() | |
if _debug: | |
_log.debug("initialization") | |
if _debug: | |
_log.debug(" - args: %r", args) | |
# litte error checking | |
if (args.lolimit < 0) or (args.lolimit > 4194303): | |
raise ParameterOutOfRange("low limit out of range") | |
if (args.hilimit < 0) or (args.hilimit > 4194303): | |
raise ParameterOutOfRange("high limit out of range") | |
# make a device object | |
this_device = LocalDeviceObject(ini=args.ini) | |
if _debug: | |
_log.debug(" - this_device: %r", this_device) | |
# make a simple application | |
this_application = SampleApplication(this_device, args.ini.address) | |
if _debug: | |
_log.debug(" - this_application: %r", this_application) | |
# call this when things are up and running | |
deferred(this_application.who_is) | |
if _debug: | |
_log.debug("running") | |
run() | |
if _debug: | |
_log.debug("fini") | |
json.dump({str(k): v for k, v in device_info.items()}, sys.stdout) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment