Skip to content

Instantly share code, notes, and snippets.

@atemate
Last active December 8, 2015 20:31
Show Gist options
  • Save atemate/97deed98f828da01b3d9 to your computer and use it in GitHub Desktop.
Save atemate/97deed98f828da01b3d9 to your computer and use it in GitHub Desktop.
lanalan_refactored.py
{
"00": "Everything OK - Success",
"01": "Couldn’t poll Parent because of Timeout",
"02": "Unknown command",
"04": "Invalid S-Register",
"05": "Invalid parameter",
"06": "Recipient could not be reached",
"07": "Message was not acknowledged",
"08": "No sink known",
"09": "Address Table entry is in use and cannot be modified",
"0A": "Message could not be sent",
"0B": "Local node is not sink",
"0C": "Too many characters",
"0E": "Background Scan in Progress (Please wait and try again)",
"0F": "Fatal error initialising the network",
"10": "Error bootloading",
"12": "Fatal error initialising the stack",
"18": "Node has run out of Buffers",
"19": "Trying to write read-only register",
"1A": "Data Mode Refused by Remote Node",
"1B": "Connection Lost in Data Mode",
"1C": "Remote node is already in Data Mode",
"20": "Invalid password",
"25": "Cannot form network",
"27": "No network found",
"28": "Operation cannot be completed if node is part of a PAN",
"2C": "Error leaving the PAN",
"2D": "Error scanning for PANs",
"33": "No response from the remote bootloader",
"34": "Target did not respond during cloning",
"35": "Timeout occurred during xCASTB",
"39": "MAC Transmit Queue is Full",
"6C": "Invalid Binding Index",
"70": "Invalid Operation",
"72": "More than 10 unicast messages were in flight at the same time",
"74": "Message too long",
"80": "ZDP Invalid Request Type",
"81": "ZDP Device not Found",
"82": "ZDP Invalid Endpoint",
"83": "ZDP Not Active",
"84": "ZDP Not Supported",
"85": "ZDP Timeout",
"86": "ZDP No Match",
"87": "ZDP Table Full",
"88": "ZDP No Entry",
"89": "ZDP No Descriptor",
"91": "Operation only possible if connected to a PAN",
"93": "Node is not part of a Network",
"94": "Cannot join network",
"96": "Mobile End Device Move to new Parent Failed",
"98": "Cannot join ZigBee 2006 Network as Router",
"A1": "More than 8 broadcasts were sent within 8 seconds",
"AB": "Trying to join, but no beacons could be heard",
"AC": "Network key was sent in the clear when trying to join secured",
"AD": "Did not receive Network Key",
"AE": "No Link Key received",
"AF": "Preconfigured Key Required",
"C5": "NWK Already Present",
"C7": "NWK Table Full",
"C8": "NWK Unknown Device"
}
# -*- coding: utf-8 -*-
import datetime
from time import strftime
from argparse import ArgumentParser
import os
from lanalan_class import Lanalan
from lanalan_qtplot import HarryPlotter
def parse_args():
    """Parse the Lanalan command-line options from ``sys.argv``.

    One positional argument (``serial_port``) is required.  Optional
    arguments select the command to send (``-c``/``--command``, default
    ``AT+SN``), a commands file (``-f``), a repeat count (``-n``), a delay
    between commands (``-t``), whether to wait for the answer (``-w``), the
    remote devices (``-d``) and six boolean switches for light/temperature
    handling.

    :return: the ``argparse.Namespace`` produced by ``ArgumentParser.parse_args``
    """
    cli = ArgumentParser(description='Lanalan: ZigBee command-line control interface')
    cli.add_argument('serial_port')
    # TODO: a '-b' baudrate option (int, default 19200) is not enabled yet.
    cli.add_argument('-c', '--command', dest='command', default="AT+SN")
    cli.add_argument('-f', dest='commands_file')
    cli.add_argument('-n', type=int, default=1, dest='nrepeat')
    cli.add_argument('-t', type=float, default=0, dest='timeout_commands')
    cli.add_argument('-w', dest='wait_for_answer', action='store_true')
    # TODO: default should be broadcast; add a real example of device names.
    cli.add_argument('-d', type=str, dest='remote_devices',
                     help='comma-delimited list of devices names, e.g. "???"')

    # Plain on/off switches, registered in the original declaration order.
    switches = (
        '--light',
        '--temperature',
        '--save-light',
        '--save-temperature',
        '--load-light',
        '--load-temperature',
    )
    for switch in switches:
        cli.add_argument(switch, action='store_true')

    return cli.parse_args()
if __name__ == '__main__':
    # Manual smoke test for the Qt plotter.  Command-line handling through
    # parse_args() is currently switched off.
    print('Welcome in Lanalan.py!')
    # args = parse_args()
    start_time = datetime.datetime.now()
    print('[+] started at {}'.format(strftime('%Hh%Mm_%d%B%Y')))

    # Imported here (not at module level) exactly as in the original script.
    import pyqtgraph as pg
    from time import sleep
    from pyqtgraph.Qt import QtGui, QtCore

    print('1')
    plotter = HarryPlotter()
    print('2')

    # Push 30 samples (0..29) to the plotter through its "trigger" signal,
    # one every half second, echoing each value to stdout.
    for sample in range(30):
        plotter.emit(QtCore.SIGNAL("trigger"), pg.ptime.time(), sample)
        print(sample)
        sleep(0.5)
#print('3')
#plotter.
'''with open('qt/data/dat', 'r') as f:
data1 = dict()
for line in f.readlines():
x, y = line.strip('\r\n\t').split()
data1[float(x)] = float(y)
'''
'''import numpy as np
import random as r
for x in range(5):
y = r.randint(1,100)
x, y = x / 100, y / 100
print(x, y)
plotter.trigger(x, y)
#plotter.trigger(x, y).emit()
print('3')
exit()
'''
'''
if args.commands_file:
print('[!] loading commands from file "{}"'.format(args.commands_file))
if os.path.exists(args.commands_file):
with open(args.commands_file, 'r') as f:
for line in f.readlines():
pass
else:
print('[-] Error: commands file "{}" not found. Exit.'.format(os.path.abspath(args.commands_file)))
exit(404)
else:
if args.load_light or args.load_temperature:
# load data from .dat-file
pass
else:
if args.light:
command = ''
elif args.temperature:
command = ''
else:
command = args.command
if not args.command:
print('[-] Error: you must specify either "--light" or "--temperature" or "-c" option. Exit.')
exit(400)
#if args.save_light or args.save_temperature:
lana = Lanalan(args.serial_port,
timeout_commands=args.timeout_commands,
wait_for_answer=args.wait_for_answer)
try:
lana = Lanalan('/dev/ttyUSB0', wait_for_answer=True)
lana.send('ATI', ntimes=9)
lana.send('AT+SN', ntimes=10)
lana.remove_file('light;temperature')
except Exception:
print('AAA!!!')
exit()
'''
# ---------------------------------------------------------------------------------------------------
'''
cmds, cmds_once = [], []
if args.commandfile:
FILE_CMD=open(args.commandfile, 'r')
#with FILE_CMD as fc:
for line in FILE_CMD:
for char in " .!/;\"'\n":
line=line.replace(char,'')
if line:
cmds.append(line)
else:
cmds.append(args.command)
print '***',cmds,'***'
if args.light:
if not args.device and not args.local:
pprint('[-] Error: please, specify device to poll data from (option -d)')
exit()
elif args.local:
cmds.append('ATS22?')
else:
cmds_once.append('ATREMS:'+args.device+',1511=1')
cmds_once.append('ATREMS:'+args.device+',183=1')
cmds.append('ATREMS:'+args.device+',22?')
elif args.temp:
if not args.device and not args.local:
pprint('[-] Error: please, specify device to poll data from (option -d)')
exit()
elif args.local:
cmds.append('ATS1F?')
else:
cmds_once.append('ATREMS:'+args.device+',15D=1')
cmds_once.append('ATREMS:'+args.device+',182=1')
cmds.append('ATREMS:'+args.device+',22?')
# http://www.wless.ru/files/ZigBee/ETRX3DVKA/TG-ETRX35xDVK-PM-012-108.pdf
# TURN ON all LEDs and supply power to temperature and light sensors: ATS18=0000000C
# Turn on local LEDs ATS18=00000000
# Turn on local LEDs ATS18=000140C0
# The ADC registers can be read on a local or remote device by using the ATS or ATREMS
#commands, but they can also be sent automatically to the network sink by using function 0110 or
#0130; consult the AT Command Manual for the details, and see section 11.6 here.
ser = initialize_serialport(SERIALPORT)
open_serialport(ser)
process_commands(ser, cmds)
close_serialport(ser)
'''
# -*- coding: utf-8 -*-
import serial
import os
import datetime
from time import sleep, strftime
from threading import Thread, Event
import sys
from glob import glob
from getpass import getuser
from json import loads
from lanalan_qtplot import HarryPlotter
from pyqtgraph.ptime import time as pyqt_time
# Announce the running interpreter version once, then remember the major
# version (2 or 3) so byte/str handling can branch on it later.
print('python {}'.format(sys.version.split()[0]))
PYTHON_V = 3 if sys.version_info >= (3, 0) else 2
# Hint shown when the serial device cannot be found or opened; it names the
# current user as reported by getpass.getuser().
PERMISSIONS_INFO = (
    '[!] Please, ensure that you ({}) have appropriate permissions '
    'for reading/writing on selected device (e.g. run "sudo chmod 666 /dev/ttyUSB0")'
).format(getuser())

# JSON file with the error-code -> description table.
ERRORCODES_FILENAME = 'error_codes.json'

# File names created inside the per-session data directory.
LOG_FILENAME = 'lanalan.log'
DIALOGUE_FILENAME = 'dialogue.log'
LIGHTDATA_FILENAME = 'light_data.dat'
TEMPERATUREDATA_FILENAME = 'temperature_data.dat'
RESPONSE_FILENAME = 'response_data.dat'
import signal
# ---------------------------------------------------------------------------------------------------
class Lanalan(object):
    """An interface for ZigBee coordinator (tested on Telegesis ETRX3 USB Stick device).

    Creating an instance validates the serial port, creates a timestamped
    data directory with the log/data files, opens the port and starts a
    daemon thread running :meth:`do_loop`, which prints and logs every line
    received from the device.  Commands are written with :meth:`send`.
    """

    # Attribute names of the per-session output files; all are predefined as
    # None in __init__ and closed in __del__.
    _FILE_ATTRIBUTES = ('light_file', 'temperature_file', 'dialogue_file',
                        'response_file', 'log_file')

    def __init__(self, serialport, timeout_commands=0, wait_for_answer=False, baudrate=19200, plot=False):
        """
        :param serialport: (str) name of serial port where ETRX3 USB Stick is connected to
        :param timeout_commands: pause after each sent command, in milliseconds
                                 (it is divided by 1000 before being passed to sleep)
        :param wait_for_answer: if True, ``send`` blocks until the previous command has
                                been answered; if False, commands are written back to back
        :param baudrate: default value for Zigbee devices: 19200.
        :param plot: if True, a HarryPlotter is created during activation
        :raises AssertionError: if ``baudrate`` is not a positive integer
        :raises SystemExit: code 2 if the port does not exist or cannot be configured,
                            code 3 if the data files or the port cannot be opened
        """
        # TODO: describe plot
        self.activated = False
        # Predefine everything __print(), __del__() and the reader thread
        # touch, so a failure half-way through construction cannot turn into
        # an AttributeError somewhere else.
        self.serial = None  # Serial object
        self.plotter = None
        self.error_codes = {}
        for attribute in self._FILE_ATTRIBUTES:
            setattr(self, attribute, None)

        if not os.path.exists(serialport):
            print('[-] Error: serialport "{}" not found'.format(serialport))
            print('[.] Found devices: {}'.format(', '.join(glob('/dev/ttyUSB*'))))
            print(PERMISSIONS_INFO)
            sys.exit(2)
        # Raised explicitly (instead of `assert`) so the check survives
        # `python -O`; the exception type is kept for existing callers.
        if not (str(baudrate).isdigit() and int(baudrate) > 0):
            raise AssertionError("baudrate must be positive integer")

        self.serialport = serialport
        self.wait_for_answer = wait_for_answer
        self.baudrate = baudrate
        self.timeout_commands = timeout_commands
        self.need_to_plot = plot

        self.datadir_name = strftime('%d%b%Y_%Hh%Mm%Ss')
        if not os.path.exists(self.datadir_name):
            os.mkdir(self.datadir_name)
        self.light_filename = self.datadir_name + '/' + LIGHTDATA_FILENAME
        self.temperature_filename = self.datadir_name + '/' + TEMPERATUREDATA_FILENAME
        self.dialogue_filename = self.datadir_name + '/' + DIALOGUE_FILENAME
        self.response_filename = self.datadir_name + '/' + RESPONSE_FILENAME
        self.log_filename = self.datadir_name + '/' + LOG_FILENAME
        try:
            self.light_file = open(self.light_filename, 'w')
            self.temperature_file = open(self.temperature_filename, 'w')
            self.dialogue_file = open(self.dialogue_filename, 'w')
            self.response_file = open(self.response_filename, 'w')
            self.log_file = open(self.log_filename, 'w')
        except (IOError, OSError) as e:
            self.__print('[-] Error while creating data files: {}'.format(e))
            sys.exit(3)
        self.__activate()

    def __activate(self):
        """Load the error table, open the port and start the reader thread."""
        self.__load_error_codes()
        self.__define_serialport()
        self.__open_serialport()
        # NOTE(review): HarryPlotter() may enter the Qt event loop inside its
        # constructor, which would block activation right here - confirm
        # before relying on plot=True.
        self.plotter = HarryPlotter() if self.need_to_plot else None
        # Set means "the previous command has been answered, send() may write".
        self.event_may_send_command = Event()
        self.event_may_send_command.set()
        self.main_loop_thread = Thread(target=self.do_loop, name='reading_loop')
        self.main_loop_thread.daemon = True
        self.main_loop_thread.start()
        self.__print('[+] listening serial port started ...')
        print('-' * 100)
        self.activated = True

    def __load_error_codes(self):
        """Fill ``self.error_codes`` from ERRORCODES_FILENAME (best effort).

        A missing or malformed file is reported and leaves the table empty,
        so error codes are simply printed without an explanation.
        """
        print('... loading error codes from "{}"'.format(ERRORCODES_FILENAME))
        try:
            with open(ERRORCODES_FILENAME, 'r') as codes_file:
                self.error_codes = loads(codes_file.read())
        except IOError:
            # IOError also covers FileNotFoundError on Python 3 and, unlike
            # that name, exists on Python 2.
            print('[-] Could not load error codes: the file "{}" not found. Continuing.'.format(
                os.path.abspath(ERRORCODES_FILENAME)))
        except ValueError as e:
            # Invalid JSON or undecodable text.
            print('[-] Could not load error codes: the file "{}" is not valid ({}). Continuing.'.format(
                os.path.abspath(ERRORCODES_FILENAME), e))

    # -- cleaning block ------------------------------------------------------------------------------------------------
    def __del__(self):
        """Close the serial port (if this instance opened it) and all data files."""
        port = getattr(self, 'serial', None)
        if getattr(self, 'activated', False) and port is not None and port.isOpen():
            port.close()
            print('-' * 100)
            print('[+] serial port {} closed'.format(self.serialport))
        # TODO!!! process the situation when USB suddenly unplugged (check every read time)
        for attribute in self._FILE_ATTRIBUTES:
            handle = getattr(self, attribute, None)
            if handle is not None:
                handle.close()  # closing an already closed file is a no-op

    def remove_file(self, filetype):
        """Close and delete the light and/or temperature data file.

        :param filetype: string containing 'light' and/or 'temperature'
                         (e.g. ``'light;temperature'``)

        Each file is handled independently; a failure to remove one is
        logged and does not prevent the other from being removed.
        """
        candidates = (
            ('light', self.light_file, self.light_filename),
            ('temperature', self.temperature_file, self.temperature_filename),
        )
        for kind, handle, path in candidates:
            if kind not in filetype:
                continue
            try:
                handle.close()
                os.remove(path)
            except OSError as e:
                self.__print('[-] Could not remove {} data file "{}": {}'.format(kind, path, e))

    # -- printing block ------------------------------------------------------------------------------------------------
    def __delta_time(self):
        """Return the time elapsed since the port was opened, as a string."""
        return str(datetime.datetime.now() - self.start_time)

    def __write(self, s, filetype=''):
        """Append ``s`` prefixed with the elapsed time to a data file.

        :param filetype: 'light', or 'temp'/'temperature'; anything else is ignored
        """
        if filetype == 'light':
            target = self.light_file
        elif filetype in ('temp', 'temperature'):
            target = self.temperature_file
        else:
            return
        target.write('{} {}\n'.format(self.__delta_time(), s))

    def __print(self, s):
        """Print ``s`` and, when the log file is open, append it there too."""
        s = str(s)
        log = getattr(self, 'log_file', None)
        if log is not None and not log.closed:
            log.write(s + '\n')
        print(s)

    def __dump(self, delta_time, response):
        """Append the raw device line to the dialogue file, if it is open."""
        dialogue = getattr(self, 'dialogue_file', None)
        if dialogue is not None and not dialogue.closed:
            dialogue.write('{} | {}\n'.format(delta_time, response))

    # -- initializing block --------------------------------------------------------------------------------------------
    def __define_serialport(self):
        """Create and configure the Serial object (8 data bits, no parity, 1 stop bit)."""
        try:
            self.serial = serial.Serial(port=self.serialport, baudrate=self.baudrate)
            self.serial.bytesize = serial.EIGHTBITS  # number of bits per bytes
            self.serial.parity = serial.PARITY_NONE  # set parity check: no parity
            self.serial.stopbits = serial.STOPBITS_ONE  # number of stop bits
            self.serial.timeout = None  # timeout block read (None for block read, 0 for non-block read)
            # self.serial.xonxoff = False  # disable software flow control
            # self.serial.rtscts = False  # disable hardware (RTS/CTS) flow control
            # self.serial.dsrdtr = False  # disable hardware (DSR/DTR) flow control
            self.serial.writeTimeout = 0  # timeout for write
        except Exception as e:
            self.__print('[-] Error while opening serial port: ' + str(e))
            self.__print(PERMISSIONS_INFO)
            sys.exit(2)

    def __open_serialport(self):
        """Make sure the port is open and record the session's zero time.

        :return: 0 on success
        :raises SystemExit: code 3 if the port cannot be opened
        """
        if not self.serial:
            self.__define_serialport()
        try:
            self.__print('... trying to open serial port ' + self.serialport)
            # Serial(port=...) usually opens the port already; opening it a
            # second time raises, so only open when it is really closed.
            if not self.serial.isOpen():
                self.serial.open()
        except Exception as e:  # TODO: handle the specific serial exceptions separately
            self.__print('[-] Error while opening serial port: ' + str(e))
            sys.exit(3)
        self.__print('[+] connected to ' + self.serialport)
        self.start_time = datetime.datetime.now()
        self.__print('[+] zero time: {}'.format(self.start_time))
        return 0

    # -- processing block ----------------------------------------------------------------------------------------------
    def __read_line(self):
        """Block until the device sends a line; return it without CR/LF."""
        raw = self.serial.readline()
        if PYTHON_V == 3:
            # 'replace' keeps line noise from raising and ending the reader.
            return raw.decode('utf-8', 'replace').strip('\r\n')
        return str(raw.strip('\r\n'))

    def __format_response(self, delta_time, raw):
        """Return the display form of one received line.

        Lines starting with ``AT`` are shown in brackets, ``ERROR:<code>``
        lines get the description of the code appended, everything else is
        shown as is; every line is prefixed with ``delta_time``.
        """
        upper = raw.upper()
        if upper.startswith('AT'):
            return '{} - [{}]'.format(delta_time, raw)
        if upper.startswith('ERROR:'):
            # The prefix was matched case-insensitively, so slice by length
            # and normalise the code to the upper-case keys of the table.
            code = raw[len('ERROR:'):].strip().upper()
            return '{} | {} //{}'.format(delta_time, raw, self.__explain_error_code(code))
        return '{} | {}'.format(delta_time, raw)

    @staticmethod
    def __ends_command(raw):
        """Tell whether ``raw`` is a final reply: a bare ``OK`` or ``ERROR:<code>``.

        The raw line is inspected (not the timestamp-formatted one) and a
        bare ``OK`` is required, so lines that merely contain the letters
        "OK" (e.g. ``AT+TOKDUMP``) do not count.
        """
        upper = raw.strip().upper()
        return upper == 'OK' or upper.startswith('ERROR:')

    def do_loop(self):
        """Read lines from the device forever, printing and logging each one.

        Runs in the reader thread.  When ``wait_for_answer`` is set, a final
        reply (see ``__ends_command``) releases the next ``send``.  Any
        exception ends the loop; ``send`` is then released as well so that
        the sending thread cannot stay blocked on a reader that is gone.
        """
        # TODO: sensor capture is not wired up yet.  An earlier draft first
        # activated the sensor, then polled it at intervals and recognised
        # "SREAD" replies for register 22 (light) and 1F (temperature),
        # taking 4 hex digits starting 7 characters after the ",22,"/",1F,"
        # match and converting them with int(hex, 16) * 120 / 30000.
        try:
            while True:
                raw = self.__read_line()
                delta_time = self.__delta_time()
                response = self.__format_response(delta_time, raw)
                if self.wait_for_answer and self.__ends_command(raw):
                    self.event_may_send_command.set()
                self.__print(response)
                self.__dump(delta_time, raw)
        except KeyboardInterrupt:
            print('[-] Error: Interrupted by user. Exit.')  # TODO: handle interruption properly
        except serial.SerialException:
            print('[-] Error: USB-device has been unfortunately unplugged. Exit.')
        except Exception as e:
            print('Unknown exception: {}'.format(e))
        finally:
            self.event_may_send_command.set()

    # ------------------------------------------------------------------------------------------------------------------
    def send(self, cmd, device=None, ntimes=1):
        """Write ``cmd`` (terminated with CR LF) to the device ``ntimes`` times.

        :param cmd: command text without line ending
        :param device: currently unused; the device name is assumed to be
                       included into ``cmd``
        :param ntimes: how many times the command is written

        With ``wait_for_answer`` each write first waits until the reader
        thread has seen the final reply to the previous command.  After each
        write the thread sleeps ``timeout_commands`` milliseconds.
        """
        if not self.serial.isOpen():
            self.__print(
                '[-] Error while trying to send command "{}": serial port "{}" is closed'.format(cmd, self.serialport))
            return
        if PYTHON_V == 3:
            payload = bytes(cmd + '\r\n', 'UTF-8')
        else:
            payload = bytes(cmd + '\r\n')
        for _ in range(ntimes):
            if self.wait_for_answer:
                self.event_may_send_command.wait()
                self.event_may_send_command.clear()
            self.serial.flushInput()
            self.serial.flushOutput()
            print('{} >>> wrote: {}'.format(self.__delta_time(), cmd))
            self.serial.write(payload)
            if self.timeout_commands > 0:
                # 1000.0 keeps this a true division on Python 2 as well.
                sleep(self.timeout_commands / 1000.0)

    def __explain_error_code(self, errno):
        """Return the description of error code ``errno`` or "" if unknown."""
        return self.error_codes.get(errno, "")
"""
class ReadingLoopThread(Thread):
def __init__(self, serial, print):
Thread.__init__(self)
self.kill_received = False
self.serial = serial
self.__print = print
def run(self):
while not self.kill_received:
try:
'''response, read_char = '', self.serial.read()
while read_char not in '\r\n':
response += read_char
read_char = self.serial.read()
'''
if PYTHON_V == 3:
response = self.serial.readline().decode('utf-8').strip('\r\n')
else:
response = str(self.serial.readline().strip('\r\n'))
if response.upper().startswith('AT'):
response = '[{}]'.format(response)
self.__print(response)
except KeyboardInterrupt:
self.kill_received = True
"""
# -*- coding: utf-8 -*-
# this code is very similar to one of the pyqtgraph' examples
import pyqtgraph as pg
from pyqtgraph.Qt import QtGui ,QtCore
#from PySide import QtCore
from PySide.QtCore import QObject, QThread, Signal, Slot
from PyQt4 import QtCore
import numpy as np
import sys
class HarryPlotter(QtCore.QThread):
    """Scrolling time plot, closely following pyqtgraph's scrolling-plots example.

    Samples arrive through the old-style Qt signal ``"trigger"`` carrying
    ``(now, yval)``; each one is appended to the newest curve, and all
    curves are shifted so that the latest sample sits at x = 0.

    The class derives from the same ``QtCore`` that provides ``SIGNAL`` for
    the connection below, so the object and its signal come from a single
    Qt binding.

    NOTE(review): the plot items are updated in whatever thread delivers the
    signal - confirm that emitters run in the GUI thread.
    """

    def __init__(self, run_event_loop=True):
        """Create the plot window and connect the ``"trigger"`` signal.

        :param run_event_loop: when True (the default), the constructor
            enters the Qt event loop under the same condition as ``show``
            and therefore does not return until that loop exits.  Pass
            False to get the object back immediately and start the loop
            later with ``show()``.
        """
        super(HarryPlotter, self).__init__()
        self._win = pg.GraphicsWindow()
        # TODO: replace the example title with a project-specific one
        self._win.setWindowTitle('pyqtgraph example: Scrolling Plots')

        self._chunk_size = 100  # samples per curve
        self._max_chunks = 10  # curves kept on screen
        self._start_time = pg.ptime.time()

        self._plot = self._win.addPlot()
        self._plot.setLabel('bottom', 'Time', 's')
        self._plot.setXRange(-10, 0)

        # Row 0 is the joint with the previous chunk, rows 1.. the samples.
        self._data = np.empty((self._chunk_size + 1, 2))
        self._curves = []
        self._ptr = 0  # total number of samples received

        self.connect(self, QtCore.SIGNAL("trigger"), self.update)

        if run_event_loop:
            if self._may_run_event_loop():
                QtGui.QApplication.instance().exec_()
            else:
                print('[-] Error: could not load Plotter class: sys.flags.interactive={}'.format(
                    sys.flags.interactive))

    @staticmethod
    def _may_run_event_loop():
        """Return True unless running interactively with a PyQt ``QtCore``."""
        return (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION')

    def update(self, now, yval):
        """Append one sample and scroll the plot.

        :param now: timestamp of the sample, on the same clock as ``pg.ptime.time()``
        :param yval: value to plot
        """
        elapsed = now - self._start_time
        for old_curve in self._curves:
            old_curve.setPos(-elapsed, 0)

        i = self._ptr % self._chunk_size
        if i == 0:
            # Start a new chunk/curve.
            curve = self._plot.plot()
            self._curves.append(curve)
            if self._ptr == 0:
                # Very first sample: there is no previous point to join to,
                # and the freshly allocated buffer holds arbitrary values, so
                # seed the joint with the sample itself.
                joint = (elapsed, yval)
            else:
                joint = self._data[-1]
            self._data = np.empty((self._chunk_size + 1, 2))
            self._data[0] = joint
            while len(self._curves) > self._max_chunks:
                self._plot.removeItem(self._curves.pop(0))
        else:
            curve = self._curves[-1]

        self._data[i + 1, 0] = elapsed
        self._data[i + 1, 1] = yval
        curve.setData(x=self._data[:i + 2, 0], y=self._data[:i + 2, 1])
        self._ptr += 1

    def show(self):
        """Enter the Qt event loop, unless running interactively with PyQt."""
        if self._may_run_event_loop():
            QtGui.QApplication.instance().exec_()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment