Created
July 18, 2018 03:14
-
-
Save JoelBender/03e189d4bdb5dbd95f9190f61e36f97c to your computer and use it in GitHub Desktop.
This application generates a Who-Is request at startup and collects the I-Am responses into a list for more processing.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
This application generates a Who-Is request at startup and collects the I-Am | |
responses into a list for more processing. | |
""" | |
import sys | |
from bacpypes.debugging import bacpypes_debugging, ModuleLogger | |
from bacpypes.consolelogging import ConfigArgumentParser | |
from bacpypes.core import run, stop, deferred | |
from bacpypes.task import FunctionTask | |
from bacpypes.pdu import Address, GlobalBroadcast | |
from bacpypes.apdu import WhoIsRequest | |
from bacpypes.errors import ParameterOutOfRange | |
from bacpypes.app import BIPSimpleApplication | |
from bacpypes.local.device import LocalDeviceObject | |
# some debugging: _debug gates every debug log call in this module and
# _log is the module-level logger built from this module's globals
_debug = 0
_log = ModuleLogger(globals())

# settings: default value of the --timeout command line option, which is
# used as the delay of the task that ends the I-Am collection
WHOIS_TIMEOUT = 3.0

# globals, all assigned in main(); args is also read by the application
# when it builds the Who-Is request
args = None
this_device = None
this_application = None
# | |
# WhoIsMoreApplication | |
# | |
@bacpypes_debugging
class WhoIsMoreApplication(BIPSimpleApplication):
    """Application that sends one Who-Is request and collects I-Am responses.

    At most one Who-Is request is outstanding at a time.  While a request
    is outstanding, every I-Am whose device instance number falls inside
    the requested range is appended to an internal list.  When the timeout
    task fires, the collected responses are printed and the core is
    stopped.
    """

    def __init__(self, *args):
        """Pass all positional arguments through to BIPSimpleApplication."""
        if _debug: WhoIsMoreApplication._debug("__init__ %r", args)
        BIPSimpleApplication.__init__(self, *args)

        # keep track of request to line up responses
        self._who_is_request = None
        self._i_am_responses = []

    def who_is(self):
        """Build and send the Who-Is request described by the module-level args.

        Reads ``lolimit``, ``hilimit``, ``address`` and ``timeout`` from the
        module-level ``args`` namespace.

        Raises:
            RuntimeError: if a previous request is still outstanding.
        """
        if _debug: WhoIsMoreApplication._debug("who_is")

        # check for an existing request
        if self._who_is_request is not None:
            raise RuntimeError("one request at a time")

        # build a request
        who_is = WhoIsRequest(
            deviceInstanceRangeLowLimit=args.lolimit,
            deviceInstanceRangeHighLimit=args.hilimit,
            destination=args.address,
        )
        if _debug: WhoIsMoreApplication._debug(" - who_is: %r", who_is)

        # save the request to line up responses, clear out the responses
        self._who_is_request = who_is
        self._i_am_responses = []

        # set a timeout; when it fires the collection window is closed
        timeout_task = FunctionTask(self.next_thing_to_do)
        timeout_task.install_task(delta=args.timeout)

        # call it soon
        self.request(who_is)

    def do_IAmRequest(self, apdu):
        """Process an I-Am request.

        The APDU is kept only while a Who-Is request is outstanding and only
        when its device instance number lies inside the requested range.
        """
        if _debug: WhoIsMoreApplication._debug("do_IAmRequest %r", apdu)

        # no request issued yet, or too late
        request = self._who_is_request
        if request is None:
            return

        # extract the device instance number
        device_instance = apdu.iAmDeviceIdentifier[1]
        if _debug: WhoIsMoreApplication._debug(" - device_instance: %r", device_instance)

        # extract the source address
        device_address = apdu.pduSource
        if _debug: WhoIsMoreApplication._debug(" - device_address: %r", device_address)

        # check to see if the application is waiting for this one; the
        # limits are compared against None rather than tested for truth so
        # that a limit of 0 is still enforced
        low_limit = request.deviceInstanceRangeLowLimit
        high_limit = request.deviceInstanceRangeHighLimit
        if (low_limit is not None) and (device_instance < low_limit):
            return
        if (high_limit is not None) and (device_instance > high_limit):
            return

        self._i_am_responses.append(apdu)

    def next_thing_to_do(self):
        """Close the collection window, print the responses and stop the core."""
        if _debug: WhoIsMoreApplication._debug("next_thing_to_do")

        # clear out the current request
        self._who_is_request = None

        # do something with the responses
        for iam in self._i_am_responses:
            print(repr(iam))
            iam.debug_contents(file=sys.stdout)

        # done for now
        stop()
# | |
# main | |
# | |
def main():
    """Parse the command line, build the application and run the Who-Is.

    Assigns the module-level ``args``, ``this_device`` and
    ``this_application``, schedules the Who-Is request to be sent once the
    core is running, and then blocks in ``run()`` until the core is stopped.

    Raises:
        ParameterOutOfRange: if either limit is outside 0..4194303, or if
            the low limit is greater than the high limit.
    """
    global args, this_device, this_application

    # parse the command line arguments
    parser = ConfigArgumentParser(description=__doc__)
    parser.add_argument(
        "lolimit", type=int,
        help="device instance range low limit",
    )
    parser.add_argument(
        "hilimit", type=int,
        help="device instance range high limit",
    )
    parser.add_argument(
        "address", nargs='?', type=Address,
        help="destination address",
        default=GlobalBroadcast(),
    )
    parser.add_argument(
        "--timeout", type=float,
        help="who-is timeout",
        default=WHOIS_TIMEOUT,
    )
    args = parser.parse_args()

    if _debug: _log.debug("initialization")
    if _debug: _log.debug(" - args: %r", args)

    # little error checking
    max_instance = 4194303  # 2**22 - 1, largest value accepted for a limit
    if (args.lolimit < 0) or (args.lolimit > max_instance):
        raise ParameterOutOfRange("low limit out of range")
    if (args.hilimit < 0) or (args.hilimit > max_instance):
        raise ParameterOutOfRange("high limit out of range")

    # an inverted range cannot match any device, so fail now instead of
    # sending the request and waiting out the timeout for nothing
    if args.lolimit > args.hilimit:
        raise ParameterOutOfRange("low limit greater than high limit")

    # make a device object
    this_device = LocalDeviceObject(ini=args.ini)
    if _debug: _log.debug(" - this_device: %r", this_device)

    # make a simple application
    this_application = WhoIsMoreApplication(
        this_device, args.ini.address,
    )
    if _debug: _log.debug(" - this_application: %r", this_application)

    # call this when things are up and running
    deferred(this_application.who_is)

    if _debug: _log.debug("running")

    run()

    if _debug: _log.debug("fini")
# run main() only when this file is executed as a script, not when imported
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment