Skip to content

Instantly share code, notes, and snippets.

@bilinin
Last active October 6, 2016 16:02
Show Gist options
  • Save bilinin/e67e9d227a9192bcd2e6515db714ac86 to your computer and use it in GitHub Desktop.
Save bilinin/e67e9d227a9192bcd2e6515db714ac86 to your computer and use it in GitHub Desktop.
import random
import cgi
import binascii
import time
from datetime import datetime
from crccheck.crc import CrcModbus
from socket import *
import pickle
counter_merc = {
'sn': '',
'energy': ''}
commands = {"merc": "00 00 01 b0",
"start": "FF FF FF FF FF FF FF FF FF FF FF FF FF",
"vkt-7": "FF FF 00 10 3F FF 00 00 CC 80 00 00 00 64 54",
"spt943": "3F 00 00 00 00 BF 16",
"speed": "10 01 42 02 00 00 00 ba 16",
"tt1": "10 00 90 01 00 05 00 3F 00 00 00 00 8B 31 16",
"pr1": "10 01 90 19 00 05 00 3f 00 00 00 00 29 79"}
obj = {}
########## EXCEL
import xlrd
import xlutils.copy
def _getOutCell(outSheet, colIndex, rowIndex):
""" HACK: Extract the internal xlwt cell representation. """
row = outSheet._Worksheet__rows.get(rowIndex)
if not row: return None
cell = row._Row__cells.get(colIndex)
return cell
def setOutCell(outSheet, col, row, value):
""" Change cell value without changing formatting. """
# HACK to retain cell style.
previousCell = _getOutCell(outSheet, col, row)
# END HACK, PART I
outSheet.write(row, col, value)
# HACK, PART II
if previousCell:
newCell = _getOutCell(outSheet, col, row)
if newCell:
newCell.xf_idx = previousCell.xf_idx
# END HACK
def make_report():
offset = 9
inBook = xlrd.open_workbook('Report230.xls', formatting_info=True)
outBook = xlutils.copy.copy(inBook)
outSheet = outBook.get_sheet(0)
i =0
for current in bd:
setOutCell(outSheet , 1, i+offset, bd[current]['label'])
setOutCell(outSheet, 3, i + offset, int(bd[current]['nt']))
setOutCell(outSheet , 6, i+offset, bd[current]['energy'])
i += 1
outBook.save('output.xls')
########## EXCEL
def read_bd():
# Чтение структкры из файла
input = open('cgi-bin/data.pkl', 'rb')
# input = open('data.pkl', 'rb')
base_counters = pickle.load(input)
input.close()
return base_counters
def save_bd(base_counters):
# Консервация в файл
output = open('cgi-bin/data.pkl', 'wb')
pickle.dump(base_counters, output, 4)
output.close()
def pop_bd(id):
base_counters = read_bd()
base_counters.pop(id)
save_bd(base_counters)
def get_port_access(HOST, PORT):
ADDR = (HOST, PORT)
client = socket(AF_INET, SOCK_STREAM)
client.settimeout(3)
try:
client.connect(ADDR)
data = 'Подключено'
client.close()
except ConnectionRefusedError:
data = "Ошибка недоступен порт " + str(PORT)
log_this(HOST + " недоступен порт " + str(PORT) + " \n")
except timeout:
data = "Не подключено, недоступен порт " + str(PORT)
log_this(HOST + " недоступен порт " + str(PORT) + " \n")
return data
def byte_command(command):
return bytearray.fromhex(commands[command])
def get_merc_info(HOST, PORT, NT):
get = get_(NT)
ADDR = (HOST, PORT)
BUFSIZE = 100 # Размер буфера
client = socket(AF_INET, SOCK_STREAM)
client.settimeout(3)
try:
client.connect(ADDR)
client.send(get) # Меркурий 00 00 01 B0
data = client.recv(BUFSIZE)
data = binascii.hexlify(bytearray(data))
client.close()
except ConnectionRefusedError:
data = "Ошибка"
log_this(HOST + " недоступен счётчик\n")
except timeout:
data = "недоступен"
log_this(HOST + " недоступен счётчик\n")
return data
def get_from_count(HOST, PORT, command):
ADDR = (HOST, PORT)
BUFSIZE = 100 # Размер буфера
client = socket(AF_INET, SOCK_STREAM)
client.settimeout(3)
try:
client.connect(ADDR)
client.send(bytearray.fromhex("FF FF FF FF FF FF FF FF FF FF FF FF FF FF FF FF"))
time.sleep(2)
client.send(bytearray.fromhex("01 3F 00 00 00 00 BF 16"))
data = client.recv(BUFSIZE)
#data = binascii.hexlify(bytearray(data))
client.close()
except ConnectionRefusedError:
data = "Ошибка"
log_this(HOST + " недоступен счётчик\n")
except timeout:
data = "Счётчик недоступен"
log_this(HOST + " недоступен счётчик\n")
return data
def get_from_counter(HOST, PORT, send_data):
ADDR = (HOST, PORT)
BUFSIZE = 100 # Размер буфера
client = socket(AF_INET, SOCK_STREAM)
client.settimeout(3)
try:
client.connect(ADDR)
client.send(send_data)
data = client.recv(BUFSIZE)
# data = binascii.hexlify(bytearray(data))
client.close()
except ConnectionRefusedError:
data = "Ошибка"
log_this(HOST + " недоступен счётчик\n")
except timeout:
data = "Счётчик недоступен"
log_this(HOST + " недоступен счётчик\n")
return data
def log_this(what):
f = open('log.txt', 'a')
f.write(what)
f.close()
def get_(this): # Quick calculation
if this > 10:
data = str(hex(this)[2:]) + "00"
else:
data = "0" + str(hex(this)[2:]) + "00"
data_byte = bytearray.fromhex(data)
crc = CrcModbus.calchex(data_byte)
crc_byte = bytearray.fromhex(crc)
crc_byte.reverse()
return data_byte + crc_byte
def crc_merc(data): # Quick calculation
data_byte = bytearray.fromhex(data)
crc = CrcModbus.calchex(data_byte)
crc_byte = bytearray.fromhex(crc)
crc_byte.reverse()
return data_byte + crc_byte
def add_cs_spt(data, NT):
NT = hex_from_int(int(NT))
data_byte = bytearray.fromhex(NT + data)
byte_sum = 0
for by in data_byte:
byte_sum += by
result = hex(0xFF ^ byte_sum)
result = result[len(result) - 2:len(result)]
result = "10 " + NT + " " + data + " " + result + " 16"
return bytearray.fromhex(result)
def cut_bytes(data, bgn, nd):
res = ''
for (i, current) in enumerate(data):
if (i >= bgn) and ((i <= nd)):
res += str(current)
return res
def put_to_base(id, base_counters):
id = int(id)
if base_counters[id]['counter_type'] == "spt943":
res = get_from_count(base_counters[id]['ip'], int(base_counters[id]['port']), bytearray.fromhex('FF FF FF FF FF FF FF FF FF FF FF FF FF FF FF FF')) # ????????
res = get_from_count(base_counters[id]['ip'], int(base_counters[id]['port']), add_cs_spt("3F 00 00 00 00 BF 16", base_counters[id]['nt']))
print(res)
elif base_counters[id]['counter_type'] == "merc":
try:
IP = base_counters[id]['ip']
PORT = int(base_counters[id]['port'])
hex_from_int(int(base_counters[id]['nt']))
nt = hex_from_int(int(base_counters[id]['nt']))
now_time = datetime.now()
res = get_from_counter(IP, PORT, crc_merc(nt+" 01 01 01 01 01 01 01 01"))
base_counters[id]['sn'] = get_from_counter(IP, PORT,
crc_merc("00 08 00"))
energy = get_from_counter(IP, PORT, crc_merc(nt+" 05 00 00"))
energy = energy[1:5]
base_counters[id]['energy'] = str(get_merc_energy(energy,100))
voltage = get_from_counter(IP, PORT, crc_merc(nt+" 08 11 11"))[2:4]
base_counters[id]['voltage1'] = str(get_merc_energy(voltage,100))
voltage = get_from_counter(IP, PORT, crc_merc(nt+" 08 11 12"))[2:4]
base_counters[id]['voltage2'] = str(get_merc_energy(voltage,100))
voltage = get_from_counter(IP, PORT, crc_merc(nt+" 08 11 13"))[2:4]
base_counters[id]['voltage3'] = str(get_merc_energy(voltage,100))
current = get_from_counter(IP, PORT, crc_merc(nt+" 08 11 21"))[2:4]
base_counters[id]['current1'] = str(get_merc_energy(current,1000))
current = get_from_counter(IP, PORT, crc_merc(nt+" 08 11 22"))[2:4]
base_counters[id]['current2'] = str(get_merc_energy(current,1000))
current = get_from_counter(IP, PORT, crc_merc(nt+" 08 11 23"))[2:4]
base_counters[id]['current3'] = str(get_merc_energy(current,1000))
base_counters[id]['time'] = str(now_time.strftime("%Y-%m-%d %H:%M:%S"))
base_counters[id]['owner']= 'good'
except TypeError:
base_counters[id]['owner'] = 'bad'
print("Счётчик недоступен")
save_bd(base_counters)
def get():
if counter_type == "spt943":
res = get_from_counter(ip, int(port), add_cs_spt('spt943', nt))
elif counter_type == "merc":
res = get_from_counter(ip, int(port), crc_merc("00 01 01 01 01 01 01 01 01"))
counter_merc['sn'] = get_from_counter(ip, int(port), crc_merc("00 08 00"))
counter_merc['sn'] = binascii.hexlify(bytearray(counter_merc['sn']))
counter_merc['energy'] = get_from_counter(ip, int(port), crc_merc("00 05 00 00"))
counter_merc['energy'] = binascii.hexlify(bytearray(counter_merc['energy']))
def get_merc_energy(original,divider):
norm = bytes([c for t in zip(original[1::2], original[::2]) for c in t])
return (int.from_bytes(norm, byteorder='big') / divider)
def hex_from_int(NT):
if NT < 16:
NT = '0' + hex(int(NT))[2:]
else:
NT = hex(int(NT))[2:]
return NT
def print_list_counters(base_counters):
print("""
<table border=1>Считанные данные
<tr>
<td>Объект</td>
<td>Тип счётчика</td>
<td>Энергия кВтч</td>
<td>V фаза 1</td>
<td>V фаза 2</td>
<td>V фаза 3</td>
<td>I фаза 1</td>
<td>I фаза 2</td>
<td>I фаза 3</td>
<td>Время опроса</td>
<td>Состояние</td>
<td>ОПРОС</td>
<td>УДАЛЕНИЕ</td>
</tr>
""")
for current in base_counters:
try:
if base_counters[current]['owner'] == 'bad':
print("<tr bgcolor ='red'>")
else:
print("<tr>")
print("""
<td>""" + base_counters[current]['label'] + """</td>
<td>""" + base_counters[current]['counter_type'] + """</td>
<td>""" + base_counters[current]['energy'] + """</td>
<td>""" + base_counters[current]['voltage1'] + """</td>
<td>""" + base_counters[current]['voltage2'] + """</td>
<td>""" + base_counters[current]['voltage3'] + """</td>
<td>""" + base_counters[current]['current1'] + """</td>
<td>""" + base_counters[current]['current2'] + """</td>
<td>""" + base_counters[current]['current3'] + """</td>
<td>""" + base_counters[current]['time'] + """</td>
<td>""" + base_counters[current]['owner'] + """</td>
<td><a href='/cgi-bin/monitor.py?action=opros&id=""" + str(current) + """'> ОПРОСИТЬ<a/></td>
<td><a href='/cgi-bin/monitor.py?action=del&id=""" + str(current) + """'> УДАЛИТЬ <BR><a/></td>
</tr>
""")
print("")
except KeyError:
print("Запись удалена<BR>")
print("</table>")
def add(base_counters):
next_ = random.randint(1, 9999999999)
print("следующая запись" + str(next_) + "<BR> ")
next_counter = {next_: {'label': label,
'ip': ip,
'port': port,
'counter_type': counter_type,
'nt': nt,
'sn': 'Нет данных',
'energy': 'Нет данных',
'voltage1': 'Нет данных',
'voltage2': 'Нет данных',
'voltage3': 'Нет данных',
'current1': 'Нет данных',
'current2': 'Нет данных',
'current3': 'Нет данных',
'time': 'Нет данных',
'owner':'Нет данных'
}}
base_counters.update(next_counter)
save_bd(base_counters)
form = cgi.FieldStorage()
ip = form.getfirst("ip", "none")
port = form.getfirst("port", "4001")
counter_type = form.getfirst("counter_type", "none")
nt = form.getfirst("nt", "none")
label = form.getfirst("label", "none")
action = form.getfirst("action", "none")
id = form.getfirst("id", "none")
print("Content-type: text/html\n")
def add_new():
print("""<!DOCTYPE HTML>
<html>
<head>
<meta charset="utf-8">
<title>Проверка счётчиков</title>
</head>
<body>
<center>
<H1>Проверка приборов учёта</H1><BR>
<form action="/cgi-bin/monitor.py">
<input type="hidden" name="action" value="add">
Добавление нового счётчика
<table border=1>
<tr>
<td>Адрес:
<input type="text" name="label" value='""" + label + """'><BR>
</td>
<td>IP:
<input type="text" name="ip" value='""" + ip + """'><BR>
</td>
<td>
Порт счётчика:
<input type="text" name="port" value='""" + port + """'><BR>
</td>
<td align="center">
<select size="3" multiple name="counter_type">
<option disabled>Выберите тип</option>
<option value="spt943">СПТ943</option>
<option selected value="merc">Меркурий</option>
</select><BR>
</td>
<td>
Сетевой номер(NT):
<input type="text" name="nt" value='""" + str(nt) + """'><BR>
<input type="submit" value="Добавить">
</td>
</tr>
</form>""")
base_counters = read_bd()
add_new()
if action == "add":
add(base_counters)
elif action == "del":
pop_bd(int(id))
elif action == "opros":
put_to_base(int(id), base_counters)
elif action == 'report':
bd = read_bd()
make_report()
print("<a href='../output.xls'>Скачать отчёт</a>")
base_counters = read_bd()
print_list_counters(base_counters)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment