Created
August 24, 2021 04:14
-
-
Save JoelBender/adbd9da33db8d5c1d50d34db83457aa5 to your computer and use it in GitHub Desktop.
JSON Writable Configuration File
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
    "avar": 74.1,
    "bvar": "inactive"
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This sample application is a BACnet device that has an AnalogValueObject | |
and BinaryValueObject in a JSON configuration file that can be changed by | |
a BACnet client. | |
""" | |
import os | |
import json | |
from pathlib import Path | |
from bacpypes.debugging import bacpypes_debugging, ModuleLogger | |
from bacpypes.consolelogging import ConfigArgumentParser | |
from bacpypes.core import run | |
from bacpypes.errors import ExecutionError | |
from bacpypes.primitivedata import Real | |
from bacpypes.basetypes import BinaryPV | |
from bacpypes.object import ( | |
Property, | |
AnalogValueObject, | |
BinaryValueObject, | |
register_object_type, | |
) | |
from bacpypes.app import BIPSimpleApplication | |
from bacpypes.local.device import LocalDeviceObject | |
from bacpypes.local.file import LocalStreamAccessFileObject | |
from bacpypes.service.file import FileServices | |
# debugging switch and the logger for this module
_debug = 0
_log = ModuleLogger(globals())

# path of the JSON configuration file: taken from the JSON_CONFIG_FILE
# environment variable when set, otherwise "<this script's stem>.json"
JSON_CONFIG_FILE = os.environ.get(
    "JSON_CONFIG_FILE", f"{Path(__file__).stem}.json"
)

# parsed configuration shared by the classes below; replaced by
# JSONStreamFile whenever the file is loaded or successfully rewritten
json_config = {}
@bacpypes_debugging
class JSONStreamFile(LocalStreamAccessFileObject):
    """Stream-access file object backed by the JSON configuration file.

    The raw file contents are cached as ``bytes`` in ``self._file_data`` and
    the parsed document is published through the module-level ``json_config``
    so that other objects in this module can read values from it.
    """

    def __init__(self, **kwargs):
        """Initialize a stream accessed file object.

        Loads ``JSON_CONFIG_FILE`` from disk and parses it into
        ``json_config``.  Any error opening or parsing the file propagates
        to the caller.
        """
        global json_config

        if _debug:
            JSONStreamFile._debug("__init__ %r", kwargs)
        LocalStreamAccessFileObject.__init__(self, **kwargs)

        # load in the file as raw octets
        with open(JSON_CONFIG_FILE, "rb") as config_file:
            self._file_data = config_file.read()
        if _debug:
            JSONStreamFile._debug(" - %d octets", len(self._file_data))

        # interpret the JSON blob
        json_config = json.loads(self._file_data)
        if _debug:
            JSONStreamFile._debug(" - json_config: %r", json_config)

    def __len__(self):
        """Return the number of octets in the file."""
        if _debug:
            JSONStreamFile._debug("__len__")
        return len(self._file_data)

    def read_stream(self, start_position, octet_count):
        """Read a chunk of data out of the file.

        Returns a tuple ``(end_of_file, data)`` where ``data`` is the slice
        of at most ``octet_count`` octets starting at ``start_position`` and
        ``end_of_file`` is true when the slice reaches the end of the file.
        """
        if _debug:
            JSONStreamFile._debug(
                "read_stream %r %r", start_position, octet_count
            )

        stop_position = start_position + octet_count

        # end of file is true if the last octet is returned
        end_of_file = stop_position >= len(self._file_data)

        return end_of_file, self._file_data[start_position:stop_position]

    def write_stream(self, start_position, data):
        """Write a number of octets, starting at a specific offset.

        A negative ``start_position`` appends ``data`` to the file.  A
        position beyond the current end pads the gap with NUL octets before
        appending.  Otherwise ``data`` overwrites the octets starting at
        ``start_position``.

        The new contents are accepted only if they parse as JSON; in that
        case ``json_config`` and the cached contents are updated and the
        file on disk is rewritten.

        Returns the offset at which the data was actually placed.

        Raises ``RuntimeError`` when the resulting contents are not valid
        JSON; the cached contents and the file on disk are left untouched.
        """
        global json_config

        if _debug:
            JSONStreamFile._debug("write_stream %r %r", start_position, data)

        # bytes are immutable, so every branch below builds a new object
        # and self._file_data stays intact until the result is validated
        file_data = self._file_data

        if start_position < 0:
            # append
            start_position = len(file_data)
            file_data += data
        elif start_position > len(file_data):
            # extend the file out to start_position; the padding has to be
            # bytes because the file contents are bytes
            file_data += b"\0" * (start_position - len(file_data))
            start_position = len(file_data)
            file_data += data
        else:
            # overwrite in place; no slice assignment on immutable bytes
            prechunk = file_data[:start_position]
            postchunk = file_data[start_position + len(data) :]
            file_data = prechunk + data + postchunk
        if _debug:
            JSONStreamFile._debug(" - file_data: %r", file_data)

        # interpret the JSON blob; ValueError covers both JSONDecodeError
        # and the UnicodeDecodeError raised for undecodable octets
        try:
            json_config = json.loads(file_data)
        except ValueError as err:
            if _debug:
                JSONStreamFile._debug(" - err: %r", err)
            raise RuntimeError("invalid JSON format") from err
        if _debug:
            JSONStreamFile._debug(" - json_config: %r", json_config)

        # good to go
        self._file_data = file_data

        # save the updated contents
        with open(JSON_CONFIG_FILE, "wb") as config_file:
            config_file.write(self._file_data)

        # return where the 'writing' actually started
        return start_position
@bacpypes_debugging
class JSONValueProperty(Property):
    """A ``presentValue`` property whose value is looked up in ``json_config``.

    Reads return the entry of ``json_config`` keyed by the owning object's
    ``objectName``; every write attempt is refused.
    """

    def __init__(self, datatype):
        """Declare the property as ``presentValue`` of the given datatype."""
        if _debug:
            JSONValueProperty._debug("__init__ %r", datatype)

        super().__init__(
            "presentValue",
            datatype,
            default=0.0,
            optional=True,
            mutable=False,
        )

    def ReadProperty(self, obj, arrayIndex=None):
        """Return the configured value for ``obj``.

        Raises ``ExecutionError`` (propertyIsNotAnArray) when an array index
        is supplied.
        """
        if _debug:
            JSONValueProperty._debug("ReadProperty %r arrayIndex=%r", obj, arrayIndex)

        # the value is a scalar, indexed access is refused
        if arrayIndex is not None:
            raise ExecutionError(
                errorClass="property", errorCode="propertyIsNotAnArray"
            )

        # the object name doubles as the key into the configuration
        value = json_config[obj.objectName]
        if _debug:
            JSONValueProperty._debug(" - value: %r", value)

        return value

    def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False):
        """Refuse the write by raising ``ExecutionError`` (writeAccessDenied)."""
        if _debug:
            JSONValueProperty._debug(
                "WriteProperty %r %r arrayIndex=%r priority=%r direct=%r",
                obj,
                value,
                arrayIndex,
                priority,
                direct,
            )

        raise ExecutionError(errorClass="property", errorCode="writeAccessDenied")
@bacpypes_debugging
@register_object_type
class JSONAnalogValueObject(AnalogValueObject):
    """Analog value object whose ``presentValue`` is a ``Real`` read from
    the JSON configuration."""

    # presentValue is supplied by the JSON-backed property
    properties = [JSONValueProperty(Real)]

    def __init__(self, **kwargs):
        """Pass all keyword arguments through to ``AnalogValueObject``."""
        if _debug:
            JSONAnalogValueObject._debug("__init__ %r", kwargs)

        super().__init__(**kwargs)
@bacpypes_debugging
@register_object_type
class JSONBinaryValueObject(BinaryValueObject):
    """Binary value object whose ``presentValue`` is a ``BinaryPV`` read
    from the JSON configuration."""

    # presentValue is supplied by the JSON-backed property
    properties = [JSONValueProperty(BinaryPV)]

    def __init__(self, **kwargs):
        """Pass all keyword arguments through to ``BinaryValueObject``."""
        if _debug:
            JSONBinaryValueObject._debug("__init__ %r", kwargs)

        super().__init__(**kwargs)
# | |
# __main__ | |
# | |
def main():
    """Build the device, register the JSON-backed objects and run.

    Creates the local device and application from the parsed INI settings,
    adds file services, then adds the JSON file object, the analog value
    "avar" and the binary value "bvar" before entering the run loop.
    """
    # parse the command line arguments
    args = ConfigArgumentParser(description=__doc__).parse_args()

    if _debug:
        _log.debug("initialization")
        _log.debug(" - args: %r", args)

    # make a device object
    this_device = LocalDeviceObject(ini=args.ini)
    if _debug:
        _log.debug(" - this_device: %r", this_device)

    # make the application and let it serve file content
    this_application = BIPSimpleApplication(this_device, args.ini.address)
    this_application.add_capability(FileServices)

    # the JSON configuration exposed as a stream access file
    config_file_object = JSONStreamFile(
        objectIdentifier=("file", 1),
        objectName="JSON Config",
    )
    _log.debug(" - json_config_file: %r", config_file_object)
    this_application.add_object(config_file_object)

    # analog value, named after its key in the configuration
    analog_value = JSONAnalogValueObject(
        objectIdentifier=("analogValue", 1), objectName="avar"
    )
    _log.debug(" - javo: %r", analog_value)
    this_application.add_object(analog_value)

    # binary value, named after its key in the configuration
    binary_value = JSONBinaryValueObject(
        objectIdentifier=("binaryValue", 1), objectName="bvar"
    )
    _log.debug(" - jbvo: %r", binary_value)
    this_application.add_object(binary_value)

    _log.debug("running")

    run()

    _log.debug("fini")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment