Created
May 30, 2018 04:54
-
-
Save JoelBender/0b31fa62cafc7b9d3a0c79801bd844b8 to your computer and use it in GitHub Desktop.
Echo Application
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
This application presents a 'console' prompt to the user asking for read commands
which create ReadPropertyRequest PDUs, then lines up the corresponding ReadPropertyACK
and prints the value.
""" | |
import sys | |
from bacpypes.debugging import bacpypes_debugging, ModuleLogger | |
from bacpypes.consolelogging import ConfigArgumentParser | |
from bacpypes.consolecmd import ConsoleCmd | |
from bacpypes.core import run, enable_sleeping | |
from bacpypes.iocb import IOCB | |
from bacpypes.apdu import ReadPropertyRequest, ReadPropertyACK | |
from bacpypes.primitivedata import Unsigned | |
from bacpypes.constructeddata import Array | |
from bacpypes.app import ApplicationIOController | |
from bacpypes.object import get_object_class, get_datatype | |
from bacpypes.local.device import LocalDeviceObject | |
from bacpypes.service.device import WhoIsIAmServices | |
from bacpypes.service.object import ReadWritePropertyServices | |
# module-level debug switch and logger; every "if _debug:" guard below reads this flag
_debug = 0
_log = ModuleLogger(globals())

# the running application; assigned in main() and used by the console command
this_application = None
# | |
# EchoApplication | |
# | |
@bacpypes_debugging
class EchoApplication(ApplicationIOController, WhoIsIAmServices, ReadWritePropertyServices):
    """Application that feeds its own traffic back into itself.

    Outgoing requests are handed to this object's ``indication()`` and
    outgoing responses are handed to this object's ``confirmation()``.
    """

    def __init__(self, *args):
        if _debug:
            EchoApplication._debug("__init__ %r", args)

        # all positional arguments are forwarded to the IO controller base
        ApplicationIOController.__init__(self, *args)

    def request(self, apdu):
        """Hand an outgoing request to this application's indication()."""
        if _debug:
            EchoApplication._debug("request %r", apdu)

        self.indication(apdu)

    def response(self, apdu):
        """Hand an outgoing response to this application's confirmation()."""
        if _debug:
            EchoApplication._debug("response %r", apdu)

        self.confirmation(apdu)
# | |
# ReadPropertyConsoleCmd | |
# | |
@bacpypes_debugging
class ReadPropertyConsoleCmd(ConsoleCmd):
    """Console command set providing a single ``read`` command.

    The command builds a ReadPropertyRequest, submits it to the module-level
    ``this_application`` through an IOCB, waits for completion and prints
    either the error or the decoded property value to stdout.
    """

    def do_read(self, args):
        """read <type> <inst> <prop> [ <indx> ]"""
        # NOTE(review): the docstring above is the command's usage string
        # (presumably shown by the console's help), so it is kept verbatim.
        #
        # args is the raw command line after the word "read":
        #   <type>  object type, numeric or a name known to get_object_class()
        #   <inst>  object instance number
        #   <prop>  property identifier
        #   <indx>  optional array index (fourth token)
        # Any exception raised while parsing or processing is logged via
        # _exception() and not propagated to the console loop.
        args = args.split()
        if _debug: ReadPropertyConsoleCmd._debug("do_read %r", args)

        try:
            # fewer than three tokens makes this unpacking raise ValueError,
            # which is reported by the handler at the bottom
            obj_type, obj_inst, prop_id = args[:3]

            if obj_type.isdigit():
                obj_type = int(obj_type)
            elif not get_object_class(obj_type):
                raise ValueError("unknown object type")

            obj_inst = int(obj_inst)

            # reject the request early when the property has no known datatype
            datatype = get_datatype(obj_type, prop_id)
            if not datatype:
                raise ValueError("invalid property for object type")

            # build a request
            request = ReadPropertyRequest(
                objectIdentifier=(obj_type, obj_inst),
                propertyIdentifier=prop_id,
            )

            # The optional index is the FOURTH token, per the usage string.
            # The previous check (len(args) == 5, args[4]) never matched the
            # documented form, so the index was silently dropped.
            if len(args) == 4:
                request.propertyArrayIndex = int(args[3])
            if _debug: ReadPropertyConsoleCmd._debug(" - request: %r", request)

            # make an IOCB
            iocb = IOCB(request)
            if _debug: ReadPropertyConsoleCmd._debug(" - iocb: %r", iocb)

            # give it to the application
            this_application.request_io(iocb)

            # wait for it to complete
            iocb.wait()

            # do something for error/reject/abort
            if iocb.ioError:
                sys.stdout.write(str(iocb.ioError) + '\n')

            # do something for success
            elif iocb.ioResponse:
                apdu = iocb.ioResponse

                # should be an ack
                if not isinstance(apdu, ReadPropertyACK):
                    if _debug: ReadPropertyConsoleCmd._debug(" - not an ack")
                    return

                # find the datatype, this time from what the ack says it carries
                datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier)
                if _debug: ReadPropertyConsoleCmd._debug(" - datatype: %r", datatype)
                if not datatype:
                    raise TypeError("unknown datatype")

                # special case for array parts, others are managed by cast_out
                if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None):
                    if apdu.propertyArrayIndex == 0:
                        # index 0 is decoded as an Unsigned rather than as an element
                        value = apdu.propertyValue.cast_out(Unsigned)
                    else:
                        value = apdu.propertyValue.cast_out(datatype.subtype)
                else:
                    value = apdu.propertyValue.cast_out(datatype)
                if _debug: ReadPropertyConsoleCmd._debug(" - value: %r", value)

                sys.stdout.write(str(value) + '\n')
                # values that can describe themselves get a detailed dump as well
                if hasattr(value, 'debug_contents'):
                    value.debug_contents(file=sys.stdout)
                sys.stdout.flush()

            # do something with nothing?
            else:
                if _debug: ReadPropertyConsoleCmd._debug(" - ioError or ioResponse expected")

        except Exception as error:
            ReadPropertyConsoleCmd._exception("exception: %r", error)
# | |
# __main__ | |
# | |
def main():
    """Create the device, the echo application and the console, then run.

    Stores the application in the module-level ``this_application`` so the
    console command can submit requests to it.
    """
    global this_application

    # parse the command line arguments
    parser = ConfigArgumentParser(description=__doc__)
    args = parser.parse_args()

    if _debug:
        _log.debug("initialization")
        _log.debug(" - args: %r", args)

    # the local device object is configured from the parsed INI section
    this_device = LocalDeviceObject(ini=args.ini)
    if _debug:
        _log.debug(" - this_device: %r", this_device)

    # the application that the console's read command talks to
    this_application = EchoApplication(this_device)

    # the console; the reference is only used for the debug line below
    this_console = ReadPropertyConsoleCmd()
    if _debug:
        _log.debug(" - this_console: %r", this_console)

    # enable sleeping will help with threads
    enable_sleeping()

    _log.debug("running")

    run()

    _log.debug("fini")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment