Created
October 18, 2016 08:56
-
-
Save sourceperl/40b87af0f3ef1911a446d2167aadb401 to your computer and use it in GitHub Desktop.
CP modbus data to redis DB
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
# retrieve modbus data from CP4900 device and store it to a redis DB | |
from pyHMI.DS_ModbusTCP import ModbusTCPDevice | |
from pyHMI.Tag import Tag | |
import redis | |
import time | |
class Devices(object): | |
# init datasource | |
# PLC TBox | |
plc = ModbusTCPDevice('localhost', port=502, timeout=2.0, refresh=1.0) | |
# init modbus tables | |
plc.add_bits_table(0, 4) | |
plc.add_words_table(0, 4) | |
class Tags(object): | |
# tags list | |
# from PLC | |
BIT_0 = Tag(False, src=Devices.plc, ref={'type': 'bit', 'addr': 0}) | |
BIT_1 = Tag(False, src=Devices.plc, ref={'type': 'bit', 'addr': 1}) | |
BIT_2 = Tag(False, src=Devices.plc, ref={'type': 'bit', 'addr': 2}) | |
BIT_3 = Tag(False, src=Devices.plc, ref={'type': 'bit', 'addr': 3}) | |
WORD_0 = Tag(0, src=Devices.plc, ref={'type': 'word', 'addr': 0}) | |
# virtual | |
MDV = Tag(0) | |
@classmethod | |
def update_tags(cls): | |
# update tags | |
cls.MDV.val += 1 | |
class Job(object): | |
def __init__(self, do_cmd, every_ms=500): | |
self.cmd = do_cmd | |
self.every_s = every_ms / 1000.0 | |
self.last_do = 0.0 | |
class MainApp(object): | |
def __init__(self): | |
# jobs | |
self.jobs = [] | |
# redis | |
self.redis = redis.Redis('localhost') | |
# periodic update tags | |
self.do_every(Tags.update_tags, every_ms=500) | |
self.do_every(self.update_redis, every_ms=500) | |
def mainloop(self): | |
# basic scheduler (replace tk after) | |
while True: | |
for job in self.jobs: | |
if job.every_s <= job.last_do: | |
job.cmd() | |
job.last_do = 0.0 | |
job.last_do += 0.1 | |
time.sleep(.1) | |
def do_every(self, do_cmd, every_ms=1000): | |
# store jobs | |
self.jobs.append(Job(do_cmd=do_cmd, every_ms=every_ms)) | |
# first launch | |
do_cmd() | |
def update_redis(self): | |
self.redis.set('alive', Tags.MDV.val) | |
self.redis.set('word0', Tags.WORD_0.val) | |
pass | |
if __name__ == '__main__': | |
# main App | |
main_app = MainApp() | |
main_app.mainloop() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment