Created
December 30, 2020 04:50
-
-
Save kevinkm/de53cd710abe2ff6dedca8b837a357cd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Created by .ignore support plugin (hsz.mobi) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Default ignored files | |
/shelf/ | |
/workspace.xml |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<component name="ProjectDictionaryState"> | |
<dictionary name="KevinFox" /> | |
</component> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<component name="libraryTable"> | |
<library name="MicroPython" type="python"> | |
<CLASSES> | |
<root url="file://$APPLICATION_PLUGINS_DIR$/intellij-micropython/typehints/stdlib" /> | |
<root url="file://$APPLICATION_PLUGINS_DIR$/intellij-micropython/typehints/micropython" /> | |
<root url="file://$APPLICATION_PLUGINS_DIR$/intellij-micropython/typehints/esp8266" /> | |
</CLASSES> | |
<SOURCES /> | |
</library> | |
</component> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<component name="libraryTable"> | |
<library name="MicroPython" type="python"> | |
<CLASSES> | |
<root url="file://$APPLICATION_PLUGINS_DIR$/intellij-micropython/typehints/stdlib" /> | |
<root url="file://$APPLICATION_PLUGINS_DIR$/intellij-micropython/typehints/micropython" /> | |
<root url="file://$APPLICATION_PLUGINS_DIR$/intellij-micropython/typehints/esp8266" /> | |
</CLASSES> | |
<SOURCES /> | |
</library> | |
</component> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?> | |
<project version="4"> | |
<component name="ProjectInspectionProfilesVisibleTreeState"> | |
<entry key="Project Default"> | |
<profile-state> | |
<expanded-state> | |
<State> | |
<id /> | |
</State> | |
</expanded-state> | |
<selected-state> | |
<State> | |
<id>Buildout</id> | |
</State> | |
</selected-state> | |
</profile-state> | |
</entry> | |
</component> | |
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.7" project-jdk-type="Python SDK" /> | |
</project> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?> | |
<project version="4"> | |
<component name="ProjectModuleManager"> | |
<modules> | |
<module fileurl="file://$PROJECT_DIR$/.idea/swordWithMC.iml" filepath="$PROJECT_DIR$/.idea/swordWithMC.iml" /> | |
</modules> | |
</component> | |
</project> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?> | |
<module type="PYTHON_MODULE" version="4"> | |
<component name="FacetManager"> | |
<facet type="MicroPython" name="MicroPython"> | |
<configuration> | |
<device name="ESP8266" /> | |
</configuration> | |
</facet> | |
</component> | |
<component name="NewModuleRootManager"> | |
<content url="file://$MODULE_DIR$" /> | |
<orderEntry type="jdk" jdkName="Python 3.7" jdkType="Python SDK" /> | |
<orderEntry type="sourceFolder" forTests="false" /> | |
<orderEntry type="library" name="MicroPython" level="project" /> | |
</component> | |
<component name="TestRunnerService"> | |
<option name="PROJECT_TEST_RUNNER" value="Unittests" /> | |
</component> | |
</module> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?> | |
<project version="4"> | |
<component name="VcsDirectoryMappings"> | |
<mapping directory="$PROJECT_DIR$" vcs="Git" /> | |
</component> | |
</project> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# boot.py -- executed on every reset, before main.py.
#
# Gyro hookup procedure: plug in the gyro and press "connect" in uPyCraft
# (the ESP reboots when uPyCraft connects), then unplug the gyro within 1 s
# after the LED flashes; the REPL is connected at that point.
#
# uart = uos.dupterm(None, 1)  # Detaches the REPL from UART0 (rx/tx), which
#                              # it owns by default, so the UART can be used
#                              # directly; the REPL stops working afterwards.
from lib.COMMON.timer import Timer

Timer.init()

import network

# from timer import Timer
# Switch off the station interface, then the access-point interface, if
# either one is still active.
for _mode in (network.STA_IF, network.AP_IF):
    _wlan = network.WLAN(_mode)
    if _wlan.active():
        _wlan.active(False)
del (_mode, _wlan)

# Reclaim memory before the WebREPL is started.
import gc

gc.collect()

import webrepl

webrepl.start()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# main.py -- gyro-controlled car client.
#
# Re-runs boot.py in this module's globals, connects to the server on
# high-speed channel 2 and then streams the gyro reading forever.
with open('./boot.py') as boot_file:  # close the file handle once it is read
    exec(boot_file.read(), globals())

# Write your own code below, following the user tutorial.
# Thank you for purchasing our modules, have a good day!
from lib.COMMON.MMG_client import microMSG_client
from lib.CLI.gyroctrl_cli.GYROCTRL import GyroCtrl
from lib.COMMON.management import CAR as Main
# The driver import and instance were commented out while the loop below
# still read ``mpu6050``, which raised NameError on the first iteration.
from lib.DRV.GYRO.MPU6050_JY61P import MPU6050_JY61P

conn = microMSG_client(apparatus=Main, h_CH=2, Hdatalen='car')
mpu6050 = MPU6050_JY61P()

while True:
    # print(mpu6050.Str_CarDATA)
    GyroCtrl.control(conn, mpu6050.Str_CarDATA)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class GyroCtrl():
    """Namespace for the gyro-control send helper."""

    @staticmethod
    def control(conn, data):
        """Pass ``data`` to ``conn._send_H``.

        Declared as a static method so that both ``GyroCtrl.control(...)``
        and ``GyroCtrl().control(...)`` work; without the decorator the
        instance form would bind the instance to ``conn``.

        :param conn: object exposing ``_send_H(data)``.
        :param data: payload handed to ``conn._send_H`` unchanged.
        """
        conn._send_H(data)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from lib.COMMON.timer import Timer | |
class Switch(object):  # using timer
    """Plays timed ON/OFF sequences on the GPIO slots of a switch client.

    A command string is a comma-separated list of ``slots:seconds-rounds``
    items.  ``slots`` is one slot number or several joined by ``_`` (for
    example ``'1_2:4-1'``).  Every listed slot is switched ON as soon as the
    command is parsed; afterwards the group is toggled each time its timer
    key fires, and one round is consumed whenever it toggles to OFF.

    NOTE(review): the source indentation was lost; the nesting below was
    reconstructed from the syntax and should be checked against the device
    copy.
    """

    def __init__(self, conn):
        """
        :param conn: client connection exposing ``slotToGPIO`` (slot number
            -> pin object), ``switch_loop()`` and ``switchMSG``.
        """
        self.conn = conn
        self.CMDing = 0  # 1 while a command sequence is still being played
        self.RunAfter = Timer.RunAfter
        # Drive the pin registered for ``slot`` to ``level``.
        self.setgpio = lambda slot, level: conn.slotToGPIO[int(slot)].value(level)

    def setstatus(self, i):
        """Apply ``self.status[i]`` to every slot of group ``i``."""
        for slot in self.slots[i]:
            self.setgpio(slot, self.status[i])
            # print('SET STATUS IS ', self.status[i])

    def MSG(self, data):
        """Advance the running sequence, or start a new one from ``data``.

        While a sequence is running (``CMDing`` set) ``data`` is ignored and
        each group whose timer key fires is toggled.  Otherwise a non-None
        ``data`` is parsed into a new sequence.

        :param data: command string as described on the class, or ``None``.
        """
        if self.CMDing:
            # New data may arrive while a sequence runs, but it is not
            # handled until the sequence has finished.
            for i in range(len(self.slots)):
                if self.rounds[i] > 0 and self.RunAfter(self.interval[i]):
                    print('CHANGE status of switch')
                    self.status[i] = not self.status[i]
                    self.setstatus(i)
                    if not self.status[i]:
                        # A round is counted each time the group goes OFF.
                        self.rounds[i] -= 1
                        if self.rounds[i] == 0:
                            self.setstatus(i)
                            self.countTimes -= 1
                            if self.countTimes == 0:
                                self.CMDing = 0
                                print('\nDone! You can receive data\n')
        elif data is not None:
            self.slots, self.interval, self.rounds, self.status = [], [], [], []
            for i, item in enumerate(data.split(',')):
                fields = item.split(':')
                # Always store a list of slot names.  A single slot used to
                # be stored as a bare string, so setstatus() iterated its
                # characters and slot '10' was driven as slots 1 and 0.
                group = fields[0].split('_')
                for slot in group:
                    self.setgpio(slot, True)  # set GPIO high
                self.slots.append(group)
                timing = fields[1].split('-')
                # Timer key; presumably '<name>_<milliseconds>' -- confirm
                # against Timer.RunAfter.
                self.interval.append('MSG' + str(i) + '_' + str(int(timing[0]) * 1000))
                self.rounds.append(int(timing[1]))
                self.status.append(True)  # every group starts ON
            self.countTimes = len(self.slots)
            self.CMDing = 1
            self.conn.switchMSG = None  # useless?

    def recvRepeatMSG_loop(self):
        """Poll once: keep a running sequence going or fetch a new command."""
        if self.CMDing or self.conn.switch_loop():
            self.MSG(self.conn.switchMSG)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Arduino_func():
    """Linear range mapper in the style of Arduino's ``map()``.

    Usage::

        arduino_map = Arduino_func(0, 180, 30, 120)
        arduino_map.map(90)       # -> 75, scaled ascending from 30 to 120
        arduino_map.map(90, -1)   # -> 75, scaled descending from 120 to 30
        arduino_map.map(45, -1)   # -> 97
    """
    __slots__ = ('leftMin', 'leftMax', 'rightMin', 'rightMax')

    def __init__(self, leftMin, leftMax, rightMin, rightMax):
        """
        :param leftMin: lower bound of the input range.
        :param leftMax: upper bound of the input range.
        :param rightMin: lower bound of the output range.
        :param rightMax: upper bound of the output range.
        """
        self.leftMin = leftMin
        self.leftMax = leftMax
        self.rightMin = rightMin
        self.rightMax = rightMax

    def map(self, value, order=1):
        """Scale ``value`` from the input range onto the output range.

        :param value: number to convert.
        :param order: ``1`` maps ascending (leftMin -> rightMin), ``-1``
            maps descending (leftMin -> rightMax).
        :returns: the mapped value truncated to ``int``; ``None`` when
            ``value`` lies outside the input range or ``order`` is neither
            ``1`` nor ``-1``.
        """
        if value < self.leftMin or value > self.leftMax:
            return None
        # Figure out how 'wide' each range is.
        leftSpan = self.leftMax - self.leftMin
        rightSpan = self.rightMax - self.rightMin
        if leftSpan == 0:
            # Zero-width input range: the only legal value sits at the start
            # of the range.  Dividing by the span used to raise
            # ZeroDivisionError here.
            valueScaled = 0.0
        else:
            # Convert the left range into a 0-1 range (float).
            valueScaled = float(value - self.leftMin) / float(leftSpan)
        # Convert the 0-1 range into a value in the right range.
        if order == 1:
            return int(self.rightMin + (valueScaled * rightSpan))
        elif order == -1:
            return int((self.rightMin + self.rightMax) - (self.rightMin + (valueScaled * rightSpan)))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from machine import Pin | |
from lib.COMMON.timer import Timer | |
class Force():  # using Timer
    """Counts MC-input hits in 10-sample rounds and fires callbacks.

    One sample is taken each time ``RunAfter('countMC_1000')`` fires
    (presumably once per 1000 ms -- confirm against ``Timer.RunAfter``).
    After 10 samples the number of high readings is compared with the
    difficulty threshold and the counters are reset.

    NOTE(review): the source indentation was lost; the nesting below was
    reconstructed from the syntax and should be checked against the device
    copy.
    """

    # Difficulty name -> number of high samples (out of 10) needed to trigger.
    _THRESHOLDS = {'easy': 3, 'normal': 5, 'hard': 9}

    def __init__(self, IO, funclist, funcP=None, diffi='normal'):
        """
        :param IO: object whose ``MC_IN`` attribute is the input pin number.
        :param funclist: callables invoked as ``func(funcP)`` on success.
        :param funcP: argument handed to every callable.
        :param diffi: ``'easy'``, ``'normal'`` or ``'hard'``.
        :raises ValueError: for any other ``diffi``.  Previously an unknown
            value left ``MCthreshold`` unset and failed later inside
            ``forceON`` with AttributeError.
        """
        self.MCthreshold = self._threshold_for(diffi)
        self.IO = {'MC': Pin(IO.MC_IN, Pin.IN, Pin.PULL_DOWN)}
        self.MCtimes, self.round = 0, 0
        self.failedRound = 35
        self.funclist = funclist
        self.funcP = funcP
        self.target_i = len(funclist)  # 1-based index of the next callable
        self.RunAfter = Timer.RunAfter
        self.gotForce = 0

    @staticmethod
    def _threshold_for(diffi):
        """Return the hit threshold for ``diffi`` or raise ValueError."""
        try:
            return Force._THRESHOLDS[diffi]
        except KeyError:
            raise ValueError('unknown difficulty: %r' % (diffi,))

    def reset_parameters(self):
        """Clear the per-round counters."""
        self.MCtimes = 0
        self.round = 0
        print('a round, reset parameters ')

    @property
    def forceON(self):
        """Take one sample if due; return 1 once the final callable has run.

        Returns ``None`` while still collecting.  With the easy threshold
        (3) a successful round runs every callable once.  With a higher
        threshold each successful round runs one callable, from the last
        towards the first; running the first one sets ``gotForce``.
        """
        if self.gotForce != 0:
            return 1
        if not self.RunAfter('countMC_1000'):
            return None
        if self.IO['MC'].value():
            self.MCtimes += 1
        self.round += 1
        if self.round < 10:
            return None

        if self.MCtimes >= self.MCthreshold:
            if self.MCthreshold == 3:  # 3 means easy: run all IDs once
                for func in self.funclist:
                    func(self.funcP)
                print('bodyID and targetID')
            elif self.MCthreshold > 3:  # normal or hard
                if self.target_i == 1:
                    print('Run Final func')
                    self.funclist[0](self.funcP)
                    self.target_i = len(self.funclist)  # reset target_i
                    self.gotForce = 1  # stop sampling from now on
                elif self.target_i > 1:
                    self.funclist[self.target_i - 1](self.funcP)
                    print('The running func is no.: ', self.target_i)
                    self.target_i -= 1
        else:
            print('fail to activate')
        self.reset_parameters()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# class sort by name | |
# NOTE(review): nesting reconstructed after the source indentation was lost.
# IO and CMD are taken to be nested in Sword_AP because powerStrip_AP
# declares its own CMD; the SWORD_* classes must be top-level because they
# read Sword_AP.<attr> in their class bodies.
class Sword_AP():
    # Access point that the sword clients join.
    ssid, passwd, IPprefix = 'Sword_AP', '123456789', '192.168.4.'

    class IO():
        # Pin numbers.
        MC_IN = 22
        RELAY_OUT = 27

    class CMD():
        # Command format: '<slot>[_<slot>...]:<seconds>-<rounds>'.
        # e.g. Rhand2: S1 and S2 start ON at once, flip to the opposite
        # state after a 4 s interval, one round in total.
        Lhand = '1:1-1'
        Rhand1 = '1_2:1-1'
        Rhand2 = '1_2:4-1'
        target = '1:10-1'


class SWORD_Lhand():  # touches the electric light; it was the foot before
    ssid, passwd, IPprefix = Sword_AP.ssid, Sword_AP.passwd, Sword_AP.IPprefix
    ID = 17
    S_Num = 1
    S1 = 4  # D2, GPIO4
    # cmd = '1:1-1'  # S1 starts ON immediately, flips after the interval,
    #                # one round in total.


class SWORD_Rhand():  # holds the swordWithMC
    ssid, passwd, IPprefix = Sword_AP.ssid, Sword_AP.passwd, Sword_AP.IPprefix
    ID = 18
    S_Num = 2
    S1 = 14  # D5, GPIO14
    S2 = 12  # D6, GPIO12
    # cmd_2 = '1_2:4-1'
    # cmd_2 = '1:8-5,2:2-9'


class SWORD_target():
    ssid, passwd, IPprefix = Sword_AP.ssid, Sword_AP.passwd, Sword_AP.IPprefix
    ID = 19
    S_Num = 1
    S1 = 0  # D3, GPIO0
    # cmd = '1:10-1'  # S1 starts ON immediately, flips after a 10 s
    #                 # interval, one round in total.
class CAR():
    # Access-point credentials and address prefix for the car client.
    ssid, passwd, IPprefix = 'CAR_AP', '123456789', '192.168.4.'
    ID = 13
    D_sensor = False
# NOTE(review): nesting reconstructed after the source indentation was lost.
# branch1 is declared twice, so each copy is taken to belong to the speed
# class that precedes it.
class Data_AP():
    ssid, passwd, IPprefix = 'Data_AP', '123456789', '192.168.4.'

    class highSpeed():
        ssid, passwd = 'Data_AP', '123456789'
        ID = 2
        IPprefix = '192.168.4.'

        class branch1():
            ID = 3

    class lowSpeed():
        ssid, passwd = 'Data_AP', '123456789'
        ID = 4
        IPprefix = '192.168.4.'

        class branch1():
            ID = 5

        class branch2():
            ID = 6
# NOTE(review): nesting reconstructed after the source indentation was lost.
class powerStrip_AP():  # must not have branches
    ssid, passwd, IPprefix = 'powerStrip_AP', '123456789', '192.168.4.'

    class CMD():
        No1 = '2:3-1'
        No2 = '2:3-1'


class powerStrip_1():
    ssid, passwd, IPprefix = powerStrip_AP.ssid, powerStrip_AP.passwd, powerStrip_AP.IPprefix
    ID = 8
    S_Num = 6
    # Slot -> GPIO number (S1 is GPIO 0).
    S1, S2, S3, S4, S5, S6 = 0, 2, 4, 5, 12, 13


class powerStrip_2():
    ssid, passwd, IPprefix = powerStrip_AP.ssid, powerStrip_AP.passwd, powerStrip_AP.IPprefix
    ID = 9
    S_Num = 6
    # Slot -> GPIO number (S1 is GPIO 0).
    S1, S2, S3, S4, S5, S6 = 0, 2, 4, 5, 12, 13
class MCBOX_IOassign():  # not used yet
    # Pin numbers.
    MC_IN, VIBRATE, MCPOWER = 13, 4, 5
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
This is microMG, MG = message, go! | |
''' | |
import socket | |
import network | |
from machine import reset as machinereset | |
from lib.COMMON.timer import Timer | |
# Module-level station interface, activated at import time; it is read by
# microMSG_client.__init__ when joining the access point.
sta = network.WLAN(network.STA_IF)
sta.active(1)
class microMSG_client():
    """Client side of microMSG: joins the AP and talks to 192.168.4.1.

    Three kinds of client are set up by ``__init__``:

    * high speed (``h_CH`` given): shake data ``'<ID>H<ch>'``, then a
      persistent socket on one of ``H_port`` used by ``_send_H``;
    * switch (``switchMode=1``): shake data ``'<ID>S_'``, polled through
      ``switch_loop``;
    * low speed (neither): shake data ``'<ID>L_'``, sent by ``_send_L``.

    NOTE(review): the source indentation was lost; the nesting below was
    reconstructed from the syntax and should be checked against the device
    copy.
    """
    # __slots__ = ['ID', 'switchlen', 'shakeData_port', 'H_port', 'regularData_port', 'H_failCount', 'LData_len','HdataLen_str','Hcheck','shakeData_len','switchOffCount','isHighSpeed','switchConn','']

    def __init__(self, apparatus,
                 h_CH='', Hdatalen=10, Ldatalen=10, Ltime=1000, isSendRepeat=1, switchMode=0):
        """
        :param apparatus: config class with ``ID``, ``ssid``, ``passwd``,
            ``IPprefix`` and, for switch mode, ``S_Num`` plus ``S1..Sn``.
        :param h_CH: index into ``H_port`` (0..4), or ``''`` for none.
        :param Hdatalen: high-speed payload length; ``'car'`` means 7.
        :param Ldatalen: low-speed payload length.
        :param Ltime: low-speed send period in ms (values below 1000 are
            raised to 1000).
        :param isSendRepeat: when false, ``_send_H`` skips unchanged data.
        :param switchMode: ``1`` to act as a switch client.
        """
        import time

        self.ID = 'ID' + '%04d' % apparatus.ID
        if Hdatalen == 'car':
            Hdatalen = 7
        # Defined for every client so switch_loop()/Main() no longer raise
        # AttributeError on clients that are not in switch mode.
        self.isSwitch = 0
        self.switchMSG = None
        if switchMode == 1:
            from machine import Pin
            self.ran_switch = time.ticks_us() % 1000
            self.slotToGPIO = {}
            for i in range(1, apparatus.S_Num + 1):
                self.slotToGPIO.update({i: Pin(apparatus.__dict__.get('S' + str(i)), Pin.OUT)})
        self.switchlen = 31
        self.shakeData_port = 700
        self.H_port = [800, 801, 802, 803, 804]
        # self.competition_port = 900
        self.regularData_port = 1000
        self.H_failCount = 0
        self.LData_len = Ldatalen
        self.HdataLen_str = '%04d' % Hdatalen
        self.Hcheck = '*' * Hdatalen  # keep-alive payload, same length as data
        self.shakeData_len = 6
        self.switchOffCount = 0
        # NOTE(review): stays 1 even when no high-speed socket is opened.
        self.isHighSpeed = 1
        self.switchConn = None
        self.ran_CL = (time.ticks_us() % 100) * 2
        self.CLtime_ms = Ltime if Ltime >= 1000 else 1000
        self.isHalfduplex_recv = 1
        self.Latest_Hdata = None
        # Timer.init()
        self.RunAfter = Timer.RunAfter
        self.isSendRepeat = isSendRepeat

        # Connect to Wi-Fi with a static address of IPprefix + ID.
        if sta.isconnected() == 0:
            gateway = apparatus.IPprefix + '1'
            sta.ifconfig((apparatus.IPprefix + str(apparatus.ID), '255.255.255.0', gateway, gateway))
            self.connectwifi(sta, apparatus.ssid, apparatus.passwd)
        else:
            print('WIFI has already CONNECTED')

        # Create the shake data.
        if h_CH != '':
            self.highSpeed_port = self.H_port[h_CH]
            self.shakeData = '%04d' % apparatus.ID + 'H' + str(h_CH)
        elif switchMode == 1:
            print('switch_cli MODE')
            globals()['i' + '%04d' % apparatus.ID + 'runtimes'] = 0
            self.shakeData = '%04d' % apparatus.ID + 'S_'
            self.isSwitch = 1
        else:
            self.shakeData = '%04d' % apparatus.ID + 'L_'

        # High-speed handshake.
        if h_CH != '':
            self.sendUDP(port=self.shakeData_port, data=self.shakeData)
            print('sent self.shakeData : ' + self.shakeData)
            # Send the high-speed data length and wait for the string 'OK'.
            conn, recvData = self.verifyTCP(self.HdataLen_str, 8000)
            if recvData == 'OK':
                print('trying connect to high speed port: ' + str(self.highSpeed_port))
                self.H_clientSocket = self.try_connConnnect(int(self.highSpeed_port))
                if self.H_clientSocket != 1:
                    self.isHighSpeed = 1
                    self.H_clientSocket.settimeout(0.1)
                else:
                    print('\n\nH_clientSocket is 1\n\n')
            else:
                print(' can not receive the response from high speed port 8000 ')
                Timer.delayMS(5000)
            # Release the handshake socket on both paths; it used to leak
            # when no 'OK' came back.
            if conn != 1:
                self._close_quietly(conn)

    @staticmethod
    def _close_quietly(conn):
        """Close ``conn`` and ignore any error raised while closing."""
        try:
            conn.close()
        except Exception:
            pass

    def connectwifi(self, station, ssid, password):
        """Keep calling ``station.connect`` until it reports a connection.

        NOTE(review): the loop never gives up; the 'go to sleep' message is
        only printed, nothing actually sleeps or returns.
        """
        counter = 0
        station.disconnect()
        while station.isconnected() == 0:
            Timer.delayMS(1000)
            print(' is trying to connect WIFI')
            station.connect(ssid, password)
            Timer.delayMS(5000)
            counter += 1
            if counter >= 8 or station.isconnected() == 0:
                print('can not find the AP , go to sleep now!')

    def sendUDP(self, port, data):
        """Send ``data`` to 192.168.4.1:``port`` over a short-lived socket.

        Despite the name, ``socket.socket()`` with no arguments is used, so
        this is a stream connection.  Retries every 800 ms and resets the
        machine after the fifth failure.
        """
        count = 0
        while 1:
            conn = None
            try:
                conn = socket.socket()
                conn.connect(('192.168.4.1', port))
                self._send_data(conn, data)
                conn.close()
                break
            except Exception:
                # Do not leak the half-open socket from the failed attempt.
                if conn is not None:
                    self._close_quietly(conn)
                count += 1
                Timer.delayMS(800)
                print('shake data can not send.')
                if count > 4:
                    # uos.dupterm(uart, 1)  # for moment when test
                    machinereset()  # for moment when officially start to use

    def DoneSwitchconn(self):
        """Close the switch connection and reset the half-duplex state."""
        try:
            self.switchConn.close()
            self.switchConn = None
            print('switchConn close succeed')
        except Exception:
            print('switchConn close failed')
        self.switchOffCount = 0
        self.isHalfduplex_recv = 1

    def verifyTCP(self, data, port, mode=''):
        """Send ``data`` and/or read the reply, returning ``(conn, text)``.

        Half duplex means: send in this round, receive in the next one.

        * ``mode=''``: connect, send, then read with a 5 s timeout.
        * ``mode='halfduplex'`` in a send round: connect, send and return
          ``(conn, 'responding')`` without reading.
        * ``mode='halfduplex'`` in a receive round: read from
          ``self.switchConn``.

        Returns ``(1, 1)`` when no connection is available.
        """
        if self.isHalfduplex_recv == 1:  # "isHalfduplex_recv" only for SWITCH MODE
            conn = self.try_connConnnect(int(port))
            if conn != 1:
                self._send_data(conn, data)
            else:
                print('get conn of verifyTCP failed')
                self.DoneSwitchconn()
                return 1, 1
        else:
            conn = self.switchConn
        if conn != 1:
            if mode == '':
                conn.settimeout(5)
            elif mode == 'halfduplex' and self.isHalfduplex_recv == 1:
                conn.settimeout(0)
                self.isHalfduplex_recv = not self.isHalfduplex_recv
                return conn, 'responding'
            elif mode == 'halfduplex':
                self.isHalfduplex_recv = not self.isHalfduplex_recv
            recv = self._recv_data(conn, self.shakeData_len + self.switchlen)  # receive: NO, OFF, OK, CHECK,
            return conn, recv
        # Stored connection is the failure marker: report failure instead of
        # falling through with None, which broke tuple unpacking in callers.
        return 1, 1

    def try_connConnnect(self, port):
        """Connect to 192.168.4.1:``port``; return the socket, or 1 on failure.

        Up to five attempts are made, 750 ms apart.
        """
        count = 0
        while count < 5:
            conn1 = None
            try:
                conn1 = socket.socket()
                conn1.connect(('192.168.4.1', port))
                return conn1
            except Exception:
                # Do not leak the socket of the failed attempt.
                if conn1 is not None:
                    self._close_quietly(conn1)
                count += 1
                Timer.delayMS(750)
                print('can not connect to server port: ' + str(port))
        return 1

    def _recv_data(self, conn, length):
        """Read up to ``length`` bytes as text; return ``'_'`` on any error."""
        try:
            recv = conn.recv(length).decode()
            if recv == '':  # for test, able to be deleted.
                print('receive empty, try again')  # for test, able to be deleted.
        except Exception:
            recv = '_'
            print('NO data can be received, so it will be _')
        return recv

    def _send_data(self, conn, str):
        """Encode ``str`` and send it on ``conn``."""
        conn.send(str.encode())

    def switch_loop(self):
        """Run one switch poll; return 1 when a timed command arrived.

        The received text is ``<shake data><payload>``.  A payload with a
        ``:`` is stored in ``self.switchMSG``; a payload of ``slot-value``
        pairs is applied to the GPIOs directly.
        """
        recvData = ''
        if self.isSwitch == 1:
            if self.RunAfter(self.ID + '_' + str(1000 + self.ran_switch)):
                while recvData == '':
                    if self.RunAfter('tryRecv_150'):
                        if self.switchOffCount > 7:
                            # Nothing received for over 7 rounds: send
                            # 'check' to prove this client is not offline.
                            self.switchConn, recvData = self.verifyTCP(
                                self.shakeData + 'check', self.regularData_port, mode='halfduplex')
                        else:
                            self.switchConn, recvData = self.verifyTCP(
                                self.shakeData, self.regularData_port, mode='halfduplex')
                        print('Non-Responsed-Times is : ' + str(self.switchOffCount))
                if recvData not in ('responding', '_', 1):
                    self.switchConn.close()
                    rawdata = recvData[self.shakeData_len:]  # rawdata in switch_cli is 0 or 1
                    print('\nGET SWITCH DATA: ', rawdata, '\n')
                    unique_ID = recvData[:4]
                    if unique_ID == self.ID[2:]:
                        self.switchOffCount = 0
                        if rawdata == 'check':
                            self.switchOffCount = 0
                        elif ':' in rawdata:
                            self.switchMSG = rawdata
                            return 1  # funclib needs this
                        else:  # data is neither 'check' nor a timed command
                            for item in rawdata.split(','):
                                pair = item.split('-')
                                self.slotToGPIO[int(pair[0])].value(int(pair[1]))
                else:
                    self.switchOffCount += 1
                    if self.switchOffCount > 24:
                        print('recvData: ', recvData)
                        print(' set switch_cli be offline')

    def _send_L(self, data):
        """Send shake data + ``data`` to the regular port when its timer fires."""
        if self.RunAfter(self.ID + '_' + str(self.CLtime_ms + self.ran_CL)):
            self.sendUDP(data=self.shakeData + data, port=self.regularData_port)
            print('send L')

    def _send_H(self, data):
        """Send ``data`` on the high-speed socket when its timer fires.

        With ``isSendRepeat`` false, unchanged data is skipped and the
        ``Hcheck`` keep-alive is sent instead whenever its own timer fires.

        :returns: 1 after more than 8 consecutive failures, else ``None``.
        """
        if self.isHighSpeed and self.RunAfter(self.ID + '_50'):
            try:
                if self.isSendRepeat or data != self.Latest_Hdata:
                    self.Latest_Hdata = data
                    self.H_clientSocket.send(self.Latest_Hdata.encode())
                    self.H_failCount = 0
                elif self.RunAfter(self.ID + 'HC_14000'):
                    # Encoded like the data path; it was sent as str before.
                    self.H_clientSocket.send(self.Hcheck.encode())
            except Exception:
                Timer.delayMS(700)
                self.H_failCount += 1
                print(' Highspeed data can not send.')
                if self.H_failCount > 8:
                    # NOTE(review): assigns 1, so high speed is never
                    # disabled; confirm whether 0 was intended.
                    self.isHighSpeed = 1
                    print('isHighSpeed 1')
                    return 1  # for moment when test
                    # machine.reset()  # for moment when officially start to use

    def Main(self):
        """Run one switch poll (the result of ``switch_loop`` is dropped)."""
        self.switch_loop()
''' | |
def competition_client(self, checkin): | |
self.client_socket.settimeout( -1 ) | |
data = self._recv_data( self.client_socket, self.online_len ) | |
while data == 'B': | |
# not yet, count down, and make a beep for 5s | |
print( 'warning, MC check start after 5s' ) | |
Timer.delayMS(5000) | |
while 1: | |
Timer.delayMS( 1000 ) # save energy than run_after | |
if checkin: | |
self._send_data( self.client_socket, '1' ) | |
self.client_socket.settimeout( 0.2 ) | |
if self._recv_data( self.client_socket, self.online_len ) == 'W': | |
print( 'winner' ) | |
self.client_socket.settimeout( 0 ) | |
data = '_' # jump out the whole loop. | |
else: | |
print( 'you out, waiting for next chance' ) | |
self.client_socket.settimeout( -1 ) | |
if self._recv_data( self.client_socket, self.online_len ) == 'B': | |
break | |
''' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import time | |
import network | |
from lib.COMMON.timer import Timer | |
class microMSG_server(): | |
def __init__(self, WIFI, test=0):
    """Start the access point and open the two listening sockets.

    :param WIFI: config class providing ``ssid`` and ``passwd``.
    :param test: stored as ``printTest``; only for test, set by main.py.
    """
    # Create, activate and configure the access-point interface.
    access_point = network.WLAN(network.AP_IF)
    access_point.active(1)
    access_point.config(essid=WIFI.ssid, password=WIFI.passwd, authmode=4, channel=9)
    time.sleep_ms(3000)

    def _listen(port):
        # Socket bound to every address on ``port``, backlog 5, timeout 0.
        sock = socket.socket()
        sock.bind(('0.0.0.0', port))
        sock.listen(5)
        sock.settimeout(0)
        return sock

    self.printTest = test
    self.shake_port = 700
    # self.competition_port = 900
    self.regularData_port = 1000
    # self.competition_users_num = 0
    self.shake_socket = _listen(self.shake_port)
    self.N_socket = _listen(self.regularData_port)

    self.data_len = 10
    self.Hdata_str = None
    self.Ldata_str = None
    self.shakeData_len = 6
    # self.compRecv_len = 1
    self.recv_List = []

    # Five high-speed channels, indexed 0..4, on ports 800..804.
    self.isHighSpeed = [0] * 5
    self.H_conn = ['H_conn%d' % ch for ch in range(5)]
    self.H_checkOnline = {ch: 0 for ch in range(5)}
    self.H_socket = {'H_socket%d' % ch: 800 + ch for ch in range(5)}
    self.isGet_H = 0

    self.shakeToLData_dict = {}  # purpose not recorded; may be only for competition
    self.switch_dict = {}
    self.isSendSwitch = {}
    # Timer.init()  # run in boot.py
    self.RunAfter = Timer.RunAfter
def updateSwitch(self, ID, cmd): | |
self.switch_dict['%04d' % ID] = [cmd, True] | |
print ('%04d' % ID,'update switchDict: ',self.switch_dict['%04d' % ID]) | |
# print ('update switchDict: ',self.switch_dict) | |
def try_connAccept(self, conn1):
    """Accept one connection on ``conn1``; return it, or 0 if none arrived.

    Only ``OSError`` (which covers socket timeouts and "would block") is
    treated as "no connection".  The previous bare ``except`` also
    swallowed KeyboardInterrupt and programming errors.

    :param conn1: listening socket.
    :returns: the accepted connection, or ``0``.
    """
    try:
        conn, addr = conn1.accept()
    except OSError:
        return 0
    return conn
def wait_newconn(self):
    # Poll the shake socket for a new client and, for a high-speed client,
    # run the two-step handshake: the data length on port 8000, then the
    # data connection on the client's channel port (800..804).
    #
    # Every error, including "no pending connection" on the non-blocking
    # shake socket, is swallowed by the outer bare except.
    #
    # NOTE(review): the source indentation was lost; the nesting below was
    # reconstructed from the syntax and should be checked against the
    # device copy.
    if self.RunAfter('newConn_1100'):
        try:
            # print ('check newConn in every 1.1s')
            conn1, addr = self.shake_socket.accept() # addr is the client address; it is not used below
            conn1.settimeout(
                2.8) # about 3 s to receive the shake data; do not use -1, in case the signal just blocks for a while
            # print ('checked newConn ')
            try:
                shakeData = self._recv_data(conn1, self.shakeData_len)
            except:
                print( 'no shakeData, it may a switch_cli' )
                pass
            conn1.settimeout(0)
            conn1.close()
            print( 'GOT SHAKE_DATA: ' + shakeData )
            if shakeData[0] != '_':
                # Add into list
                self.shakeToLData_dict[shakeData] = ''
                # USAGE: when clients connect one after another, each must start after the previous one has settled.
                if shakeData[-2:-1] == 'H':
                    i = int(shakeData[-1:]) # i is the number of channel
                    # print( 'HighSpeed' )
                    # STEP ONE, create socket on port 8000 to receive the length of H_data
                    socketLength = socket.socket()
                    # print( 111111111 )
                    socketLength.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                    socketLength.bind(('0.0.0.0', 8000))
                    # print( 222222222 )
                    socketLength.listen(5)
                    socketLength.settimeout(10)
                    connLength = self.try_connAccept(socketLength)
                    if connLength != 0:
                        connLength.settimeout(5) # keep 5s, very important before create high speed connection.
                        recv = self._recv_data(connLength, 4)
                        # print( 3333333333 )
                        print( 'recv from connLength: ' + recv )
                        # send length back
                        if recv != '_':
                            self.Hdata_len = int(recv)
                            # self.Hcheck=['*' for i in range( self.Hdata_len )] # no need.
                            print( 'receive length: ' + recv + ' NOW send OK back' )
                            self._send_data(connLength, 'OK')
                            print( 'CLOSE connLength AND socketLength ' )
                        else:
                            print( 'can not recv length H_ch' )
                            pass
                    else:
                        print( 'NO request port 8000, done.' )
                        pass
                    # STEP TWO, create socket on port (800...804) to receive H_data as such
                    # NOTE(review): step two still runs when the length could not be read (recv == '_').
                    if connLength != 0:
                        H_socket = list(sorted(self.H_socket.keys()))[i] # name of the i-th channel socket
                        globals()[H_socket] = socket.socket() # this line must after try.
                        globals()[H_socket].setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                        globals()[H_socket].bind(
                            ('0.0.0.0', self.H_socket[H_socket])) # bind host address and port together
                        globals()[H_socket].listen(5)
                        globals()[H_socket].settimeout(5)
                        print( 'bound port: ' + str( self.H_socket[H_socket] ) )
                        globals()[self.H_conn[i]] = self.try_connAccept(globals()[H_socket])
                        print (globals()[self.H_conn[i]])
                        if globals()[self.H_conn[i]] != 0:
                            globals()[self.H_conn[i]].settimeout(0.05)
                            self.isGet_H = 1
                            self.isHighSpeed[i] = 1
                            print( 'start to receive high speed data ' )
                            # NOTE(review): as reconstructed, the two closes below run only on
                            # success, so connLength/socketLength stay open on every failure
                            # path; confirm the intended nesting.
                            connLength.close()
                            socketLength.close()
                        else:
                            print( 'NO connections-request on port ' + str( i ) )
                            pass
                        # self.competition_users_num += 1
        except:
            pass
''' | |
def competition_server(self, Setnum_Users): | |
# competition_conn = List_NormalCHANNEL_conn | |
competition_conn = list(self.shakeToLData_dict.keys()) | |
def sendTOALLcompetition(str): | |
for i in competition_conn: | |
self._send_data(globals()[competition_conn[i]], str) | |
def recvTOALLcompetition(): | |
for i in competition_conn: | |
data = self._recv_data(globals()[competition_conn[i]], self.compRecv_len) | |
if data[0] == '_': | |
self.competition_users_num -= 1 | |
competition_conn.pop(i) | |
def begin_competition(): | |
if self.RunAfter('competition_1000', 4): | |
sendTOALLcompetition('B') | |
if self.competition_users_num >= Setnum_Users: | |
begin_competition() | |
# confrontation loop start from here | |
while 1: | |
# begin at first time | |
if self.competition_users_num > 1: | |
if self.RunAfter('competition_1000'): | |
recvTOALLcompetition() | |
# Winner is born | |
elif self.competition_users_num == 1: | |
globals()[competition_conn[0]].send(b"W") | |
self._recv_data(globals()[competition_conn[Setnum_Users]], | |
self.compRecv_len) # important clear buffer | |
return globals()[competition_conn[0]] | |
# reset parameters for startover | |
elif self.competition_users_num == 0: | |
begin_competition() | |
''' | |
def _send_data(self, conn, str): | |
conn.send( str.encode() ) | |
def _recv_data(self, conn, length): | |
self.connAccept = conn | |
try: | |
if length == self.shakeData_len or length == 4: | |
recv = self.connAccept.recv(length).decode() | |
else: | |
self.connAccept = conn.accept()[0] | |
recv = '_' | |
while 1: | |
try: | |
recv = self.connAccept.recv(length).decode() | |
time.sleep(0.01) | |
# self.recv_List.append(recv) | |
shakeData = recv[:self.shakeData_len] | |
self.shakeToLData_dict[shakeData] = recv[self.shakeData_len:] | |
# globals()['online' + shakeData] = 1 | |
break | |
except: | |
pass | |
except: | |
# List_CheckOnlineCHANNEL_conn.append( Var_Name_str ) | |
recv = '_' | |
return recv | |
def _get_L(self): | |
recv = self._recv_data(self.N_socket, self.shakeData_len + self.data_len) # recv has included accept() | |
if recv != '_': | |
shakedata = recv[:self.shakeData_len] | |
uniqueid = shakedata[:4] | |
rawdata = recv[self.shakeData_len:] | |
if shakedata[-2:] == 'S_': # 'receive a swicth request' | |
print( 'receive a switch_cli request, id is: ', uniqueid ) | |
# problem: | |
# 1. if isSendSwitch=Bool, it turns False since updateed, but other switch may need to update | |
# 2. if isSendSwitch is num, it will add up when | |
# if self.isSendSwitch > 0 : | |
try: | |
a=self.switch_dict[uniqueid][1] | |
except: | |
print('No this ID ' + uniqueid + ' in switch_dict') | |
a=0 | |
if a : | |
self._send_data(self.connAccept, shakedata + self.switch_dict[uniqueid][0]) | |
self.switch_dict[uniqueid][1] = False | |
print('\nsending updated data to client for switch_cli:',shakedata + self.switch_dict[uniqueid][0],'\n\n') | |
elif rawdata == 'check': | |
self._send_data(self.connAccept, shakedata + 'check') | |
print( 'check client for switch_cli' ) | |
# else: # this is for test | |
# print( 'no need to response for switch_cli request! ' ) | |
# pass | |
self.connAccept.close() | |
elif shakedata[-2:] == 'L_': | |
self.Ldata_str = recv[self.shakeData_len:] | |
if self.printTest == 1: | |
print( 'Ldata_str: ' + recv ) | |
self.connAccept.close() | |
def _get_H(self, i, conn): | |
try: | |
recv = conn.recv(self.Hdata_len).decode() # don't use _recv_data() | |
except: | |
recv = '_' | |
if recv == '_': | |
self.Hdata_str = None | |
self.H_checkOnline[i] += 1 | |
# accumulation greater than 200 | |
if self.H_checkOnline[i] > 100: # 100 = about 10s | |
self.isHighSpeed[i] = 0 | |
self.H_checkOnline[i] = 0 | |
conn.close() # close accept | |
globals()[list(sorted(self.H_socket.keys()))[i]].close() # close socket | |
print( 'high speed CHANNEL-' + str( i ) + ' has got lost: ' ) | |
print( list( sorted( self.H_socket.keys() ) )[i] + ' closed' ) | |
del (globals()[self.H_conn[i]]) | |
print( self.H_conn[i] + ' closed' ) | |
del (globals()[list(sorted(self.H_socket.keys()))[i]]) | |
self.Hdata_str = 'offline' | |
else: | |
self.H_checkOnline[i] = 0 | |
self.Hdata_str = recv if '*' not in recv else None | |
if self.printTest==1: | |
print( 'Hdata_str: ' + self.Hdata_str ) | |
def start_server(self):
    """Service the data channels whose timers have elapsed.

    High-speed channels 0-4 are polled when ``self.isGet_H`` is set and the
    'HS_50' timer fires; the low-speed channel is polled when the
    'lowSpeed_350' timer fires.
    """
    if self.isGet_H and self.RunAfter('HS_50'):
        for channel in range(5):
            if not self.isHighSpeed[channel]:
                continue
            # Connections are kept as module globals, named by H_conn.
            self._get_H(channel, globals()[self.H_conn[channel]])

    low_speed_due = self.RunAfter('lowSpeed_350')
    if low_speed_due:
        self._get_L()
def Main(self):
    """Run one server iteration; call this repeatedly from a ``while`` loop.

    First looks for new client connections, then services the channels that
    are already open.
    """
    self.wait_newconn()
    self.start_server()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
class Timer():
    """Namespace of millisecond timing helpers shared by ESP and desktop builds.

    All members are used through the class (``Timer.RunAfter('ms_250')``);
    no instance is needed. ``init()`` binds ``ticks_ms``, ``ticks_add`` and
    ``delay_ms`` to the implementation matching the running platform.

    The former ``__slots__`` list named the methods themselves, which makes
    CPython reject the class ("conflicts with class variable"); it has been
    removed because the class keeps no per-instance state.
    """

    # Pending deadlines, keyed by the timer name passed to RunAfter().
    _deadlines = {}

    @staticmethod
    def delayMS(t):
        """Block for ``t`` milliseconds (desktop implementation)."""
        time.sleep((t / 1000))

    @staticmethod
    def ticks_ms_3():
        """Return the wall-clock time in whole milliseconds (desktop implementation)."""
        return int(time.time() * 1000)

    @staticmethod
    def ticksAdd(a, b):
        """Return ``a + b``; used as the tick-offset function on every platform."""
        return a + b

    @staticmethod
    def init():
        """Select the platform-specific tick and delay functions."""
        print('\nInit timer')
        import sys
        if 'esp' in sys.platform:
            print('\nESP SYSTEM\n')
            Timer.ticks_ms = time.ticks_ms
            # Timer.ticks_add = time.ticks_add
            Timer.ticks_add = Timer.ticksAdd
            Timer.delay_ms = time.sleep_ms
        else:
            print('NOT ESP SYSTEM')
            Timer.ticks_ms = Timer.ticks_ms_3
            Timer.ticks_add = Timer.ticksAdd
            Timer.delay_ms = Timer.delayMS
        del sys

    @staticmethod
    def RunAfter(str1):
        """Poll the named one-shot timer; return True once its period elapsed.

        ``str1`` has the form ``'<label>_<milliseconds>'``. The first call
        with a given name arms the timer and returns False. Later calls
        return False until the period has passed, then return True once and
        disarm the timer so the next call re-arms it.
        """
        if not hasattr(Timer, 'ticks_ms'):
            Timer.init()  # allow use without an explicit init() call
        deadlines = Timer._deadlines
        if str1 not in deadlines:
            period_ms = int(str1.split('_')[1])
            deadlines[str1] = Timer.ticks_add(Timer.ticks_ms(), period_ms)
            return False
        if deadlines[str1] - Timer.ticks_ms() < 0:
            del deadlines[str1]
            return True
        return False
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#-*-coding:utf-8-*- |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#-*-coding:utf-8-*- |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from machine import UART | |
from lib.COMMON.Arduino import Arduino_func | |
from lib.COMMON.timer import Timer | |
# import uos, time | |
class MPU6050_JY61P():
    """Reader for a JY61P (MPU6050) module attached to a MicroPython UART.

    ``current_axis()`` parses one 33-byte burst made of three 11-byte frames
    (headers 0x55 0x51, 0x55 0x52, 0x55 0x53) into acceleration, angular
    velocity ("palstance") and angle triples. The ``A2*`` helpers turn an
    angle into a command string: a letter followed by a two-digit level,
    e.g. ``'L04'``.
    """

    def __init__(self, type=('angle')):
        """Open the UART and set up the angle-to-level ranges.

        type -- string or sequence naming the quantities to decode; any of
                'angle', 'palstance', 'accel'.

        Raises RuntimeError on platforms other than esp8266 / esp32, where no
        UART id is known (previously an unbound-variable error).

        ESP32 notes kept from the original author:
            uart=UART( 1, baudrate=115200, bits=8, parity=None, stop=1, tx=12, rx=14, rts=-1, cts=-1,
                       txbuf=256, rxbuf=256, timeout=5000, timeout_char=2 )
            UART 1 may be wired to pin9/pin10, so UART 2 is tried first;
            it is said that uart2 defaults to tx=17, rx=16.
        """
        import sys
        s = sys.platform
        if s == 'esp8266':
            p = 0
        elif s == 'esp32':  # rx16, tx17
            p = 2
        else:
            raise RuntimeError('MPU6050_JY61P: no UART id known for platform ' + s)
        self.getAngle, self.getPalstance, self.getAccel = 0, 0, 0
        if 'angle' in type:
            self.getAngle = 1
        if 'palstance' in type:
            self.getPalstance = 1
        if 'accel' in type:
            self.getAccel = 1
        self.uart = UART(p, baudrate=115200)
        self.uart.init(115200, bits=8, parity=None, stop=1)
        self.Re_buf = []
        self.angle, self.accel, self.palstance = [], [], []
        self.sum_rx_bytes, self.rx_bytes = b'', b''
        # Angle windows (degrees, 0-359) for each command direction.
        self.startLroll = 0
        self.endLroll = 60
        self.startRroll = 359
        self.endRroll = 300
        self.startFrotate = 359
        self.endFrotate = 300
        self.startBrotate = 0
        self.endBrotate = 60
        self.startSpin = 0
        self.endSpin = 359
        self.RunAfter = Timer.RunAfter
        self.Lrange = range(self.startLroll, self.endLroll + 1)
        self.Rrange = range(self.endRroll, self.startRroll + 1)
        self.edge_Lrange = range(self.endLroll, 180 + 1)
        self.Frange = range(self.endFrotate, self.startFrotate + 1)
        self.Brange = range(self.startBrotate, self.endBrotate + 1)
        self.edge_Brange = range(self.endBrotate, 180 + 1)
        self.Srange = range(self.startSpin, self.endSpin + 1)

    def calcGRYO(self, var, basenum):
        """Append the three 16-bit values of the frame at ``basenum`` to ``var``.

        Each little-endian pair is read as unsigned and scaled by 180/32768,
        giving integers in 0..359.
        NOTE(review): the same 180-degree scale is applied to acceleration
        and angular-velocity frames -- confirm that is intended.
        """
        for i in range(2, 7, 2):
            raw = self.Re_buf[basenum + 1 + i] << 8 | self.Re_buf[basenum + i]
            var.append(int(int(raw) / 32768.0 * 180))

    def current_axis(self):
        """Read the UART until an angle header arrives and decode the burst.

        Returns ``{'accel': [...], 'palstance': [...], 'angle': [...]}``. The
        lists are refreshed only when exactly 33 bytes were collected;
        otherwise the previous values are returned. Blocks until ``b'US'``
        (0x55 0x53) shows up in a received chunk.
        """
        while b'US' not in self.rx_bytes:  # UQ=51, US=53
            if self.uart.any():
                chunk = self.uart.read()
                if not chunk:  # read() may yield nothing; keep waiting
                    continue
                self.rx_bytes = chunk
                self.sum_rx_bytes = self.sum_rx_bytes + self.rx_bytes
        if len(self.sum_rx_bytes) == 33:
            self.angle, self.palstance, self.accel = [], [], []
            self.Re_buf = list(self.sum_rx_bytes)
            if self.getAccel and self.Re_buf[0] == 0x55 and self.Re_buf[1] == 0x51:
                self.calcGRYO(self.accel, 0)
            if self.getPalstance and self.Re_buf[11] == 0x55 and self.Re_buf[12] == 0x52:
                self.calcGRYO(self.palstance, 11)
            if self.getAngle and self.Re_buf[22] == 0x55 and self.Re_buf[23] == 0x53:
                self.calcGRYO(self.angle, 22)
        self.sum_rx_bytes, self.rx_bytes = b'', b''
        return {'accel': self.accel, 'palstance': self.palstance, 'angle': self.angle}

    @staticmethod
    def _in_span(angle, span):
        """Return True if ``angle`` lies in ``span``.

        ``span`` may be a range/sequence, a single number (matches only that
        value), or a falsy placeholder such as the ``0`` defaults of ``A2``
        (matches nothing).
        """
        if not span:
            return False
        if isinstance(span, int):
            return angle == span
        return angle in span

    def A2(self, angle, range1=0, range2=0, edge_range1=0, startMap1=0, endMap1=0, startMap2=0, endMap2=0,
           name=0, Maxlevel=10):  # angle2direction
        """Map ``angle`` to ``'<letter><level>'`` for a two-sided axis.

        range1 / range2 -- angle windows mapped proportionally onto
                           0..Maxlevel for ``name[0]`` / ``name[1]``.
        edge_range1     -- window beyond range1 that saturates at Maxlevel
                           for ``name[0]``.
        Angles below ``endMap2`` that match nothing else saturate at Maxlevel
        for ``name[1]``. Returns None if no rule matches.
        """
        top = '%02d' % Maxlevel
        if self._in_span(angle, range1):
            arduino_map = Arduino_func(startMap1, endMap1, 0, Maxlevel)
            return name[0] + '%02d' % arduino_map.map(angle)
        elif self._in_span(angle, range2):
            arduino_map = Arduino_func(endMap2, startMap2, 0, Maxlevel)
            return name[1] + '%02d' % arduino_map.map(angle, -1)
        elif self._in_span(angle, edge_range1):
            return name[0] + top
        elif angle < endMap2:
            return name[1] + top

    def A2D(self, angle, Maxlevel=10):  # angle2direction
        """Map a roll angle to an 'Lxx' / 'Rxx' steering command."""
        return self.A2(angle, self.Lrange, self.Rrange, self.edge_Lrange, self.startLroll, self.endLroll,
                       self.startRroll, self.endRroll, 'LR', Maxlevel)

    def A2M(self, angle, Maxlevel=10):  # angle2move
        """Map a rotate angle to a 'Bxx' / 'Fxx' movement command."""
        return self.A2(angle, self.Brange, self.Frange, self.edge_Brange, self.startBrotate, self.endBrotate,
                       self.startFrotate, self.endFrotate, 'BF', Maxlevel)

    def angleSpin(self, angle, Maxlevel=20):
        """Map a spin angle to an 'Sxx' command, saturating at ``Maxlevel``."""
        if angle in self.Srange:
            arduino_map = Arduino_func(self.startSpin, self.endSpin, 0, Maxlevel)
            return 'S' + '%02d' % arduino_map.map(angle)
        elif angle >= self.endSpin:
            return 'S' + '%02d' % Maxlevel

    @property
    def Str_CarDATA(self):
        """Return ``'<L|R>xx-<B|F>xx'`` for the current angles, or None if no angle frame was decoded."""
        current_ANGLE = self.current_axis()['angle']
        if len(current_ANGLE) < 2:
            return None
        str1 = self.A2D(current_ANGLE[0])
        str2 = self.A2M(current_ANGLE[1])
        return str1 + '-' + str2

    @property
    def Str_AngleDATA(self):
        """Return ``'<L|R>xx-<B|F>xx-Sxx'`` for the current angles, or None if no complete angle frame was decoded."""
        a = self.current_axis()['angle']
        if len(a) != 3:
            return None
        current_ANGLE = a
        str1 = self.A2D(current_ANGLE[0])
        str2 = self.A2M(current_ANGLE[1])
        str3 = self.A2(current_ANGLE[2], self.Srange, edge_range1=self.endSpin, startMap1=self.startSpin,
                       endMap1=self.endSpin, name='S', Maxlevel=20)
        return str1 + '-' + str2 + '-' + str3
# def Decode_CarDATA(self, bytes): | |
# return bytes.decode( "uft-8" ).split( '-' ) # format is: ['Lx/Rx','Fx/Bx'] | |
'''
USAGE:
# HOW TO get the three angles from the MPU6050
from GYRO import MPU6050_JY61P
mpu6050 = MPU6050_JY61P()
angle = mpu6050.current_axis()['angle']
print(angle)
>>> [num1, num2, num3]
# HOW TO convert an angle to a direction or a move command; the result is a
# letter followed by a two-digit level, e.g. 'L04' or 'R06'
mpu6050.A2D(angle[0])
mpu6050.A2M(angle[1])
# ADVANCED USAGE:
# HOW TO use in the car: send mpu6050.Str_AngleDATA (formatted 'Lxx-Fxx-Sxx')
# to the car. The Decode_CarDATA helper is currently commented out in the class.
from GYRO.MPU6050_JY61P import Decode_CarDATA
'''
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#-*-coding:utf-8-*- |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from machine import time_pulse_us,Pin | |
import time | |
class HCSR04:
    """HC-SR04 ultrasonic range finder used as an obstacle watchdog for the car.

    ``direction`` labels which way the sensor faces (its first character is
    compared with the car's movement letter, e.g. 'F' or 'B').
    """

    # echo_timeout_us is based in chip range limit (400cm)
    def __init__(self, direction, trigger_gpio=15, echo_gpio=14, echo_timeout_us=500 * 2 * 30):  # D8=IO15,D5=IO14
        self.echo_timeout_us = echo_timeout_us
        # Init trigger pin (out)
        self.trigger = Pin(trigger_gpio, mode=Pin.OUT, pull=None)
        self.trigger.value(0)
        self.direction = direction
        # Init echo pin (in)
        self.echo = Pin(echo_gpio, mode=Pin.IN, pull=None)
        self.sensor = 1
        self._sensorRead = True  # toggled by watchCar(): measure on every other call
        self.A, self.last_D = 3, 0
        self.T = 250  # current polling period in ms
        self.moveOn = ...

    def _send_pulse_and_wait(self):
        """Trigger one measurement and return the echo pulse width in microseconds.

        Raises OSError('Out of range') when no echo arrives in time, whether
        the firmware reports that with errno 110 or with a negative return
        value from ``time_pulse_us``.
        """
        self.trigger.value(0)  # Stabilize the sensor
        time.sleep_us(5)
        self.trigger.value(1)
        # Send a 10us pulse.
        time.sleep_us(10)
        self.trigger.value(0)
        try:
            pulse_time = time_pulse_us(self.echo, 1, self.echo_timeout_us)
        except OSError as ex:
            if ex.args[0] == 110:  # 110 = ETIMEDOUT
                raise OSError('Out of range')
            raise ex
        if pulse_time < 0:
            raise OSError('Out of range')
        return pulse_time

    @property
    def vector(self):
        """Return ``'<direction>-<distance in whole cm>'``, or None on a skip cycle."""
        if self._sensorRead:
            pulse_time = self._send_pulse_and_wait()
            cms = (pulse_time / 2) / 29.1
            return self.direction + '-' + str(int(cms))

    def distance_cm(self):  # only for test
        """Return the measured distance in centimetres as a float."""
        pulse_time = self._send_pulse_and_wait()
        cms = (pulse_time / 2) / 29.1
        return cms

    def outputP(self, last, current, v):
        """Return the three distance thresholds (cm) for ``outputTime``.

        When the obstacle is receding (``last < current``) the base values
        30/70/100 are used; otherwise each grows by ``v ** (self.A / 5)``.
        """
        if last < current:
            return 30, 70, 100
        else:
            return 30 + v ** (self.A / 5), 70 + v ** (self.A / 5), 100 + v ** (self.A / 5)

    @staticmethod
    def outputTime(current, p, NowMove=None, SetMove=None):
        """Return ``(polling period in ms, stop?)`` for distance ``current``.

        ``p`` is the ascending threshold triple from ``outputP``. Below
        ``p[0]`` the car must stop and the sensor is polled every 100 ms; the
        period relaxes to 250, 350 and 1000 ms as the distance passes each
        threshold. ``NowMove`` / ``SetMove`` are accepted for compatibility
        and unused.
        """
        if current < p[0]:
            return 100, True
        if current < p[1]:
            return 250, False
        if current < p[2]:
            return 350, False
        return 1000, False

    @staticmethod
    def outputV(current, last, time):
        """Return the closing speed: distance change per ``time`` ms, scaled to per second."""
        return abs(current - last) / time * 1000

    def watchCar(self, stopcar, moveOn, driver, M):
        """Check the distance and stop the car if an obstacle is too close.

        stopcar -- callable taking the blocked direction letter.
        moveOn  -- truthy if the car is currently allowed to keep moving.
        driver  -- motor driver exposing ``move(cmd)``.
        M       -- current movement command, e.g. 'F05'.

        Returns ``(self.moveOn, 'ms_<period>')``; the second item is the timer
        name to use for the next poll. The sensor is read on every other
        call; skip cycles return the previous period.
        """
        if self._sensorRead:
            direction = self.direction
            try:
                current_D = int(self.distance_cm())  # one measurement per cycle
            except OSError:
                current_D = None  # no echo: nothing within range
            if current_D is not None and current_D < 120:
                try:  # M may still be empty right after start-up
                    speed = self.outputV(current_D, self.last_D, self.T)
                    self.T, STOP = self.outputTime(current=current_D,
                                                   p=self.outputP(self.last_D, current_D, v=speed))
                    if STOP:
                        stopcar(direction[0])
                        if moveOn and M[0] == direction[0]:
                            driver.move('F00')
                            self.moveOn = False
                except Exception:
                    print('try current_D < 120 got problem')
            else:
                self.T = 1000
            self.T = 100 if self.T is None else self.T
            if current_D is not None:
                self.last_D = current_D
        sensitivity = 'ms_' + str(self.T)
        self._sensorRead = not self._sensorRead
        return self.moveOn, sensitivity
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from machine import PWM,Pin | |
from lib.COMMON.Arduino import Arduino_func | |
# IO5= SERVO PWM | |
# IO12,13 = MOTER DRVIER, when you have four wheels drive, IO14, IO15 occupied by default | |
# IO4 = MOTER DRVIER PWM (6612) | |
# IO14, IO15(trig) =HCSR04 | |
class servo_MG996R():
    """Steering servo (MG996R) on a PWM pin; d1-gpio5 by default.

    Turn commands are strings: 'L' or 'R' followed by a level between
    ``Level_min`` and ``Level_max``, mapped onto the PWM duty window
    ``Left_max`` .. ``Straight`` .. ``Right_max``.
    """

    # Slots must name the attributes that are actually assigned; the former
    # list held the constructor parameter names, so on CPython every
    # assignment in __init__ raised AttributeError.
    __slots__ = ('servo', 'Min_Turn_Level', 'Max_Turn_Level', 'Go_Straight', 'Max_Turn_Left',
                 'Max_Turn_Right')

    def __init__(self, IO=5, Level_min=0, Level_max=10, Straight=70, Left_max=50, Right_max=105):
        self.servo = PWM(Pin(IO), freq=50)
        self.servo.duty(Straight)  # centre the wheels on start-up
        # time.sleep( 1 ) # you may not need it
        self.Min_Turn_Level = Level_min
        self.Max_Turn_Level = Level_max
        self.Go_Straight = Straight
        self.Max_Turn_Left = Left_max
        self.Max_Turn_Right = Right_max

    def turn(self, value):
        """Apply a turn command such as 'L03' or 'R10'.

        With any other leading character the numeric part is written to the
        servo as a raw duty value.
        """
        duty_value = int(''.join(value[1:]))
        if value[0] == 'L':
            arduino_map = Arduino_func(self.Min_Turn_Level, self.Max_Turn_Level, self.Max_Turn_Left,
                                       self.Go_Straight)
            duty_value = arduino_map.map(duty_value, -1)
        elif value[0] == 'R':
            arduino_map = Arduino_func(self.Min_Turn_Level, self.Max_Turn_Level, self.Go_Straight,
                                       self.Max_Turn_Right)
            duty_value = arduino_map.map(duty_value)
        self.servo.duty(duty_value)
class MD_L298N():
    """L298N motor driver with PWM applied directly to the IN pins.

    Default wiring: d6-gpio12 / d7-gpio13 for motor A and
    d5-gpio14 / d8-gpio15 for motor B. Movement commands are 'F' or 'B'
    followed by a level that is scaled onto the duty range.
    """

    def __init__(self, IO1=12, IO2=13, IO3=14, IO4=15, type='Single', Min_Level=0, Max_Level=10, Min_duty=0,
                 Max_duty=1023):
        has_second_motor = 'Double' in type
        # NOTE(review): 'Double_directional' enables DOUBLEMOTOR while any
        # other 'Double...' type enables DOUBLEMOTOR_DIRECTIONAL -- confirm
        # this naming is intentional.
        self.DOUBLEMOTOR = has_second_motor and type == 'Double_directional'
        self.DOUBLEMOTOR_DIRECTIONAL = has_second_motor and type != 'Double_directional'
        self.servo_IN1 = PWM(Pin(IO1), freq=50)
        self.servo_IN2 = PWM(Pin(IO2), freq=50)
        self.Min_Move_Level, self.Max_Move_Level = Min_Level, Max_Level
        self.Min_DUTY, self.Max_DUTY = Min_duty, Max_duty
        if has_second_motor:
            self.servo_IN3 = PWM(Pin(IO3), freq=50)
            self.servo_IN4 = PWM(Pin(IO4), freq=50)

    def forward(self, value):
        """Drive motor A forward at duty ``value`` (and motor B when DOUBLEMOTOR)."""
        outputs = [(self.servo_IN1, value), (self.servo_IN2, 0)]
        if self.DOUBLEMOTOR:
            outputs += [(self.servo_IN3, value), (self.servo_IN4, 0)]
        for pin, duty in outputs:
            pin.duty(duty)

    def backward(self, value):
        """Drive motor A backward at duty ``value`` (and motor B when DOUBLEMOTOR)."""
        outputs = [(self.servo_IN1, 0), (self.servo_IN2, value)]
        if self.DOUBLEMOTOR:
            outputs += [(self.servo_IN3, 0), (self.servo_IN4, value)]
        for pin, duty in outputs:
            pin.duty(duty)

    # OUT1,OUT2 to left motor, OUT3,OUT4 to right motor
    def turnleft(self, value):
        """Counter-rotate the motors to turn left; only when DOUBLEMOTOR_DIRECTIONAL."""
        if not self.DOUBLEMOTOR_DIRECTIONAL:
            return
        for pin, duty in ((self.servo_IN1, 0), (self.servo_IN2, value),
                          (self.servo_IN3, value), (self.servo_IN4, 0)):
            pin.duty(duty)

    def turnright(self, value):
        """Counter-rotate the motors to turn right; only when DOUBLEMOTOR_DIRECTIONAL."""
        if not self.DOUBLEMOTOR_DIRECTIONAL:
            return
        for pin, duty in ((self.servo_IN1, value), (self.servo_IN2, 0),
                          (self.servo_IN3, 0), (self.servo_IN4, value)):
            pin.duty(duty)

    def move(self, value):
        """Execute a movement command such as 'F05' or 'B10'; other letters are ignored."""
        heading = value[0]
        level = int(''.join(list(value)[1:]))
        if heading not in ('F', 'B'):
            return
        scaler = Arduino_func(self.Min_Move_Level, self.Max_Move_Level, self.Min_DUTY, self.Max_DUTY)
        duty = scaler.map(level)
        if heading == 'F':
            self.forward(duty)
        else:
            self.backward(duty)
class MD_TB6612FNG_DRV8833():
    """TB6612FNG / DRV8833 motor driver: direction pins plus one shared PWM.

    Default wiring: d6-gpio12 / d7-gpio13 for motor A with d2-gpio4 as PWMA;
    d5-gpio14 / d8-gpio15 for motor B, which shares PWMA because no further
    GPIO is available.
    """

    def __init__(self, in1=12, in2=13, in3=14, in4=15, pwm=4, type='Single', Min_Level=0, Max_Level=10, Min_duty=0,
                 Max_duty=1023):
        has_second_motor = 'Double' in type
        # NOTE(review): 'Double_directional' enables DOUBLEMOTOR while any
        # other 'Double...' type enables DOUBLEMOTOR_DIRECTIONAL -- confirm
        # this naming is intentional.
        self.DOUBLEMOTOR = has_second_motor and type == 'Double_directional'
        self.DOUBLEMOTOR_DIRECTIONAL = has_second_motor and type != 'Double_directional'
        self.PWMA = PWM(Pin(pwm), freq=50)
        self.MOTORA = {'out1': Pin(in1, Pin.OUT), 'out2': Pin(in2, Pin.OUT)}
        self.Min_Move_Level, self.Max_Move_Level = Min_Level, Max_Level
        self.Min_DUTY, self.Max_DUTY = Min_duty, Max_duty
        if has_second_motor:
            self.MOTORB = {'out1': Pin(in3, Pin.OUT), 'out2': Pin(in4, Pin.OUT)}

    def motorSpin(self, motor, num):
        """Set the direction pins of ``motor``: ``num`` 1 -> out1 high, 0 -> out2 high."""
        if num not in (0, 1):
            return
        first_high = num == 1
        motor['out1'].value(1 if first_high else 0)
        motor['out2'].value(0 if first_high else 1)

    def forward(self, value):
        """Set the duty and spin motor A forward (and motor B when DOUBLEMOTOR)."""
        self.PWMA.duty(value)
        self.motorSpin(self.MOTORA, 1)
        if not self.DOUBLEMOTOR:
            return
        self.motorSpin(self.MOTORB, 1)

    def backward(self, value):
        """Set the duty and spin motor A backward (and motor B when DOUBLEMOTOR)."""
        self.PWMA.duty(value)
        self.motorSpin(self.MOTORA, 0)
        if not self.DOUBLEMOTOR:
            return
        self.motorSpin(self.MOTORB, 0)

    # ONLY FOR DOUBLE MOTORS, motorA = left side wheels, motorB = right side wheels
    def motor_turnleft(self, value):
        """Set the duty; when DOUBLEMOTOR_DIRECTIONAL, run A backward and B forward."""
        self.PWMA.duty(value)
        if not self.DOUBLEMOTOR_DIRECTIONAL:
            return
        self.motorSpin(self.MOTORA, 0)
        self.motorSpin(self.MOTORB, 1)

    def motor_turnright(self, value):
        """Set the duty; when DOUBLEMOTOR_DIRECTIONAL, run A forward and B backward."""
        self.PWMA.duty(value)
        if not self.DOUBLEMOTOR_DIRECTIONAL:
            return
        self.motorSpin(self.MOTORA, 1)
        self.motorSpin(self.MOTORB, 0)

    def move(self, value):
        """Execute a movement command such as 'F05' or 'B10'; other letters are ignored."""
        heading = value[0]
        level = int(''.join(list(value)[1:]))
        if heading not in ('F', 'B'):
            return
        scaler = Arduino_func(self.Min_Move_Level, self.Max_Move_Level, self.Min_DUTY, self.Max_DUTY)
        duty = scaler.map(level)
        if heading == 'F':
            self.forward(duty)
        else:
            self.backward(duty)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from machine import PWM,Pin | |
from lib.COMMON.Arduino import Arduino_func | |
class MD_TB6612FNG_DRV8833():
    """TB6612FNG / DRV8833 motor driver: direction pins plus one shared PWM.

    Default wiring: d6-gpio12 / d7-gpio13 for motor A with d2-gpio4 as PWMA;
    d5-gpio14 / d8-gpio15 for motor B, which shares PWMA because no further
    GPIO is available.
    """

    def __init__(self, in1=12, in2=13, in3=14, in4=15, pwm=4, type='Single', Min_Level=0, Max_Level=10, Min_duty=0,
                 Max_duty=1023):
        has_second_motor = 'Double' in type
        # NOTE(review): 'Double_directional' enables DOUBLEMOTOR while any
        # other 'Double...' type enables DOUBLEMOTOR_DIRECTIONAL -- confirm
        # this naming is intentional.
        self.DOUBLEMOTOR = has_second_motor and type == 'Double_directional'
        self.DOUBLEMOTOR_DIRECTIONAL = has_second_motor and type != 'Double_directional'
        self.PWMA = PWM(Pin(pwm), freq=50)
        self.MOTORA = {'out1': Pin(in1, Pin.OUT), 'out2': Pin(in2, Pin.OUT)}
        self.Min_Move_Level, self.Max_Move_Level = Min_Level, Max_Level
        self.Min_DUTY, self.Max_DUTY = Min_duty, Max_duty
        if has_second_motor:
            self.MOTORB = {'out1': Pin(in3, Pin.OUT), 'out2': Pin(in4, Pin.OUT)}

    def motorSpin(self, motor, num):
        """Set the direction pins of ``motor``: ``num`` 1 -> out1 high, 0 -> out2 high."""
        if num not in (0, 1):
            return
        first_high = num == 1
        motor['out1'].value(1 if first_high else 0)
        motor['out2'].value(0 if first_high else 1)

    def forward(self, value):
        """Set the duty and spin motor A forward (and motor B when DOUBLEMOTOR)."""
        self.PWMA.duty(value)
        self.motorSpin(self.MOTORA, 1)
        if not self.DOUBLEMOTOR:
            return
        self.motorSpin(self.MOTORB, 1)

    def backward(self, value):
        """Set the duty and spin motor A backward (and motor B when DOUBLEMOTOR)."""
        self.PWMA.duty(value)
        self.motorSpin(self.MOTORA, 0)
        if not self.DOUBLEMOTOR:
            return
        self.motorSpin(self.MOTORB, 0)

    # ONLY FOR DOUBLE MOTORS, motorA = left side wheels, motorB = right side wheels
    def motor_turnleft(self, value):
        """Set the duty; when DOUBLEMOTOR_DIRECTIONAL, run A backward and B forward."""
        self.PWMA.duty(value)
        if not self.DOUBLEMOTOR_DIRECTIONAL:
            return
        self.motorSpin(self.MOTORA, 0)
        self.motorSpin(self.MOTORB, 1)

    def motor_turnright(self, value):
        """Set the duty; when DOUBLEMOTOR_DIRECTIONAL, run A forward and B backward."""
        self.PWMA.duty(value)
        if not self.DOUBLEMOTOR_DIRECTIONAL:
            return
        self.motorSpin(self.MOTORA, 1)
        self.motorSpin(self.MOTORB, 0)

    def move(self, value):
        """Execute a movement command such as 'F05' or 'B10'; other letters are ignored."""
        heading = value[0]
        level = int(''.join(list(value)[1:]))
        if heading not in ('F', 'B'):
            return
        scaler = Arduino_func(self.Min_Move_Level, self.Max_Move_Level, self.Min_DUTY, self.Max_DUTY)
        duty = scaler.map(level)
        if heading == 'F':
            self.forward(duty)
        else:
            self.backward(duty)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from machine import PWM,Pin | |
from lib.COMMON.Arduino import Arduino_func | |
class MD_L298N():
    """L298N motor driver with PWM applied directly to the IN pins.

    Default wiring: d6-gpio12 / d7-gpio13 for motor A and
    d5-gpio14 / d8-gpio15 for motor B. Movement commands are 'F' or 'B'
    followed by a level that is scaled onto the duty range.
    """

    def __init__(self, IO1=12, IO2=13, IO3=14, IO4=15, type='Single', Min_Level=0, Max_Level=10, Min_duty=0,
                 Max_duty=1023):
        has_second_motor = 'Double' in type
        # NOTE(review): 'Double_directional' enables DOUBLEMOTOR while any
        # other 'Double...' type enables DOUBLEMOTOR_DIRECTIONAL -- confirm
        # this naming is intentional.
        self.DOUBLEMOTOR = has_second_motor and type == 'Double_directional'
        self.DOUBLEMOTOR_DIRECTIONAL = has_second_motor and type != 'Double_directional'
        self.servo_IN1 = PWM(Pin(IO1), freq=50)
        self.servo_IN2 = PWM(Pin(IO2), freq=50)
        self.Min_Move_Level, self.Max_Move_Level = Min_Level, Max_Level
        self.Min_DUTY, self.Max_DUTY = Min_duty, Max_duty
        if has_second_motor:
            self.servo_IN3 = PWM(Pin(IO3), freq=50)
            self.servo_IN4 = PWM(Pin(IO4), freq=50)

    def forward(self, value):
        """Drive motor A forward at duty ``value`` (and motor B when DOUBLEMOTOR)."""
        outputs = [(self.servo_IN1, value), (self.servo_IN2, 0)]
        if self.DOUBLEMOTOR:
            outputs += [(self.servo_IN3, value), (self.servo_IN4, 0)]
        for pin, duty in outputs:
            pin.duty(duty)

    def backward(self, value):
        """Drive motor A backward at duty ``value`` (and motor B when DOUBLEMOTOR)."""
        outputs = [(self.servo_IN1, 0), (self.servo_IN2, value)]
        if self.DOUBLEMOTOR:
            outputs += [(self.servo_IN3, 0), (self.servo_IN4, value)]
        for pin, duty in outputs:
            pin.duty(duty)

    # OUT1,OUT2 to left motor, OUT3,OUT4 to right motor
    def turnleft(self, value):
        """Counter-rotate the motors to turn left; only when DOUBLEMOTOR_DIRECTIONAL."""
        if not self.DOUBLEMOTOR_DIRECTIONAL:
            return
        for pin, duty in ((self.servo_IN1, 0), (self.servo_IN2, value),
                          (self.servo_IN3, value), (self.servo_IN4, 0)):
            pin.duty(duty)

    def turnright(self, value):
        """Counter-rotate the motors to turn right; only when DOUBLEMOTOR_DIRECTIONAL."""
        if not self.DOUBLEMOTOR_DIRECTIONAL:
            return
        for pin, duty in ((self.servo_IN1, value), (self.servo_IN2, 0),
                          (self.servo_IN3, 0), (self.servo_IN4, value)):
            pin.duty(duty)

    def move(self, value):
        """Execute a movement command such as 'F05' or 'B10'; other letters are ignored."""
        heading = value[0]
        level = int(''.join(list(value)[1:]))
        if heading not in ('F', 'B'):
            return
        scaler = Arduino_func(self.Min_Move_Level, self.Max_Move_Level, self.Min_DUTY, self.Max_DUTY)
        duty = scaler.map(level)
        if heading == 'F':
            self.forward(duty)
        else:
            self.backward(duty)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from machine import PWM,Pin | |
from lib.COMMON.Arduino import Arduino_func | |
class MD_TB6612FNG():
    """TB6612FNG motor driver: direction pins plus one shared PWM.

    Default wiring: d6-gpio12 / d7-gpio13 for motor A with d2-gpio4 as PWMA;
    d5-gpio14 / d8-gpio15 for motor B, which shares PWMA because no further
    GPIO is available.
    """

    def __init__(self, in1=12, in2=13, in3=14, in4=15, pwm=4, type='Single', Min_Level=0, Max_Level=10, Min_duty=0,
                 Max_duty=1023):
        has_second_motor = 'Double' in type
        # NOTE(review): 'Double_directional' enables DOUBLEMOTOR while any
        # other 'Double...' type enables DOUBLEMOTOR_DIRECTIONAL -- confirm
        # this naming is intentional.
        self.DOUBLEMOTOR = has_second_motor and type == 'Double_directional'
        self.DOUBLEMOTOR_DIRECTIONAL = has_second_motor and type != 'Double_directional'
        self.PWMA = PWM(Pin(pwm), freq=50)
        self.MOTORA = {'out1': Pin(in1, Pin.OUT), 'out2': Pin(in2, Pin.OUT)}
        self.Min_Move_Level, self.Max_Move_Level = Min_Level, Max_Level
        self.Min_DUTY, self.Max_DUTY = Min_duty, Max_duty
        if has_second_motor:
            self.MOTORB = {'out1': Pin(in3, Pin.OUT), 'out2': Pin(in4, Pin.OUT)}

    def motorSpin(self, motor, num):
        """Set the direction pins of ``motor``: ``num`` 1 -> out1 high, 0 -> out2 high."""
        if num not in (0, 1):
            return
        first_high = num == 1
        motor['out1'].value(1 if first_high else 0)
        motor['out2'].value(0 if first_high else 1)

    def forward(self, value):
        """Set the duty and spin motor A forward (and motor B when DOUBLEMOTOR)."""
        self.PWMA.duty(value)
        self.motorSpin(self.MOTORA, 1)
        if not self.DOUBLEMOTOR:
            return
        self.motorSpin(self.MOTORB, 1)

    def backward(self, value):
        """Set the duty and spin motor A backward (and motor B when DOUBLEMOTOR)."""
        self.PWMA.duty(value)
        self.motorSpin(self.MOTORA, 0)
        if not self.DOUBLEMOTOR:
            return
        self.motorSpin(self.MOTORB, 0)

    # ONLY FOR DOUBLE MOTORS, motorA = left side wheels, motorB = right side wheels
    def motor_turnleft(self, value):
        """Set the duty; when DOUBLEMOTOR_DIRECTIONAL, run A backward and B forward."""
        self.PWMA.duty(value)
        if not self.DOUBLEMOTOR_DIRECTIONAL:
            return
        self.motorSpin(self.MOTORA, 0)
        self.motorSpin(self.MOTORB, 1)

    def motor_turnright(self, value):
        """Set the duty; when DOUBLEMOTOR_DIRECTIONAL, run A forward and B backward."""
        self.PWMA.duty(value)
        if not self.DOUBLEMOTOR_DIRECTIONAL:
            return
        self.motorSpin(self.MOTORA, 1)
        self.motorSpin(self.MOTORB, 0)

    def move(self, value):
        """Execute a movement command such as 'F05' or 'B10'; other letters are ignored."""
        heading = value[0]
        level = int(''.join(list(value)[1:]))
        if heading not in ('F', 'B'):
            return
        scaler = Arduino_func(self.Min_Move_Level, self.Max_Move_Level, self.Min_DUTY, self.Max_DUTY)
        duty = scaler.map(level)
        if heading == 'F':
            self.forward(duty)
        else:
            self.backward(duty)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from machine import PWM,Pin | |
from lib.COMMON.Arduino import Arduino_func | |
class servo_MG996R():
    """Steering servo (MG996R) on a PWM pin; d1-gpio5 by default.

    Turn commands are strings: 'L' or 'R' followed by a level between
    ``Level_min`` and ``Level_max``, mapped onto the PWM duty window
    ``Left_max`` .. ``Straight`` .. ``Right_max``.
    """

    # Slots must name the attributes that are actually assigned; the former
    # list held the constructor parameter names, so on CPython every
    # assignment in __init__ raised AttributeError.
    __slots__ = ('servo', 'Min_Turn_Level', 'Max_Turn_Level', 'Go_Straight', 'Max_Turn_Left',
                 'Max_Turn_Right')

    def __init__(self, IO=5, Level_min=0, Level_max=10, Straight=70, Left_max=50, Right_max=105):
        self.servo = PWM(Pin(IO), freq=50)
        self.servo.duty(Straight)  # centre the wheels on start-up
        # time.sleep( 1 ) # you may not need it
        self.Min_Turn_Level = Level_min
        self.Max_Turn_Level = Level_max
        self.Go_Straight = Straight
        self.Max_Turn_Left = Left_max
        self.Max_Turn_Right = Right_max

    def turn(self, value):
        """Apply a turn command such as 'L03' or 'R10'.

        With any other leading character the numeric part is written to the
        servo as a raw duty value.
        """
        duty_value = int(''.join(value[1:]))
        if value[0] == 'L':
            arduino_map = Arduino_func(self.Min_Turn_Level, self.Max_Turn_Level, self.Max_Turn_Left,
                                       self.Go_Straight)
            duty_value = arduino_map.map(duty_value, -1)
        elif value[0] == 'R':
            arduino_map = Arduino_func(self.Min_Turn_Level, self.Max_Turn_Level, self.Go_Straight,
                                       self.Max_Turn_Right)
            duty_value = arduino_map.map(duty_value)
        self.servo.duty(duty_value)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from lib.COMMON.timer import Timer | |
class Car():
    """Drive a car from remote commands of the form '<turn>-<move>'.

    ``go`` receives strings such as 'L03-F05': the part before '-' is handed
    to ``steering.turn`` and the part after it ('F'/'B' plus a level) to
    ``driver.move``.  When the requested level jumps by more than one, the
    intermediate levels are queued in ``listStep`` and replayed one at a
    time, paced by ``RunAfter(self.delay)``.
    """

    def __init__(self, driver, steering, watchCar=None):
        """
        :param driver: object with ``move(cmd)``.
        :param steering: object with ``turn(cmd)``.
        :param watchCar: optional callable polled from ``go``; it is called as
            ``watchCar(self.stopcar, self.moveOn, self.driver, self.M)`` and
            must return ``(sensitivity, moveOn)``.
        """
        # 0 is the "no watchdog" marker tested in go().
        self.watchCar = watchCar if watchCar is not None else 0
        self.driver = driver
        self.servo = steering
        self.RunAfter = Timer.RunAfter
        self.engineData = {}
        self.lastM = 0
        self.stepDown = 0
        self.listStep = []
        self.lastM1 = ''
        self.lastData = ''
        self.sensitivity = 'ms_1000'
        self.D, self.M = '', ''
        self.forwardNO, self.backwardNO = 1, 1
        self.moveOn = 1
        self.A, self.last_D, self.current_D = 3, 0, 0
        self.T = 250
        self.delay = 't_400'

    def stepnum(self, num1, num2, M1):
        """Return ``M1 + str(k)`` for every k from min(num1, num2) to
        max(num1, num2) inclusive, in ascending order."""
        low, high = (num2, num1) if num1 > num2 else (num1, num2)
        return [M1 + str(level) for level in range(low, high + 1)]

    def stopcar(self, direction):
        """Block movement in ``direction`` ('F' or 'B') until engine() clears it."""
        if direction == 'F':
            self.forwardNO = 0
        elif direction == 'B':
            self.backwardNO = 0

    def engine(self, data, soloRemote):
        """Apply one command to the steering and, if allowed, to the driver.

        :param data: ``(turn_cmd, move_cmd)`` when ``soloRemote`` is truthy;
            otherwise a single command string ('L..'/'R..' updates the turn,
            anything else the movement).
        :param soloRemote: selects how ``data`` is interpreted.
        """
        if soloRemote:
            D, M = data
            self.engineData.update({'direction': D, 'movement': M})
        elif data[0] in ('L', 'R'):
            # Only unpack in solo mode: a single command such as 'L05' has
            # three characters and cannot be unpacked into two names.
            self.engineData.update({'direction': data})
        else:
            self.engineData.update({'movement': data})
        self.servo.turn(self.engineData['direction'])
        heading = self.engineData['movement'][0]
        # A direction is allowed when its own flag is set, or when the
        # opposite direction is the blocked one (same truth table as the
        # original four-way condition).
        forward_ok = self.forwardNO or self.backwardNO == 0
        backward_ok = self.backwardNO or self.forwardNO == 0
        if (heading == 'F' and forward_ok) or (heading == 'B' and backward_ok):
            self.driver.move(self.engineData['movement'])
            # NOTE(review): the reset is assumed to belong to this branch
            # (a block stays until an allowed move happens) -- confirm.
            self.forwardNO, self.backwardNO, self.moveOn = 1, 1, 1

    def go(self, data, soloRemote=1):
        """Process one received command and advance any pending step-down.

        :param data: 'offline', a '<turn>-<move>' string, or None.
        :param soloRemote: 0 disables all processing (placeholder mode).
        """
        if soloRemote == 0:
            return
        if data == 'offline':
            self.driver.move('F00')
            self.servo.turn('L00')
        elif data != self.lastData and data is not None:
            self.D, self.M = data.split('-')
            self.M, M1 = self.M[1:], self.M[:1]  # M1 is F or B
            if M1 != self.lastM1:
                # Direction flipped: ramp up again from level 0.
                self.lastM = 0
            if int(self.M) - self.lastM > 1:
                # Jump of more than one level: queue the levels in between.
                self.listStep = self.stepnum(int(self.M) - 1, self.lastM, M1)
                self.listStep.pop(0)  # the first level is run right now
                self.M = '%02d' % (self.lastM + 1)
                self.stepDown = 1
            else:
                self.stepDown = 0
            self.lastM = int(self.M)
            self.M = M1 + self.M
            self.engine((self.D, self.M), soloRemote=soloRemote)
            # NOTE(review): this stores Ellipsis in self.delay, which is later
            # passed to RunAfter(); kept as in the original (whose comment
            # says it is meant to restart the delay timer) -- confirm.
            self.delay = ...
            self.lastM1 = M1
            self.lastData = data
        elif self.stepDown:
            try:
                if self.RunAfter(self.delay):  # in order to slow down gyro action.
                    M1, self.M = self.listStep[0][:1], self.listStep[0][1:]
                    self.M = M1 + '%02d' % (int(self.M) + 1)
                    self.listStep.pop(0)
                    self.engine((self.D, self.M), soloRemote=soloRemote)
            except Exception:
                # Best effort: an exhausted queue (IndexError) or any other
                # failure ends the step-down.  KeyboardInterrupt/SystemExit
                # are no longer swallowed.
                self.stepDown = 0
        # NOTE(review): watchdog polling is assumed to sit inside the
        # soloRemote != 0 path -- confirm against the original layout.
        if self.watchCar != 0:
            if self.RunAfter(self.sensitivity):
                self.sensitivity, self.moveOn = self.watchCar(
                    self.stopcar, self.moveOn, self.driver, self.M)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from machine import Pin | |
from lib.COMMON.timer import Timer | |
import time | |
from lib.COMMON.management import Sword_AP | |
class Sword():  # using Timer
    """Gesture state machine for the sword: relay, remote switches, webrepl.

    ``run`` is polled with a gyro string laid out like 'L00-F00-S00'
    (turn letter + 2 digits, go letter + 2 digits, a third letter + 2
    digits).  Depending on the gestures it pushes switch commands through
    ``conn.updateSwitch``, pulses the relay, and raises the ``CTRLcar`` /
    ``webrepl`` flags that the caller polls.
    """

    def __init__(self, IO, conn, func, force):
        """
        :param IO: object exposing ``RELAY_OUT`` (pin id of the relay).
        :param conn: connection object providing ``updateSwitch`` and
            ``switch_dict``.
        :param func: four callables, [0]=F, [1]=B, [2]=L, [3]=R; each is
            called once with ``conn`` the first time its tilt is seen.
        :param force: object whose ``gotForce`` is cleared when the target
            mission finishes.
        """
        self.conn = conn
        self.func = func
        self.force = force
        # Each ID holder is imported only long enough to copy its ID and the
        # local name is dropped again right away.
        from lib.COMMON.management import SWORD_Lhand
        self.LhandID = SWORD_Lhand.ID
        del (SWORD_Lhand)
        from lib.COMMON.management import SWORD_Rhand
        self.RhandID = SWORD_Rhand.ID
        del (SWORD_Rhand)
        from lib.COMMON.management import SWORD_target
        self.targetID = SWORD_target.ID
        del (SWORD_target)
        self.IO = {'RELAY': Pin(IO.RELAY_OUT, Pin.OUT)}
        self.RunAfter = Timer.RunAfter
        self.L1, self.runTarget = 0, 0
        # Initialised here so activate_target() never reads a missing
        # attribute; run() resets them again when it switches the laser on.
        self.FP, self.BP, self.LP, self.RP = 0, 0, 0, 0
        self.runON = 1  # force will set it 1
        self.spinData, self.spinCount = 20, 0
        self.CTRLcar = 0
        self.LaserON = 0
        self.F, self.B, self.L, self.R = 0, 0, 0, 0
        self.webrepl = 0
        self.webreplON = 1
        self.doneTARGET = 0
        self.TH1 = 1  # threshold 1, for activate the car

    def runWebrepl(self, v, w, x, y):
        """Count one tick of a sustained tilt and return the new count.

        ``v`` is the counter of the tilted direction; ``w``, ``x``, ``y`` are
        the counters of the other three.  More than 40 ticks stops ``run``
        (``runON = 0``), pulses the relay and sets ``webrepl``.  As soon as
        another direction has a non-zero counter the chance is withdrawn
        for good (``webreplON = 0``).

        :returns: the (possibly incremented) counter -- always an int, so the
            caller never stores None in its counters.
        """
        # self.spinCount = 0 # reset spin gyro
        if self.webreplON:
            v += 1
            if v > 40:
                self.runON = 0
                self.clickRelay(1.2)
                self.webrepl = 1
            elif v > 0 and w == 0 and x == 0 and y == 0:
                print('ready for webrepl :', v)  # for test
            else:
                # Only one chance: mixing directions disables the gesture.
                self.webreplON = 0
        return v

    def clickRelay(self, t):
        """Switch the relay on, block for ``t`` seconds, switch it off."""
        self.IO['RELAY'].value(1)
        time.sleep(t)
        self.IO['RELAY'].value(0)
        print('clickRelay:', t)

    def activate_car(self, GYRO):
        """Track the spin gesture; return 1 once the car should take over.

        :param GYRO: gyro string; characters 9-10 are read as an integer.
        :returns: 1 after the spin threshold was passed and
            ``RunAfter('wait_3000')`` fired, otherwise 0.
        """
        data = int(GYRO[9:11])
        # if 1:  # for test
        if self.TH1 and data < self.spinData:  # officially
            self.spinCount += 1
            print('spinCount', self.spinCount)
        # NOTE(review): the previous value is assumed to be refreshed on every
        # call (otherwise 45 drops could never be counted) -- confirm.
        self.spinData = data
        if self.TH1 and self.spinCount > 45:
            self.conn.updateSwitch(ID=self.LhandID, cmd=Sword_AP.CMD.Lhand)
            self.conn.updateSwitch(ID=self.RhandID, cmd=Sword_AP.CMD.Rhand2)
            self.TH1 = 0
            return 0
        if self.TH1 == 0 and self.RunAfter('wait_3000'):
            print('\nactivate_car\n')
            self.clickRelay(1.2)
            # self.TH1 = 1
            return 1
        return 0

    def activate_target(self, GYRO):
        """Advance the four-direction tilt sequence and the target mission.

        Each direction tilted past level 7 fires its ``func`` callback once
        and sets its flag (FP/BP/LP/RP).  When all four flags are set the
        Rhand2 command is pushed and the target phase starts; that phase
        pushes the target command and later pulses the relay and resets the
        state.

        :param GYRO: gyro string; [0] turn letter, [1:3] turn level,
            [4] go letter, [5:7] go level.
        """
        TURN, GO = GYRO[0], GYRO[4]
        i, j = int(GYRO[1:3]), int(GYRO[5:7])
        if self.runTarget:  # officially
            if self.doneTARGET == 0 and self.RunAfter('ii_3000'):
                print('run target')
                self.conn.updateSwitch(ID=self.targetID, cmd=Sword_AP.CMD.target)  # activate target
                print(self.conn.switch_dict, '\n')
                self.doneTARGET = 1
            elif self.doneTARGET and self.RunAfter('ii_10000'):
                self.runTarget = 0
                self.clickRelay(1.2)  # shutdown the sword.
                self.force.gotForce = 0  # MC can be input now
                self.doneTARGET = 0
                self.LaserON = 0
                self.runON = 1
                self.spinCount = 0
                print('Target Mission has done, showdown the sword.')
        elif self.FP and self.BP and self.LP and self.RP:  # officially
            self.FP, self.BP, self.LP, self.RP = 0, 0, 0, 0
            # the hand which holds the sword_srv, just activate 3s
            self.conn.updateSwitch(ID=self.RhandID, cmd=Sword_AP.CMD.Rhand2)
            self.runTarget = 1
        elif GO == 'F' and j > 7:  # officially
            # keep gyro in the direction about 50*200ms
            self.F = self.runWebrepl(self.F, self.B, self.L, self.R)
            if self.FP == 0:
                print('\nFP=1')
                self.func[0](self.conn)
                self.FP = 1
        elif GO == 'B' and j > 7:  # officially
            self.B = self.runWebrepl(self.B, self.F, self.L, self.R)
            if self.BP == 0:
                print('\nBP=1')
                self.func[1](self.conn)
                self.BP = 1
        elif TURN == 'L' and i > 7:  # officially
            self.L = self.runWebrepl(self.L, self.B, self.F, self.R)
            if self.LP == 0:
                print('\nLP=1')
                self.func[2](self.conn)
                self.LP = 1
        elif TURN == 'R' and i > 7:  # officially
            self.R = self.runWebrepl(self.R, self.B, self.L, self.F)
            if self.RP == 0:
                print('\nRP=1')
                self.func[3](self.conn)
                self.RP = 1

    def run(self, GYRO):
        """Poll entry point; does nothing while ``runON`` is 0.

        :param GYRO: gyro string, or None when no gyro is installed (a
            neutral 'L00-F00-S00' reading is substituted).
        """
        # runON is cleared when the webrepl gesture succeeds.
        if not self.runON:
            return
        if GYRO is None:  # when no install GYRO, test program
            GYRO = 'L00-F00-S00'
        if self.LaserON == 0:
            self.clickRelay(0.2)
            self.LaserON = 1
            self.FP, self.BP, self.LP, self.RP = 0, 0, 0, 0
        if self.L1:
            if self.RunAfter('L1_200'):
                self.activate_target(GYRO)
                self.CTRLcar = self.activate_car(GYRO)
        elif self.L1 == 0:  # the hand which touch electrical light,
            self.conn.updateSwitch(ID=self.LhandID, cmd=Sword_AP.CMD.Lhand)  # just activate 1s
            self.L1 = 1
            print('L1 activated')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from lib.SRV.sword_srv.SWORD import Sword | |
from lib.DRV.GYRO.MPU6050_JY61P import MPU6050_JY61P | |
from lib.COMMON.management import Sword_AP as Main, SWORD_Rhand | |
from lib.COMMON.MMG_server import microMSG_server | |
from lib.COMMON.force import Force | |
def funcSword(P=None):
    """Push the Rhand1 command for the right-hand switch through ``P``.

    ``P`` must provide ``updateSwitch(ID=..., cmd=...)``; the default of None
    is not usable and only keeps the callback signature uniform.
    """
    # the hand which holds the sword_srv.
    hand_id = SWORD_Rhand.ID
    command = Main.CMD.Rhand1
    P.updateSwitch(ID=hand_id, cmd=command)
def turnONsword(P=None):
    """Placeholder callback: accepts ``P`` and does nothing.

    Original note: set func1, func2, func3, func4; func1 will run as the
    final target.  (A former ``sword.runON=1`` body is disabled.)
    """
    return None
# Wire the server, the gyro, the force sensor and the sword together.
conn = microMSG_server(WIFI=Main)
mpu6050 = MPU6050_JY61P()
force = Force(IO=Main.IO, funclist=(turnONsword,))
# func order: [0]=F, [1]=B, [2]=L, [3]=R
sword = Sword(IO=Main.IO, conn=conn,
              func=(funcSword, funcSword, funcSword, funcSword), force=force)
# mpu6050.Str_AngleDATA # cannot cancel this. or it will return "None" afterwards, why ?

# Poll until the sword asks for car control or for webrepl.
while True:
    conn.Main()
    if 1:  # for test
    # if force.forceON:  # for officially
        sword.run(mpu6050.Str_AngleDATA)
    if sword.CTRLcar or sword.webrepl:  # spin clockwise 30 time, car. down over 9 times
        print('While break!')
        break

if sword.CTRLcar:  # when webrepl=1, program stops for webrepl
    print('\n\n\n\nNOW, control car.')
    # mpu6050 / MPU6050_JY61P must NOT be deleted here, important!!!
    del(conn, sword, Sword, force, Force, microMSG_server, Main, SWORD_Rhand)
    import gc
    gc.collect()
    # Read the follow-up script through a context manager so the file handle
    # is closed before the script starts running.
    with open('./gyroCTRLcar.py') as script_file:
        script_source = script_file.read()
    exec(script_source, globals())
''' | |
1. activate sword | |
force 7s in 10s, CMD.Lhand | |
2. activate target | |
go forward tilt more than level7, CMD.Rhand1 | |
go backward tilt more than level7, CMD.Rhand1 | |
go left tilt more than level7, CMD.Rhand1 | |
go right tilt more than level7, CMD.Rhand1 | |
2.1 activate car | |
spin clockwise about 31 times, CMD.Rhand2 and CMD.Lhand | |
3. activate webrepl | |
tilt more than level7 for ten seconds. | |
''' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Reboot the board: machine.reset() does not return.
import machine

machine.reset()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import network | |
# Bring up the board's own Wi-Fi access point.
# NOTE(review): the ESSID and password are hard-coded here; anything that
# connects to this access point presumably uses the same literals -- confirm
# before changing them.
ap = network.WLAN(network.AP_IF)  # access-point interface object
ap.config(
    essid='sword_ap',
    password='123456789',
)
ap.active(1)  # switch the interface on
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# POWER STRIP TEST ON ESP8266, format is 'gpio1_0,gpio2_1,gpio3_0,gpio4_1' | |
from lib.COMMON.management import powerStrip_AP as Main,powerStrip_1 as No1, powerStrip_2 as No2 | |
from lib.COMMON.MMG_server import microMSG_server | |
conn = microMSG_server(WIFI=Main, test=True)

# Manual alternatives kept for reference (disabled):
#   conn.updateSwitch( No1, '2,3-1' )
#   conn.updateSwitch( No2, '2,3-1' )
#   conn.updateSwitch( Main, (Main.S1=True, Main.S2) )

# Serve clients forever; each time the 'abc_3000' timer fires, push the
# current No1/No2 commands again and dump the switch table.
# NOTE(review): the print is assumed to belong to the timed branch -- confirm.
while True:
    conn.Main()
    if not conn.RunAfter('abc_3000'):
        continue
    for strip, command in ((No1, Main.CMD.No1), (No2, Main.CMD.No2)):
        conn.updateSwitch(strip.ID, command)
    print('\n', conn.switch_dict)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# SWORD SWITCH TEST ON ESP8266 (pushes the Rhand, Lhand and target commands) | |
from lib.COMMON.management import Sword_AP as Main, SWORD_Rhand as Rhand,SWORD_Lhand as Lhand, SWORD_target as target | |
from lib.COMMON.MMG_server import microMSG_server | |
conn = microMSG_server(WIFI=Main, test=True)

# Serve clients forever; each time the 'abc_3000' timer fires, push the
# right-hand (Rhand1, then Rhand2), left-hand and target commands and dump
# the switch table.
# NOTE(review): the print is assumed to belong to the timed branch -- confirm.
while True:
    conn.Main()
    if not conn.RunAfter('abc_3000'):
        continue
    for switch_id, command in (
        (Rhand.ID, Main.CMD.Rhand1),
        (Rhand.ID, Main.CMD.Rhand2),
        (Lhand.ID, Main.CMD.Lhand),
        (target.ID, Main.CMD.target),
    ):
        conn.updateSwitch(switch_id, command)
    print('\n', conn.switch_dict)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#HOWTO replace SWORD.py | |
# 1. upload SWORD.py | |
# 2. Run the codes in webrepl (run "import os" first) | |
os.remove('./lib/SRV/sword_srv/SWORD.py') | |
os.rename('SWORD.py', './lib/SRV/sword_srv/SWORD.py' ) | |
os.rename('force.py', './lib/COMMON/force.py' ) | |
./lib/DRV/GYRO/MPU6050_JY61P.py | |
./lib/COMMON/MMG_client.py | |
os.listdir('./lib/COMMON') | |
os.listdir('./lib/CLI/gyroctrl_cli') | |
os.listdir('./lib/CLI/switch_cli') | |
exec(open('./gyroCTRLcar.py').read(),globals()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment