-
-
Save OrangeTux/4683870 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Initializer: | |
""" Initializes this demo | |
""" | |
def __init__(self): | |
""" Create a Messenger and pass this messenger | |
to a new formed Sensor. | |
""" | |
self.messenger = create_messenger() | |
self.sensor = Sensor(self.messenger) | |
def create_messenger(): | |
""" Returns a messenger instance | |
""" | |
pass | |
class Sensor: | |
""" Represents a sensor out in the field. | |
""" | |
def __init__(self, messenger): | |
self.messenger = messenger | |
self.create_request() | |
self.run() | |
def create_request(self): | |
""" Create reqeust to send to the fysical sensor in the field. | |
""" | |
try: | |
addr, obj_type, obj_inst, prop_id = args[:4] | |
if not get_object_class(obj_type): | |
raise Exception, 'probleem' | |
obj_inst = int(obj_inst) | |
datatype = get_datatype(obj_type, prop_id) | |
if not datatype: | |
raise ValueError, "invalid property for object type" | |
# build a request | |
self.request = ReadPropertyRequest( | |
objectIdentifier=(obj_type, obj_inst), | |
propertyIdentifier=prop_id, | |
) | |
self.request.pduDestination = Address(self.bacnet_address) | |
if len(args) == 5: | |
self.request.propertyArrayIndex = int(args[4]) | |
except Exception, e: | |
print(e) | |
def get_data(self): | |
""" Retrieves a value out of the sensor | |
and adds it to the list of values. | |
Returns false when something failed during | |
retrieving the value. | |
""" | |
log.debug('Sensor %d at %s is tries to get data.', self.id, self.bacnet_address) | |
# Passes the request(envelope) to the Messengers request method. | |
# The Messenger will bring the envelope to the right address | |
# and returns the response. | |
self.messenger.run() | |
response = self.messenger.request(self.request) | |
# TODO extract data we need out of the response | |
return data | |
def run(self): | |
""" This method continuesly retrieves a new | |
value of the sensor, calculates the average | |
and write the result to the database. | |
""" | |
while True: | |
# Check if a new value has been retrieved out of the sensor. | |
if self.get_data(): | |
self.calculate_average() | |
self.write_to_db() | |
# Sleep for a few second before do it all again. | |
time.sleep(self.CHECK_INTERVAL) | |
class Messenger(BIPSimpleApplication): | |
""" | |
This class functions as a messenger between a Sensor (instance) | |
and a fysical sensor. This is an implementation of the BIPSimpleApplication | |
which is in turn extends Application. | |
""" | |
def __init__(self, device, address): | |
BIPSimpleApplication.__init__(self, device, address) | |
# Create a task manager where all calls/requests to | |
# a bacnet device are put on. | |
self.taskManager = TaskManager() | |
def confirmation(self, apdu): | |
""" This method returns the response of a sensor. | |
""" | |
if isinstance(apdu, AbortPDU): | |
apdu.debug_contents() | |
def run(self): | |
""" Executes the next task. Normally this task | |
is a request for data. | |
Copied out of bacpypes.core. | |
""" | |
running = False | |
taskManager = None | |
deferredFns = [] | |
sleeptime = 0.0 | |
SPIN = 1.0 | |
running = True | |
try: | |
task, delta = self.taskManager.get_next_task() | |
# if there is a task to process, do it | |
if task: | |
print('got a task') | |
# _log.debug("task: %r", task) | |
self.taskManager.process_task(task) | |
# if delta is None, there are no tasks, default to spinning | |
if delta is None: | |
delta = SPIN | |
# there may be threads around, sleep for a bit | |
if sleeptime and (delta > sleeptime): | |
time.sleep(sleeptime) | |
delta -= sleeptime | |
# if there are deferred functions, use a small delta | |
if deferredFns: | |
delta = min(delta, 0.001) | |
# | |
# loop for socket activity | |
asyncore.loop(timeout=delta, count=1) | |
# check for deferred functions | |
while deferredFns: | |
# get a reference to the list | |
fnlist = deferredFns | |
deferredFns = [] | |
# call the functions | |
for fn, args, kwargs in fnlist: | |
# _log.debug("call: %r %r %r", fn, args, kwargs) | |
fn( *args, **kwargs) | |
# done with this list | |
del fnlist | |
except KeyboardInterrupt: | |
_log.info("keyboard interrupt") | |
running = False |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment