Skip to content

Instantly share code, notes, and snippets.

@vixtory09678
Created May 3, 2019 01:41
Show Gist options
  • Save vixtory09678/8b3bb049527e3cfcb99765061debeaa6 to your computer and use it in GitHub Desktop.
Save vixtory09678/8b3bb049527e3cfcb99765061debeaa6 to your computer and use it in GitHub Desktop.
import serial
import threading
import json
import struct
class Protocol(object):
def __init__(self, comport = 'COM13'):
self.ser = serial.Serial(comport, 115200)
self.ser.timeout = 1
# set thread
self.thread = threading.Thread(target=self.readSerial,args=(self.ser,))
self.thread.daemon = True
self.thread.start()
# set callback
self.callRaw = None
self.callSwitch = None
self.callStatus = None
self.callRelay = None
self.debug = False
def __del__(self):
if (self.thread.isAlive()):
self.thread.join()
if (self.ser.is_open):
self.ser.close()
def setDebugMode(self,debug):
self.debug = debug
def on(self,mode='read',call = None):
if (mode == 'raw'):
if (call):
self.callRaw = call
elif (mode == 'switch_box'):
if (call):
self.callSwitch = call
elif (mode == 'status'):
if (call):
self.callStatus = call
elif (mode == 'relay'):
if (call):
self.callRelay = call
else:
if (self.debug):
print("error event callback name")
def is_json(self, myjson):
try:
json_object = json.loads(myjson)
except ValueError:
return False, None
return True, json_object
def readSerial(self, ser):
while(1):
if (ser.in_waiting):
readSerial = ser.readline().decode('ascii')
complete, jsonObj = self.is_json(readSerial)
if (complete == False): continue
if(self.callRaw):
self.callRaw(readSerial)
if (jsonObj["cmd"] == 'status'):
if (self.callStatus):
self.callStatus(jsonObj["data"])
elif (jsonObj["cmd"] == 'relay'):
if (self.callRelay):
self.callRelay(jsonObj["data"])
elif (jsonObj["cmd"] == 'switch_box'):
if (self.callSwitch):
self.callSwitch(jsonObj["data"])
def send(self, data):
# convert into JSON:
y = json.dumps(data)
y = y + '\n'
# the result is a JSON string:
out = bytes(y,'utf-8')
if (self.debug):
print(y)
# send data to low level
self.ser.write(out)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment