Created
June 30, 2018 04:48
-
-
Save JoelBender/49dd7f3100fdbb1886bb463f52ceb7db to your computer and use it in GitHub Desktop.
This is a sample of what I'm considering as the backing store for a `LocalTrendLogObject` implementation, and for use in testing. While this version is in-memory, it doesn't seem like a huge stretch to build a database-backed store that has the same API.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
""" | |
""" | |
import sys | |
from bacpypes.debugging import bacpypes_debugging, ModuleLogger | |
from bacpypes.primitivedata import Boolean, Unsigned, Integer, Real, BitString, Enumerated, Date, Time | |
from bacpypes.basetypes import ErrorType, StatusFlags, LogStatus, LogRecord, LogRecordLogDatum, DateTime | |
# some debugging | |
_debug = 0 | |
_log = ModuleLogger(globals()) | |
# | |
# LocalDateTime | |
# | |
class LocalDateTime(DateTime):
    """A ``DateTime`` stamped at construction time.

    The date and time components are taken from ``Date().now()`` and
    ``Time().now()`` respectively when the instance is created.
    """

    def __init__(self):
        # the date is sampled first, then the time
        today = Date().now()
        moment = Time().now()

        DateTime.__init__(self, date=today.value, time=moment.value)

    def __str__(self):
        """Return the date and time renderings separated by one space."""
        return "%s %s" % (Date(self.date), Time(self.time))
# | |
# TrendLog | |
# | |
@bacpypes_debugging
class TrendLog:
    """In-memory buffer of ``LogRecord`` objects.

    Records are appended to ``log_records`` (oldest first).  When
    ``stop_when_full`` is false the buffer wraps: the oldest records are
    discarded to make room.  When it is true, the last free slot is used
    for a "log disabled" status record and ``enable`` is cleared, after
    which nothing is recorded until ``enable`` is set again by the caller.

    NOTE: the ``record_<type>()`` helpers validate with ``assert``, so the
    checks are skipped when Python runs with ``-O``.
    """

    def __init__(self, buffer_size=1000, stop_when_full=False):
        """Create an empty, enabled log.

        :param buffer_size: maximum number of records kept, at least 1
        :param stop_when_full: stop logging instead of wrapping when full
        :raises ValueError: if ``buffer_size`` is less than 1
        """
        if _debug: TrendLog._debug("__init__ buffer_size=%r stop_when_full=%r", buffer_size, stop_when_full)

        # a buffer that cannot hold one record cannot be logged to; fail
        # here rather than on the first record
        if buffer_size < 1:
            raise ValueError("buffer_size must be at least 1")

        # start out enabled
        self.enable = True
        self.buffer_size = buffer_size
        self.stop_when_full = stop_when_full

        # array of log records
        self.log_records = []

    def record_datum(self, datum, status_flags=None):
        """Append a record holding ``datum``, stamped with the current time.

        :param datum: value stored as the record's ``logDatum``
        :param status_flags: optional status flags value, or the name of a
            single status flag bit as a string
        """
        if _debug: TrendLog._debug("record_datum %r status_flags=%r", datum, status_flags)

        # make sure we are enabled
        if not self.enable:
            if _debug: TrendLog._debug(" - not enabled")
            return

        # check for buffer about to be full (or already full, e.g. the
        # log was re-enabled or buffer_size was lowered after it stopped)
        if self.stop_when_full:
            room = self.buffer_size - len(self.log_records)
            if room <= 1:
                if _debug: TrendLog._debug(" - stopping when full")

                # the last free slot records the event; when there is no
                # free slot left nothing is added so the buffer never
                # grows past buffer_size and never wraps
                if room == 1:
                    log_record = LogRecord(
                        timestamp=LocalDateTime(),
                        logDatum=LogRecordLogDatum(logStatus=[1, 0, 0]),  # log disabled
                    )
                    self.log_records.append(log_record)

                # no longer enabled
                self.enable = False
                return

        # if status_flags is a string, it's the name of one of the bits
        if isinstance(status_flags, str):
            status_flags = StatusFlags([status_flags]).value

        # build a log record
        log_record = LogRecord(
            timestamp=LocalDateTime(),
            logDatum=datum,
        )
        if status_flags is not None:
            log_record.statusFlags = status_flags
        if _debug: TrendLog._debug(" - log_record: %r", log_record)

        # drop as many of the oldest records as needed to leave room for
        # exactly one more; normally that is zero or one record
        excess = len(self.log_records) - self.buffer_size + 1
        if excess > 0:
            del self.log_records[:excess]

        # append it to the log records
        self.log_records.append(log_record)

    def record_status(self, log_status, status_flags=None):
        """Record a log status change.

        :param log_status: log status value, or the name of a single log
            status bit as a string
        :param status_flags: see :meth:`record_datum`
        """
        if _debug: TrendLog._debug("record_status %r status_flags=%r", log_status, status_flags)

        # if log_status is a string, it's the name of one of the bits
        if isinstance(log_status, str):
            log_status = LogStatus([log_status]).value

        self.record_datum(
            LogRecordLogDatum(logStatus=log_status),
            status_flags=status_flags,
        )

    def record_value(self, datatype, value, status_flags=None):
        """Record ``value`` under the datum choice named ``datatype``.

        :param datatype: attribute name of the ``LogRecordLogDatum``
            choice to set, e.g. ``'realValue'``
        :param value: value assigned to that choice
        :param status_flags: see :meth:`record_datum`
        """
        if _debug: TrendLog._debug("record_value %r %r status_flags=%r", datatype, value, status_flags)

        # build an empty datum and set the appropriate value
        datum = LogRecordLogDatum()
        setattr(datum, datatype, value)

        self.record_datum(datum, status_flags=status_flags)

    def record_boolean(self, value, status_flags=None):
        """Record a boolean value."""
        # a little error checking
        assert Boolean.is_valid(value)
        self.record_value('booleanValue', value, status_flags)

    def record_real(self, value, status_flags=None):
        """Record a real value."""
        # a little error checking
        assert Real.is_valid(value)
        self.record_value('realValue', value, status_flags)

    def record_enumerated(self, value, status_flags=None):
        """Record an enumerated value.  Because the primitive type
        doesn't have any enumerations defined, the best this can do
        is an unsigned value."""
        # a little error checking
        assert Enumerated.is_valid(value)
        self.record_value('enumValue', value, status_flags)

    def record_unsigned(self, value, status_flags=None):
        """Record an unsigned value."""
        # a little error checking
        assert Unsigned.is_valid(value)
        self.record_value('unsignedValue', value, status_flags)

    def record_signed(self, value, status_flags=None):
        """Record a signed (integer) value."""
        # a little error checking
        assert Integer.is_valid(value)
        self.record_value('signedValue', value, status_flags)

    def record_bitstring(self, value, status_flags=None):
        """Record a bitstring value, no names, just 1's and 0's."""
        # a little error checking
        assert BitString.is_valid(value)
        self.record_value('bitstringValue', value, status_flags)

    def record_null(self, status_flags=None):
        """Record a null."""
        self.record_value('nullValue', (), status_flags)

    def record_failure(self, error_class, error_code, status_flags=None):
        """Record an error."""
        self.record_value('failure',
            ErrorType(errorClass=error_class, errorCode=error_code),
            status_flags,
            )

    def record_time_change(self, delta, status_flags=None):
        """Record a time change."""
        self.record_value('timeChange', delta, status_flags)

    def record_any(self, value, status_flags=None):
        """Record just about anything."""
        self.record_value('anyValue', value, status_flags)

    def purge(self):
        """Empty out the log buffer and record the purge event.

        The ``enable`` flag is left as it is.
        """
        if _debug: TrendLog._debug("purge")

        # build a log record recording the event
        log_record = LogRecord(
            timestamp=LocalDateTime(),
            logDatum=LogRecordLogDatum(logStatus=[0, 1, 0]),  # buffer purged
        )

        # the purge event is the only record left in the buffer
        self.log_records = [log_record]

    def __len__(self):
        """Return the number of records currently in the buffer."""
        return len(self.log_records)

    def debug_contents(self, indent=1, file=None, _ids=None):
        """Write one line per record: index, timestamp, datum, flags.

        :param indent: number of leading indent units on each line
        :param file: stream to write to; defaults to the ``sys.stdout``
            in effect at call time
        :param _ids: unused here, kept for signature compatibility
        """
        # resolve the stream at call time so redirected stdout is honored
        if file is None:
            file = sys.stdout

        for i, log_record in enumerate(self.log_records):
            datum = log_record.logDatum
            line = ["%s[%d]" % (" " * indent, i), log_record.timestamp]

            # show the first choice element that has a value
            for element in LogRecordLogDatum.choiceElements:
                element_value = getattr(datum, element.name, None)
                if element_value is not None:
                    if element.name == 'logStatus':
                        element_value = str(LogStatus(element_value))
                    line.append(element.name)
                    line.append(element_value)
                    break
            else:
                # no choice element was set
                line.append("?")

            if log_record.statusFlags is not None:
                line.append(StatusFlags(log_record.statusFlags))

            file.write(" ".join(str(p) for p in line) + '\n')
# | |
# | |
# | |
# demo: a mix of datum types, with status flags given both as a list
# of bits and as the name of a single bit
print("different kinds of values")
trend_log = TrendLog()
trend_log.record_boolean(False)
trend_log.record_boolean(True)
trend_log.record_real(73.3, status_flags=[1, 0, 0, 0])
trend_log.record_real(73.3, status_flags='fault')
trend_log.debug_contents()
print("")

# demo: ten values recorded into a five record buffer
print("small buffer")
trend_log = TrendLog(buffer_size=5)
for i in range(10):
    trend_log.record_unsigned(i)
trend_log.debug_contents()
print("")

# demo: the same ten values, but logging stops when the buffer fills
print("small buffer, stop when full")
trend_log = TrendLog(stop_when_full=True, buffer_size=5)
for i in range(10):
    trend_log.record_unsigned(i)
trend_log.debug_contents()
print("")

# demo: purging the stopped log
print("purge")
trend_log.purge()
trend_log.debug_contents()
print("")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment