Created
April 14, 2022 03:49
-
-
Save JoelBender/e8413dfb466789a7b97dd095e0c2d172 to your computer and use it in GitHub Desktop.
Client-only Application
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
This is a clone of the ReadProperty.py sample application that does not | |
support the Who-Is and Read-Property services (which are provided by default) | |
making this essentially a client-only device. | |
Note that it is not completely stealthy, it will respond with a Reject if | |
it is sent a Read-Property request. | |
""" | |
import sys | |
from bacpypes.debugging import bacpypes_debugging, ModuleLogger | |
from bacpypes.consolelogging import ConfigArgumentParser | |
from bacpypes.consolecmd import ConsoleCmd | |
from bacpypes.core import run, deferred, enable_sleeping | |
from bacpypes.comm import bind | |
from bacpypes.iocb import IOCB | |
from bacpypes.pdu import Address | |
from bacpypes.apdu import ReadPropertyRequest, ReadPropertyACK | |
from bacpypes.primitivedata import Unsigned, ObjectIdentifier | |
from bacpypes.constructeddata import Array | |
from bacpypes.app import ApplicationIOController | |
from bacpypes.appservice import ApplicationServiceAccessPoint, StateMachineAccessPoint | |
from bacpypes.netservice import NetworkServiceAccessPoint, NetworkServiceElement | |
from bacpypes.bvllservice import BIPSimple, AnnexJCodec, UDPMultiplexer | |
from bacpypes.object import get_datatype | |
from bacpypes.local.device import LocalDeviceObject | |
# some debugging | |
_debug = 0 | |
_log = ModuleLogger(globals()) | |
# globals | |
this_application = None | |
@bacpypes_debugging | |
class ReadPropertyConsoleCmd(ConsoleCmd): | |
def do_read(self, args): | |
"""read <addr> <objid> <prop> [ <indx> ]""" | |
args = args.split() | |
if _debug: | |
ReadPropertyConsoleCmd._debug("do_read %r", args) | |
try: | |
addr, obj_id, prop_id = args[:3] | |
obj_id = ObjectIdentifier(obj_id).value | |
if prop_id.isdigit(): | |
prop_id = int(prop_id) | |
datatype = get_datatype(obj_id[0], prop_id) | |
if not datatype: | |
raise ValueError("invalid property for object type") | |
# build a request | |
request = ReadPropertyRequest( | |
objectIdentifier=obj_id, propertyIdentifier=prop_id | |
) | |
request.pduDestination = Address(addr) | |
if len(args) == 4: | |
request.propertyArrayIndex = int(args[3]) | |
if _debug: | |
ReadPropertyConsoleCmd._debug(" - request: %r", request) | |
# make an IOCB | |
iocb = IOCB(request) | |
if _debug: | |
ReadPropertyConsoleCmd._debug(" - iocb: %r", iocb) | |
# give it to the application | |
deferred(this_application.request_io, iocb) | |
# wait for it to complete | |
iocb.wait() | |
# do something for error/reject/abort | |
if iocb.ioError: | |
sys.stdout.write(str(iocb.ioError) + "\n") | |
# do something for success | |
elif iocb.ioResponse: | |
apdu = iocb.ioResponse | |
# should be an ack | |
if not isinstance(apdu, ReadPropertyACK): | |
if _debug: | |
ReadPropertyConsoleCmd._debug(" - not an ack") | |
return | |
# find the datatype | |
datatype = get_datatype( | |
apdu.objectIdentifier[0], apdu.propertyIdentifier | |
) | |
if _debug: | |
ReadPropertyConsoleCmd._debug(" - datatype: %r", datatype) | |
if not datatype: | |
raise TypeError("unknown datatype") | |
# special case for array parts, others are managed by cast_out | |
if issubclass(datatype, Array) and ( | |
apdu.propertyArrayIndex is not None | |
): | |
if apdu.propertyArrayIndex == 0: | |
value = apdu.propertyValue.cast_out(Unsigned) | |
else: | |
value = apdu.propertyValue.cast_out(datatype.subtype) | |
else: | |
value = apdu.propertyValue.cast_out(datatype) | |
if _debug: | |
ReadPropertyConsoleCmd._debug(" - value: %r", value) | |
sys.stdout.write(str(value) + "\n") | |
if hasattr(value, "debug_contents"): | |
value.debug_contents(file=sys.stdout) | |
elif isinstance(value, list) and len(value) > 0: | |
for i, element in enumerate(value): | |
sys.stdout.write(" [{}] {}\n".format(i, element)) | |
if hasattr(element, "debug_contents"): | |
element.debug_contents(file=sys.stdout, indent=2) | |
sys.stdout.flush() | |
# do something with nothing? | |
else: | |
if _debug: | |
ReadPropertyConsoleCmd._debug( | |
" - ioError or ioResponse expected" | |
) | |
except Exception as error: | |
ReadPropertyConsoleCmd._exception("exception: %r", error) | |
def do_rtn(self, args): | |
"""rtn <addr> <net> ... """ | |
args = args.split() | |
if _debug: | |
ReadPropertyConsoleCmd._debug("do_rtn %r", args) | |
# provide the address and a list of network numbers | |
router_address = Address(args[0]) | |
network_list = [int(arg) for arg in args[1:]] | |
# pass along to the service access point | |
this_application.nsap.update_router_references( | |
None, router_address, network_list | |
) | |
# | |
# BIPClientApplication | |
# | |
@bacpypes_debugging | |
class BIPClientApplication(ApplicationIOController): | |
""" | |
This class is essentially identical to the BIPSimpleApplication except that | |
it does not inherit the Who-Is and Read-Property services. | |
""" | |
def __init__(self, localDevice, localAddress, deviceInfoCache=None, aseID=None): | |
if _debug: | |
BIPClientApplication._debug( | |
"__init__ %r %r deviceInfoCache=%r aseID=%r", | |
localDevice, | |
localAddress, | |
deviceInfoCache, | |
aseID, | |
) | |
ApplicationIOController.__init__( | |
self, localDevice, localAddress, deviceInfoCache, aseID=aseID | |
) | |
# local address might be useful for subclasses | |
if isinstance(localAddress, Address): | |
self.localAddress = localAddress | |
else: | |
self.localAddress = Address(localAddress) | |
# include a application decoder | |
self.asap = ApplicationServiceAccessPoint() | |
# pass the device object to the state machine access point so it | |
# can know if it should support segmentation | |
self.smap = StateMachineAccessPoint(localDevice) | |
# the segmentation state machines need access to the same device | |
# information cache as the application | |
self.smap.deviceInfoCache = self.deviceInfoCache | |
# a network service access point will be needed | |
self.nsap = NetworkServiceAccessPoint() | |
# give the NSAP a generic network layer service element | |
self.nse = NetworkServiceElement() | |
bind(self.nse, self.nsap) | |
# bind the top layers | |
bind(self, self.asap, self.smap, self.nsap) | |
# create a generic BIP stack, bound to the Annex J server | |
# on the UDP multiplexer | |
self.bip = BIPSimple() | |
self.annexj = AnnexJCodec() | |
self.mux = UDPMultiplexer(self.localAddress) | |
# bind the bottom layers | |
bind(self.bip, self.annexj, self.mux.annexJ) | |
# bind the BIP stack to the network, no network number | |
self.nsap.bind(self.bip, address=self.localAddress) | |
def close_socket(self): | |
if _debug: | |
BIPClientApplication._debug("close_socket") | |
# pass to the multiplexer, then down to the sockets | |
self.mux.close_socket() | |
def main(): | |
global this_application | |
# parse the command line arguments | |
args = ConfigArgumentParser(description=__doc__).parse_args() | |
if _debug: | |
_log.debug("initialization") | |
if _debug: | |
_log.debug(" - args: %r", args) | |
# make a device object | |
this_device = LocalDeviceObject(ini=args.ini) | |
if _debug: | |
_log.debug(" - this_device: %r", this_device) | |
# make a client application | |
this_application = BIPClientApplication(this_device, args.ini.address) | |
# make a console | |
this_console = ReadPropertyConsoleCmd() | |
if _debug: | |
_log.debug(" - this_console: %r", this_console) | |
# enable sleeping will help with threads | |
enable_sleeping() | |
_log.debug("running") | |
run() | |
_log.debug("fini") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment