Last active
November 4, 2016 17:31
-
-
Save sourceperl/53226674bbb423c83635 to your computer and use it in GitHub Desktop.
RPISIGFOX board with PLC modbus export (+ min/max/avg stats)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[program:sigfox] | |
command=sigfox_xxxxx.py | |
user=pi | |
directory=/home/pi/ | |
autostart=true | |
autorestart=false | |
environment=PYTHONUNBUFFERED=1 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import time | |
import serial | |
import schedule | |
import logging | |
import logging.handlers | |
from pyModbusTCP.client import ModbusClient | |
# shared state: written by modbus_job, read by stat_job / send_sigfox_job
modbus = {
    'io_amont': 0,
    'anc_io_amt': 0,
    'io_aval': 0,
    'anc_io_aval': 0,
    'q_inst': 0,
    'consigne_io': 0,
}
# sample buffers filled by stat_job and emptied by send_sigfox_job
l_io_amt, l_io_avl, l_q_inst = [], [], []
# jobs managed by schedule
def modbus_job():
    """Refresh the ``modbus`` dict from the PLC holding registers.

    Reads 6 holding registers starting at address 27000 through the module
    level client ``c``. On success every register is stored under its key,
    capped to an upper limit. On failure the previous values are kept and a
    debug message is logged.
    """
    regs = c.read_holding_registers(27000, 6)
    # a falsy result means the read did not succeed
    if not regs:
        logging.debug('modbus read error')
        return
    # (destination key, upper limit), in register order
    layout = (
        ('io_amont', 250),
        ('anc_io_amt', 999),
        ('io_aval', 250),
        ('anc_io_aval', 999),
        ('q_inst', 2000),
        ('consigne_io', 250),
    )
    for offset, (name, high_limit) in enumerate(layout):
        modbus[name] = min(regs[offset], high_limit)
def stat_job():
    """Append the current Modbus values to the three sample buffers."""
    # one sample per buffer, taken from the latest values held in ``modbus``
    for samples, key in (
        (l_io_amt, 'io_amont'),
        (l_io_avl, 'io_aval'),
        (l_q_inst, 'q_inst'),
    ):
        samples.append(modbus[key])
def check_modem_job():
    """Probe the UNB modem with a bare ``AT`` command and log the outcome.

    Lines are read from the serial port (1 s read timeout) until one contains
    ``OK``, one contains ``ERROR``, or a read returns nothing (timeout). The
    outcome is logged at debug level.
    """
    # discard pending input so only the answer to this command is parsed
    ser.flushInput()
    ser.timeout = 1
    ser.write(b'AT\r\n')
    verdict = None
    while verdict is None:
        line = ser.readline()
        if not line:
            # empty read: nothing arrived before the timeout
            verdict = 'UNB modem test: timeout during modem check'
        elif b'OK\r\n' in line:
            verdict = 'UNB modem test: modem ready'
        elif b'ERROR\r\n' in line:
            verdict = 'UNB modem test: modem KO'
        # disabled in the original: report the modem software release
        # elif b'Software Version:' in line:
        #     release = line.split()[2].decode()
        #     logger.debug('soft=%s' % release)
    logger.debug(verdict)
def send_sigfox_job():
    """Send one Sigfox frame with statistics of the buffered samples.

    Computes avg of ``l_io_amt``, avg/max/min of ``l_io_avl`` and
    avg/max/min of ``l_q_inst``, adds the current ``modbus['consigne_io']``,
    empties the three buffers and sends the result to the modem as an
    ``AT$SS=<hex>`` command. The hex payload is 24 digits (12 bytes): a
    leading constant 0 byte, four 1-byte values, three 2-byte values and a
    final 1-byte value.

    The modem answer is read line by line (15 s read timeout) until ``OK``,
    ``ERROR`` or a timeout, and the outcome is logged at info level.

    If any buffer is empty nothing is sent: the job logs it and returns.
    """
    # Without samples sum()/len() raises ZeroDivisionError and min()/max()
    # raise ValueError. An exception raised by a job propagates out of
    # schedule.run_pending() and would end the main loop.
    if not (l_io_amt and l_io_avl and l_q_inst):
        logger.info('Sigfox SEND: no sample available, frame skipped')
        return
    # do stat stuff
    io_amt_avg = round(sum(l_io_amt) / len(l_io_amt))
    io_avl_avg = round(sum(l_io_avl) / len(l_io_avl))
    io_avl_max = max(l_io_avl)
    io_avl_min = min(l_io_avl)
    q_inst_max = max(l_q_inst)
    q_inst_min = min(l_q_inst)
    q_inst_avg = round(sum(l_q_inst) / len(l_q_inst))
    consigne_io = modbus['consigne_io']
    # clear all: the next frame only covers samples taken after this point
    l_io_amt.clear()
    l_io_avl.clear()
    l_q_inst.clear()
    # format message (field widths are fixed by the format string)
    payload_fmt = '{:02x}{:02x}{:02x}{:02x}{:02x}{:04x}{:04x}{:04x}{:02x}'
    payload_var = (0, io_amt_avg, io_avl_avg, io_avl_max, io_avl_min,
                   q_inst_avg, q_inst_max, q_inst_min, consigne_io)
    payload_str = 'AT$SS=' + payload_fmt + '\r\n'
    at_send_str = payload_str.format(*payload_var).encode()
    # send message; pending input is dropped so only this answer is parsed
    ser.flushInput()
    ser.timeout = 15
    ser.write(at_send_str)
    while True:
        s = ser.readline()
        if not s:
            # empty read: nothing arrived before the timeout
            logger.info('Sigfox SEND: timeout after SEND cmd')
            break
        elif s.find(b'OK\r\n') != -1:
            logger.info('Sigfox SEND: OK')
            break
        elif s.find(b'ERROR\r\n') != -1:
            logger.info('Sigfox SEND: ERROR')
            break
if __name__ == '__main__':
    # logging: silence the scheduler below WARNING, send the rest to syslog
    logging.getLogger('schedule').setLevel(logging.WARNING)
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    syslog_handler = logging.handlers.SysLogHandler(address='/dev/log')
    syslog_format = logging.Formatter('%(module)s: %(message)s')
    syslog_handler.setFormatter(syslog_format)
    logger.addHandler(syslog_handler)
    # log startup
    logging.info("start")

    # modbus client used by modbus_job
    c = ModbusClient(host='192.168.0.200', auto_open=True)

    # serial link to the UNB modem: 9600 baud, 8 data bits, no parity, 1 stop
    serial_settings = dict(
        port='/dev/ttyAMA0',
        baudrate=9600,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        bytesize=serial.EIGHTBITS,
    )
    ser = serial.Serial(**serial_settings)

    # job registration, in the same order as the periods listed here
    for period, job in (
        (schedule.every(5).seconds, modbus_job),
        (schedule.every(1).minute, stat_job),
        (schedule.every(30).minutes, check_modem_job),
        (schedule.every(1).hour, send_sigfox_job),
    ):
        period.do(job)

    # main loop: run due jobs, then wait half a second
    while True:
        schedule.run_pending()
        time.sleep(0.5)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import time | |
import serial | |
import schedule | |
import logging | |
import logging.handlers | |
import redis | |
# custom redis | |
class CustomRedis(redis.StrictRedis):
    """Redis client extended with a fault-tolerant float reader."""

    def get_float(self, key, f_min=None, f_max=None, f_default=0.0):
        """Return the value stored at ``key`` as a float limited to a range.

        :param key: name of the key to read
        :param f_min: lower bound applied to the value (None: no lower bound)
        :param f_max: upper bound applied to the value (None: no upper bound)
        :param f_default: value returned when the key does not exist, when
            the read raises ``redis.RedisError`` or when the stored value
            cannot be converted to float
        :return: the bounded float, or ``f_default``
        """
        try:
            raw = self.get(key)
            # a missing key falls through to the default below
            if raw is not None:
                value = float(raw)
                if f_min is not None:
                    value = max(value, f_min)
                if f_max is not None:
                    value = min(value, f_max)
                return value
        except redis.RedisError:
            logging.debug('redis I/O error')
        except ValueError:
            logging.debug('redis format error')
        return f_default
# shared state
# counter incremented by redis_job, reset by send_sigfox_job
since_tx = 0
# latest values read by redis_job
ext_vars = {
    'io': 0.0,
    'io_anc': 0.0,
}
# sample buffer filled by stat_job and emptied by send_sigfox_job
l_io = list()
# jobs managed by schedule
def redis_job():
    """Refresh ``ext_vars`` from Redis and publish the ``since_tx`` counter.

    ``cp4900:tht`` is read into ``ext_vars['io']`` (bounded to 0..250,
    default 0.0) and ``cp4900:age`` into ``ext_vars['io_anc']`` (bounded to
    0..65535, default 65535.0). ``since_tx`` is then incremented and written
    to the ``rpi_sigfox:since_tx`` key with ``ex=10``. A Redis error on that
    write is only logged at debug level.
    """
    global ext_vars, since_tx
    # read float keys
    io_now = r.get_float('cp4900:tht', f_min=0.0, f_max=250.0, f_default=0.0)
    ext_vars['io'] = io_now
    io_age = r.get_float('cp4900:age', f_min=0.0, f_max=65535.0, f_default=65535.0)
    ext_vars['io_anc'] = io_age
    # one more run of this job since the last send
    since_tx = since_tx + 1
    try:
        r.set('rpi_sigfox:since_tx', since_tx, ex=10)
    except redis.RedisError:
        logging.debug('redis I/O error')
def stat_job():
    """Append the latest ``ext_vars['io']`` value to the sample buffer."""
    sample = ext_vars['io']
    l_io.append(sample)
def check_modem_job():
    """Probe the UNB modem with a bare ``AT`` command and log the outcome.

    Lines are read from the serial port (1 s read timeout) until one contains
    ``OK``, one contains ``ERROR``, or a read returns nothing (timeout). The
    outcome is logged at debug level.
    """
    # discard pending input so only the answer to this command is parsed
    ser.flushInput()
    ser.timeout = 1
    ser.write(b'AT\r\n')
    verdict = None
    while verdict is None:
        line = ser.readline()
        if not line:
            # empty read: nothing arrived before the timeout
            verdict = 'UNB modem test: timeout during modem check'
        elif b'OK\r\n' in line:
            verdict = 'UNB modem test: modem ready'
        elif b'ERROR\r\n' in line:
            verdict = 'UNB modem test: modem KO'
    logger.debug(verdict)
def send_sigfox_job():
    """Send one Sigfox frame with statistics of the buffered samples.

    Computes the rounded average, max and min of ``l_io``, adds
    ``ext_vars['io_anc']``, empties the buffer and sends the result to the
    modem as an ``AT$SS=<hex>`` command. The hex payload is 12 digits
    (6 bytes): a leading constant 0 byte, three 1-byte values and one
    2-byte value.

    The modem answer is read line by line (15 s read timeout) until ``OK``,
    ``ERROR`` or a timeout, and the outcome is logged at info level.
    ``since_tx`` is reset to 0 after the exchange, whatever its outcome.

    If the buffer is empty nothing is sent and ``since_tx`` is left alone.
    """
    global l_io, since_tx
    # Without samples sum()/len() raises ZeroDivisionError and min()/max()
    # raise ValueError. An exception raised by a job propagates out of
    # schedule.run_pending() and would end the main loop.
    if not l_io:
        logger.info('Sigfox SEND: no sample available, frame skipped')
        return
    # do stat stuff (samples are floats, the payload needs integers)
    io_avg = round(sum(l_io) / len(l_io))
    io_max = int(max(l_io))
    io_min = int(min(l_io))
    io_anc = int(ext_vars['io_anc'])
    # clear all: the next frame only covers samples taken after this point
    l_io.clear()
    # format message (field widths are fixed by the format string)
    payload_fmt = '{:02x}{:02x}{:02x}{:02x}{:04x}'
    payload_var = (0, io_avg, io_max, io_min, io_anc)
    payload_str = 'AT$SS=' + payload_fmt + '\r\n'
    at_send_str = payload_str.format(*payload_var).encode()
    # send message; pending input is dropped so only this answer is parsed
    ser.flushInput()
    ser.timeout = 15
    ser.write(at_send_str)
    while True:
        s = ser.readline()
        if not s:
            # empty read: nothing arrived before the timeout
            logger.info('Sigfox SEND: timeout after SEND cmd')
            break
        elif s.find(b'OK\r\n') != -1:
            logger.info('Sigfox SEND: OK')
            break
        elif s.find(b'ERROR\r\n') != -1:
            logger.info('Sigfox SEND: ERROR')
            break
    # reset send
    since_tx = 0
if __name__ == '__main__':
    # logging: silence the scheduler below WARNING, send the rest to syslog
    logging.getLogger('schedule').setLevel(logging.WARNING)
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    syslog_handler = logging.handlers.SysLogHandler(address='/dev/log')
    syslog_format = logging.Formatter('%(module)s: %(message)s')
    syslog_handler.setFormatter(syslog_format)
    logger.addHandler(syslog_handler)
    # log startup
    logging.info("start")

    # redis client used by redis_job
    r = CustomRedis(host='192.168.0.30')

    # serial link to the UNB modem: 9600 baud, 8 data bits, no parity, 1 stop
    serial_settings = dict(
        port='/dev/ttyAMA0',
        baudrate=9600,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        bytesize=serial.EIGHTBITS,
    )
    ser = serial.Serial(**serial_settings)

    # job registration, in the same order as the periods listed here
    for period, job in (
        (schedule.every(1).second, redis_job),
        (schedule.every(1).minute, stat_job),
        (schedule.every(30).minutes, check_modem_job),
        (schedule.every(15).minutes, send_sigfox_job),
    ):
        period.do(job)

    # main loop: run due jobs, then wait half a second
    while True:
        schedule.run_pending()
        time.sleep(0.5)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment