Skip to content

Instantly share code, notes, and snippets.

@sourceperl
Last active November 4, 2016 17:31
Show Gist options
  • Save sourceperl/53226674bbb423c83635 to your computer and use it in GitHub Desktop.
Save sourceperl/53226674bbb423c83635 to your computer and use it in GitHub Desktop.
RPISIGFOX board with PLC modbus export (+ min/max/avg stats)
; supervisor program entry for the Sigfox script
[program:sigfox]
; script to run ("xxxxx" looks like a placeholder to replace with the real file name)
command=sigfox_xxxxx.py
; run as user "pi", from its home directory
user=pi
directory=/home/pi/
; start together with supervisord, but do not restart automatically when the process exits
autostart=true
autorestart=false
; disable Python output buffering
environment=PYTHONUNBUFFERED=1
#!/usr/bin/env python3
import time
import serial
import schedule
import logging
import logging.handlers
from pyModbusTCP.client import ModbusClient
# global vars
# last values read from the PLC, every key starts at 0
modbus = dict.fromkeys(
    ('io_amont', 'anc_io_amt', 'io_aval', 'anc_io_aval', 'q_inst', 'consigne_io'),
    0,
)
# sample buffers: filled by stat_job(), emptied by send_sigfox_job()
l_io_amt, l_io_avl, l_q_inst = [], [], []
# jobs managed by schedule
def modbus_job():
    """Refresh the ``modbus`` dict from the PLC holding registers.

    Reads 6 holding registers starting at address 27000 with the module-level
    client ``c``. On success every register is stored under its key, capped
    to a per-key high limit. On failure the previous values are kept and a
    debug message is logged.
    """
    global modbus
    # (key, high limit), listed in register order
    limits = (
        ('io_amont', 250),
        ('anc_io_amt', 999),
        ('io_aval', 250),
        ('anc_io_aval', 999),
        ('q_inst', 2000),
        ('consigne_io', 250),
    )
    regs = c.read_holding_registers(27000, 6)
    if not regs:
        logging.debug('modbus read error')
        return
    for idx, (key, high) in enumerate(limits):
        modbus[key] = min(regs[idx], high)
def stat_job():
    """Append the current PLC values as one new sample to each statistics buffer."""
    global modbus, l_io_amt, l_io_avl, l_q_inst
    # (buffer, key of the value it samples)
    for samples, key in (
        (l_io_amt, 'io_amont'),
        (l_io_avl, 'io_aval'),
        (l_q_inst, 'q_inst'),
    ):
        samples.append(modbus[key])
def check_modem_job():
    """Probe the UNB modem with a bare ``AT`` command and log the outcome.

    The serial input buffer is flushed, the read timeout is set to 1 second,
    then answer lines are read until one contains ``OK`` or ``ERROR``, or
    until a read returns nothing (timeout). The result is logged at debug
    level.
    """
    ser.flushInput()
    ser.timeout = 1
    ser.write(b'AT\r\n')
    # iter() stops on the first empty read, i.e. when readline() timed out
    for line in iter(ser.readline, b''):
        if b'OK\r\n' in line:
            logger.debug('UNB modem test: modem ready')
            break
        if b'ERROR\r\n' in line:
            logger.debug('UNB modem test: modem KO')
            break
    else:
        logger.debug('UNB modem test: timeout during modem check')
    # NOTE: parsing of a 'Software Version:' answer line was prototyped here
    # and left disabled
def send_sigfox_job():
    """Build the statistics frame from the buffered samples and send it.

    The frame is the ``AT$SS=`` modem command followed by hex fields, in
    this order: a constant 0, io_amont average, io_aval average / max / min
    (2 hex digits each), q_inst average / max / min (4 hex digits each) and
    the current consigne_io (2 hex digits).

    The sample buffers are emptied before the transmission, whatever its
    outcome. When no sample is buffered nothing is sent: computing the
    statistics would raise ZeroDivisionError / ValueError and stop the
    scheduler loop.
    """
    global modbus, l_io_amt, l_io_avl, l_q_inst
    # guard: statistics are undefined on empty buffers
    if not (l_io_amt and l_io_avl and l_q_inst):
        logger.info('Sigfox SEND: no sample to send, frame skipped')
        # restart the three buffers from a common, empty state
        l_io_amt.clear()
        l_io_avl.clear()
        l_q_inst.clear()
        return
    # do stat stuff
    io_amt_avg = round(sum(l_io_amt) / len(l_io_amt))
    io_avl_avg = round(sum(l_io_avl) / len(l_io_avl))
    io_avl_max = max(l_io_avl)
    io_avl_min = min(l_io_avl)
    q_inst_avg = round(sum(l_q_inst) / len(l_q_inst))
    q_inst_max = max(l_q_inst)
    q_inst_min = min(l_q_inst)
    consigne_io = modbus['consigne_io']
    # clear all
    l_io_amt.clear()
    l_io_avl.clear()
    l_q_inst.clear()
    # format message
    payload_fmt = '{:02x}{:02x}{:02x}{:02x}{:02x}{:04x}{:04x}{:04x}{:02x}'
    payload_var = (0, io_amt_avg, io_avl_avg, io_avl_max, io_avl_min,
                   q_inst_avg, q_inst_max, q_inst_min, consigne_io)
    at_send_str = ('AT$SS=' + payload_fmt + '\r\n').format(*payload_var).encode()
    # send message
    ser.flushInput()
    ser.timeout = 15
    ser.write(at_send_str)
    # iter() stops on the first empty read, i.e. when readline() timed out
    for line in iter(ser.readline, b''):
        if b'OK\r\n' in line:
            logger.info('Sigfox SEND: OK')
            break
        if b'ERROR\r\n' in line:
            logger.info('Sigfox SEND: ERROR')
            break
    else:
        logger.info('Sigfox SEND: timeout after SEND cmd')
if __name__ == '__main__':
    # logging setup: keep the 'schedule' library at WARNING and above, send
    # root-logger records (INFO and above) to the local syslog socket
    logging.getLogger('schedule').setLevel(logging.WARNING)
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    handler = logging.handlers.SysLogHandler(address='/dev/log')
    syslog_fmt = logging.Formatter('%(module)s: %(message)s')
    handler.setFormatter(syslog_fmt)
    logger.addHandler(handler)
    # log startup
    logging.info("start")

    # modbus client used by modbus_job()
    c = ModbusClient(host='192.168.0.200', auto_open=True)

    # serial link to the UNB modem: 9600 bauds, 8 data bits, no parity, 1 stop bit
    serial_conf = dict(
        port='/dev/ttyAMA0',
        baudrate=9600,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        bytesize=serial.EIGHTBITS,
    )
    ser = serial.Serial(**serial_conf)

    # periodic jobs
    schedule.every(5).seconds.do(modbus_job)
    schedule.every(1).minute.do(stat_job)
    schedule.every(30).minutes.do(check_modem_job)
    schedule.every(1).hour.do(send_sigfox_job)

    # main loop: run the due jobs, then pause for half a second
    while True:
        schedule.run_pending()
        time.sleep(0.5)
#!/usr/bin/env python3
import time
import serial
import schedule
import logging
import logging.handlers
import redis
# custom redis
class CustomRedis(redis.StrictRedis):
    """Redis client with a helper that reads a key as a bounded float."""

    def get_float(self, key, f_min=None, f_max=None, f_default=0.0):
        """Return the value stored at *key* as a float clamped to [f_min, f_max].

        :param key: name of the Redis key to read
        :param f_min: lower bound applied to the value (None: no lower bound)
        :param f_max: upper bound applied to the value (None: no upper bound)
        :param f_default: value returned, as is, when the key does not exist,
            when its content is not a usable number (NaN included) or when
            the Redis request fails
        :return: the clamped float, or *f_default*
        """
        try:
            raw = self.get(key)
            # if key not exist
            if raw is None:
                return f_default
            f = float(raw)
        except redis.RedisError:
            logging.debug('redis I/O error')
            return f_default
        except ValueError:
            logging.debug('redis format error')
            return f_default
        # float() accepts "nan", but NaN slips through max()/min() unclamped
        # and cannot be converted to int later on: treat it as a bad format
        # (f != f is only true for NaN)
        if f != f:
            logging.debug('redis format error')
            return f_default
        # apply min/max range
        if f_min is not None:
            f = max(f, f_min)
        if f_max is not None:
            f = min(f, f_max)
        return f
# global vars
# counter incremented by redis_job(), reset by send_sigfox_job()
since_tx = 0
# latest values read from Redis
ext_vars = dict(io=0.0, io_anc=0.0)
# 'io' samples: filled by stat_job(), emptied by send_sigfox_job()
l_io = []
# jobs managed by schedule
def redis_job():
    """Read the external values from Redis and publish the ``since_tx`` counter.

    Each value is read with ``r.get_float()`` (lower bound 0.0, per-key upper
    bound and fallback) and stored in ``ext_vars``. ``since_tx`` is then
    incremented and written to the key ``rpi_sigfox:since_tx`` with a
    10 second expiry. A Redis error on that write is only logged.
    """
    global ext_vars, since_tx
    # (ext_vars key, redis key, high limit, value used when unreadable)
    readings = (
        ('io', 'cp4900:tht', 250.0, 0.0),
        ('io_anc', 'cp4900:age', 65535.0, 65535.0),
    )
    for name, redis_key, high, fallback in readings:
        ext_vars[name] = r.get_float(redis_key, f_min=0.0, f_max=high, f_default=fallback)
    # update since indicator
    since_tx = since_tx + 1
    try:
        r.set('rpi_sigfox:since_tx', since_tx, ex=10)
    except redis.RedisError:
        logging.debug('redis I/O error')
def stat_job():
    """Append the current 'io' value as one new sample to the statistics buffer."""
    global l_io
    sample = ext_vars['io']
    l_io.append(sample)
def check_modem_job():
    """Probe the UNB modem with a bare ``AT`` command and log the outcome.

    The serial input buffer is flushed, the read timeout is set to 1 second,
    then answer lines are read until one contains ``OK`` or ``ERROR``, or
    until a read returns nothing (timeout). The result is logged at debug
    level.
    """
    ser.flushInput()
    ser.timeout = 1
    ser.write(b'AT\r\n')
    # iter() stops on the first empty read, i.e. when readline() timed out
    for line in iter(ser.readline, b''):
        if b'OK\r\n' in line:
            logger.debug('UNB modem test: modem ready')
            break
        if b'ERROR\r\n' in line:
            logger.debug('UNB modem test: modem KO')
            break
    else:
        logger.debug('UNB modem test: timeout during modem check')
def send_sigfox_job():
    """Build the statistics frame from the buffered 'io' samples and send it.

    The frame is the ``AT$SS=`` modem command followed by hex fields, in
    this order: a constant 0, io average / max / min (2 hex digits each) and
    the current io_anc value (4 hex digits).

    The sample buffer is emptied before the transmission and ``since_tx`` is
    reset to 0 after it, whatever its outcome. When no sample is buffered
    nothing is sent and ``since_tx`` is left untouched: computing the
    statistics would raise ZeroDivisionError / ValueError and stop the
    scheduler loop.
    """
    global l_io, since_tx
    # guard: statistics are undefined on an empty buffer
    if not l_io:
        logger.info('Sigfox SEND: no sample to send, frame skipped')
        return
    # do stat stuff
    io_avg = round(sum(l_io) / len(l_io))
    io_max = int(max(l_io))
    io_min = int(min(l_io))
    io_anc = int(ext_vars['io_anc'])
    # clear all
    l_io.clear()
    # format message
    payload_fmt = '{:02x}{:02x}{:02x}{:02x}{:04x}'
    payload_var = (0, io_avg, io_max, io_min, io_anc)
    at_send_str = ('AT$SS=' + payload_fmt + '\r\n').format(*payload_var).encode()
    # send message
    ser.flushInput()
    ser.timeout = 15
    ser.write(at_send_str)
    # iter() stops on the first empty read, i.e. when readline() timed out
    for line in iter(ser.readline, b''):
        if b'OK\r\n' in line:
            logger.info('Sigfox SEND: OK')
            break
        if b'ERROR\r\n' in line:
            logger.info('Sigfox SEND: ERROR')
            break
    else:
        logger.info('Sigfox SEND: timeout after SEND cmd')
    # reset send
    since_tx = 0
if __name__ == '__main__':
    # logging setup: keep the 'schedule' library at WARNING and above, send
    # root-logger records (INFO and above) to the local syslog socket
    logging.getLogger('schedule').setLevel(logging.WARNING)
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    handler = logging.handlers.SysLogHandler(address='/dev/log')
    syslog_fmt = logging.Formatter('%(module)s: %(message)s')
    handler.setFormatter(syslog_fmt)
    logger.addHandler(handler)
    # log startup
    logging.info("start")

    # redis client used by redis_job()
    r = CustomRedis(host='192.168.0.30')

    # serial link to the UNB modem: 9600 bauds, 8 data bits, no parity, 1 stop bit
    serial_conf = dict(
        port='/dev/ttyAMA0',
        baudrate=9600,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        bytesize=serial.EIGHTBITS,
    )
    ser = serial.Serial(**serial_conf)

    # periodic jobs
    schedule.every(1).second.do(redis_job)
    schedule.every(1).minute.do(stat_job)
    schedule.every(30).minutes.do(check_modem_job)
    schedule.every(15).minutes.do(send_sigfox_job)

    # main loop: run the due jobs, then pause for half a second
    while True:
        schedule.run_pending()
        time.sleep(0.5)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment