Skip to content

Instantly share code, notes, and snippets.

@JoelBender
Created September 24, 2024 05:32
Show Gist options
  • Save JoelBender/d210578c474f3fe8417d0d2f161e4d4b to your computer and use it in GitHub Desktop.
Save JoelBender/d210578c474f3fe8417d0d2f161e4d4b to your computer and use it in GitHub Desktop.
Custom Network Service Access Point
"""
This example "intercepts" the NPDU coming up from a bottom layer and stores
the source address into the pduUserData attribute which is passed up through
the layers and reaches the application.
"""
import asyncio
from typing import List, Optional
from bacpypes3.debugging import ModuleLogger, bacpypes_debugging
from bacpypes3.argparse import SimpleArgumentParser
from bacpypes3.comm import bind
from bacpypes3.netservice import (
NPDU,
NetworkAdapter,
RouterInfoCache,
NetworkServiceAccessPoint,
NetworkServiceElement,
)
from bacpypes3.app import DeviceInfoCache, Application
from bacpypes3.appservice import ApplicationServiceAccessPoint
from bacpypes3.object import DeviceObject, Object
# some debugging
_debug = 0
_log = ModuleLogger(globals())
@bacpypes_debugging
class CustomNSAP(NetworkServiceAccessPoint):
async def process_npdu(self, adapter: NetworkAdapter, npdu: NPDU) -> None:
if _debug:
CustomNSAP._debug("process_npdu %r %r", adapter, npdu)
# store the original source address in the user data attribute
npdu.pduUserData = npdu.pduSource
# continue as usual
await super().process_npdu(adapter, npdu)
@bacpypes_debugging
class CustomApplication(Application):
@classmethod
def from_object_list(
cls,
objects: List[Object],
device_info_cache: Optional[DeviceInfoCache] = None,
router_info_cache: Optional[RouterInfoCache] = None,
aseID=None,
) -> Application:
"""
Create an instance of an Application given a list of objects.
"""
if _debug:
CustomApplication._debug(
"from_object_list %s device_info_cache=%r aseID=%r",
repr(objects),
device_info_cache,
aseID,
)
# find the device object
device_object = None
for obj in objects:
if not isinstance(obj, DeviceObject):
continue
if device_object is not None:
raise RuntimeError("duplicate device object")
device_object = obj
if device_object is None:
raise RuntimeError("missing device object")
# create a base instance
app = cls(device_info_cache=device_info_cache, aseID=aseID)
# a application service access point will be needed
app.asap = ApplicationServiceAccessPoint(device_object, app.device_info_cache)
# use our custom network service access point
app.nsap = CustomNSAP(router_info_cache=router_info_cache)
# give the NSAP a generic network layer service element
app.nse = NetworkServiceElement()
bind(app.nse, app.nsap)
# bind the top layers
bind(app, app.asap, app.nsap)
# add the objects
for obj in objects:
app.add_object(obj)
# return the built application
return app
async def main() -> None:
app = None
try:
args = SimpleArgumentParser().parse_args()
if _debug:
_log.debug("args: %r", args)
# build an application
app = CustomApplication.from_args(args)
if _debug:
_log.debug("app: %r", app)
# like running forever
await asyncio.Future()
finally:
if app:
app.close()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
if _debug:
_log.debug("keyboard interrupt")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment