Created
September 24, 2024 05:32
-
-
Save JoelBender/d210578c474f3fe8417d0d2f161e4d4b to your computer and use it in GitHub Desktop.
Custom Network Service Access Point
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This example "intercepts" the NPDU coming up from a bottom layer and stores | |
the source address into the pduUserData attribute which is passed up through | |
the layers and reaches the application. | |
""" | |
import asyncio | |
from typing import List, Optional | |
from bacpypes3.debugging import ModuleLogger, bacpypes_debugging | |
from bacpypes3.argparse import SimpleArgumentParser | |
from bacpypes3.comm import bind | |
from bacpypes3.netservice import ( | |
NPDU, | |
NetworkAdapter, | |
RouterInfoCache, | |
NetworkServiceAccessPoint, | |
NetworkServiceElement, | |
) | |
from bacpypes3.app import DeviceInfoCache, Application | |
from bacpypes3.appservice import ApplicationServiceAccessPoint | |
from bacpypes3.object import DeviceObject, Object | |
# some debugging | |
_debug = 0 | |
_log = ModuleLogger(globals()) | |
@bacpypes_debugging | |
class CustomNSAP(NetworkServiceAccessPoint): | |
async def process_npdu(self, adapter: NetworkAdapter, npdu: NPDU) -> None: | |
if _debug: | |
CustomNSAP._debug("process_npdu %r %r", adapter, npdu) | |
# store the original source address in the user data attribute | |
npdu.pduUserData = npdu.pduSource | |
# continue as usual | |
await super().process_npdu(adapter, npdu) | |
@bacpypes_debugging | |
class CustomApplication(Application): | |
@classmethod | |
def from_object_list( | |
cls, | |
objects: List[Object], | |
device_info_cache: Optional[DeviceInfoCache] = None, | |
router_info_cache: Optional[RouterInfoCache] = None, | |
aseID=None, | |
) -> Application: | |
""" | |
Create an instance of an Application given a list of objects. | |
""" | |
if _debug: | |
CustomApplication._debug( | |
"from_object_list %s device_info_cache=%r aseID=%r", | |
repr(objects), | |
device_info_cache, | |
aseID, | |
) | |
# find the device object | |
device_object = None | |
for obj in objects: | |
if not isinstance(obj, DeviceObject): | |
continue | |
if device_object is not None: | |
raise RuntimeError("duplicate device object") | |
device_object = obj | |
if device_object is None: | |
raise RuntimeError("missing device object") | |
# create a base instance | |
app = cls(device_info_cache=device_info_cache, aseID=aseID) | |
# a application service access point will be needed | |
app.asap = ApplicationServiceAccessPoint(device_object, app.device_info_cache) | |
# use our custom network service access point | |
app.nsap = CustomNSAP(router_info_cache=router_info_cache) | |
# give the NSAP a generic network layer service element | |
app.nse = NetworkServiceElement() | |
bind(app.nse, app.nsap) | |
# bind the top layers | |
bind(app, app.asap, app.nsap) | |
# add the objects | |
for obj in objects: | |
app.add_object(obj) | |
# return the built application | |
return app | |
async def main() -> None: | |
app = None | |
try: | |
args = SimpleArgumentParser().parse_args() | |
if _debug: | |
_log.debug("args: %r", args) | |
# build an application | |
app = CustomApplication.from_args(args) | |
if _debug: | |
_log.debug("app: %r", app) | |
# like running forever | |
await asyncio.Future() | |
finally: | |
if app: | |
app.close() | |
if __name__ == "__main__": | |
try: | |
asyncio.run(main()) | |
except KeyboardInterrupt: | |
if _debug: | |
_log.debug("keyboard interrupt") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment