Skip to content

Instantly share code, notes, and snippets.

@sourceperl
Created October 18, 2016 08:56
Show Gist options
  • Save sourceperl/40b87af0f3ef1911a446d2167aadb401 to your computer and use it in GitHub Desktop.
Save sourceperl/40b87af0f3ef1911a446d2167aadb401 to your computer and use it in GitHub Desktop.
CP modbus data to redis DB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# retrieve modbus data from CP4900 device and store it to a redis DB
from pyHMI.DS_ModbusTCP import ModbusTCPDevice
from pyHMI.Tag import Tag
import redis
import time
class Devices:
    """Container for the data sources used by this script.

    The Modbus/TCP device is instantiated and its tables are declared while
    this class body executes, i.e. as soon as the module is loaded.
    """

    # PLC (TBox) reached over Modbus/TCP
    plc = ModbusTCPDevice(
        'localhost',
        port=502,
        timeout=2.0,
        refresh=1.0,
    )

    # declare one bits table and one words table on the device
    # NOTE(review): arguments are presumably (start address, size) - confirm
    # against the pyHMI documentation
    plc.add_bits_table(0, 4)
    plc.add_words_table(0, 4)
class Tags:
    """Every tag handled by the application, declared as class attributes."""

    # tags bound to the PLC: bits at addresses 0 to 3, word at address 0
    BIT_0 = Tag(False, src=Devices.plc, ref=dict(type='bit', addr=0))
    BIT_1 = Tag(False, src=Devices.plc, ref=dict(type='bit', addr=1))
    BIT_2 = Tag(False, src=Devices.plc, ref=dict(type='bit', addr=2))
    BIT_3 = Tag(False, src=Devices.plc, ref=dict(type='bit', addr=3))
    WORD_0 = Tag(0, src=Devices.plc, ref=dict(type='word', addr=0))

    # virtual tag (declared without a source), written by update_tags()
    MDV = Tag(0)

    @classmethod
    def update_tags(cls):
        """Add one to the value of the MDV tag."""
        counter_tag = cls.MDV
        counter_tag.val += 1
class Job:
    """One periodic task kept in the scheduler's job list.

    Attributes:
        cmd: the callable to run; the scheduler invokes it without arguments.
        every_s: period of the job in seconds (``every_ms`` converted).
        last_do: bookkeeping value maintained by the scheduler, starts at 0.0.
    """

    def __init__(self, do_cmd, every_ms=500):
        """Record the callable and its period, given in milliseconds."""
        self.last_do = 0.0
        self.every_s = every_ms / 1000.0
        self.cmd = do_cmd
class MainApp:
    """Application core: refresh the tags and copy their values to redis.

    Jobs are registered with do_every() and executed by mainloop(), a basic
    polling scheduler driven by the monotonic clock.
    """

    # pause between two scheduler passes, in seconds
    TICK_S = 0.1

    def __init__(self):
        """Connect to the local redis server and register the periodic jobs."""
        # jobs
        self.jobs = []
        # redis
        self.redis = redis.Redis('localhost')
        # periodic update tags
        self.do_every(Tags.update_tags, every_ms=500)
        self.do_every(self.update_redis, every_ms=500)

    def mainloop(self):
        """Run the scheduler forever; this method never returns normally."""
        # basic scheduler (replace tk after)
        while True:
            self._run_due_jobs(time.monotonic())
            time.sleep(self.TICK_S)

    def _run_due_jobs(self, now):
        """Run every job whose period has elapsed at time ``now``.

        ``now`` is a time.monotonic() reading in seconds. Comparing clock
        readings, rather than summing 0.1 on each pass, keeps the period
        exact: ten float additions of 0.1 give 0.9999999999999999, which made
        a 1000 ms job wait for an eleventh tick, and the time spent inside
        the job callbacks was never accounted for.
        """
        for job in self.jobs:
            if now - job.last_do >= job.every_s:
                job.cmd()
                job.last_do = now

    def do_every(self, do_cmd, every_ms=1000):
        """Register ``do_cmd`` to run every ``every_ms`` ms and run it once now.

        :param do_cmd: callable invoked without arguments
        :param every_ms: period of the job in milliseconds
        """
        # store jobs
        job = Job(do_cmd=do_cmd, every_ms=every_ms)
        # the period is counted from this first launch
        job.last_do = time.monotonic()
        self.jobs.append(job)
        # first launch
        do_cmd()

    def update_redis(self):
        """Write the current MDV and WORD_0 tag values to redis."""
        self.redis.set('alive', Tags.MDV.val)
        self.redis.set('word0', Tags.WORD_0.val)
if __name__ == "__main__":
    # script entry point: build the application (this registers the periodic
    # jobs), then hand control over to its scheduler loop
    main_app = MainApp()
    main_app.mainloop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment