Created
January 12, 2020 15:17
-
-
Save TheCherry/8857498864e2268d11607aaa5c4b3ccf to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import serial | |
import select | |
import struct | |
import sys | |
import time | |
import math | |
# Command-line interface: the single positional argument is the serial port
# of the controller emulator. The text is passed as ``description`` (the first
# positional parameter of ArgumentParser is ``prog``, which would replace the
# program name in usage/help output).
parser = argparse.ArgumentParser(
    description='Client for sending controller commands to a controller emulator')
parser.add_argument('port', help='serial port the controller emulator is attached to')
args = parser.parse_args()
# Sync states (consecutive integers starting at 0)
(STATE_OUT_OF_SYNC,
 STATE_SYNC_START,
 STATE_SYNC_1,
 STATE_SYNC_2,
 STATE_SYNC_OK) = range(5)

# Actual Switch DPAD values: 0..7 clockwise starting at UP, 8 = centered
(A_DPAD_U,
 A_DPAD_U_R,
 A_DPAD_R,
 A_DPAD_D_R,
 A_DPAD_D,
 A_DPAD_D_L,
 A_DPAD_L,
 A_DPAD_U_L,
 A_DPAD_CENTER) = range(9)

# Enum DIR values: one bit per direction, diagonals combine two bits
DIR_CENTER = 0
DIR_U = 1 << 0
DIR_R = 1 << 1
DIR_D = 1 << 2
DIR_L = 1 << 3
DIR_U_R = DIR_U | DIR_R
DIR_D_R = DIR_D | DIR_R
DIR_U_L = DIR_U | DIR_L
DIR_D_L = DIR_D | DIR_L

# Buttons occupy bits 0-13 of a command word
BTN_NONE = 0
BTN_Y = 1 << 0
BTN_B = 1 << 1
BTN_A = 1 << 2
BTN_X = 1 << 3
BTN_L = 1 << 4
BTN_R = 1 << 5
BTN_ZL = 1 << 6
BTN_ZR = 1 << 7
BTN_MINUS = 1 << 8
BTN_PLUS = 1 << 9
BTN_LCLICK = 1 << 10
BTN_RCLICK = 1 << 11
BTN_HOME = 1 << 12
BTN_CAPTURE = 1 << 13

# DPAD field: the DIR bits shifted up by 16
DPAD_CENTER = DIR_CENTER << 16
DPAD_U = DIR_U << 16
DPAD_R = DIR_R << 16
DPAD_D = DIR_D << 16
DPAD_L = DIR_L << 16
DPAD_U_R = DPAD_U | DPAD_R
DPAD_D_R = DPAD_D | DPAD_R
DPAD_U_L = DPAD_U | DPAD_L
DPAD_D_L = DPAD_D | DPAD_L

# Left stick: intensity 0xFF in bits 24-31, angle in degrees from bit 32
LSTICK_CENTER = 0
LSTICK_R = (0 << 32) | (0xFF << 24)
LSTICK_U_R = (45 << 32) | (0xFF << 24)
LSTICK_U = (90 << 32) | (0xFF << 24)
LSTICK_U_L = (135 << 32) | (0xFF << 24)
LSTICK_L = (180 << 32) | (0xFF << 24)
LSTICK_D_L = (225 << 32) | (0xFF << 24)
LSTICK_D = (270 << 32) | (0xFF << 24)
LSTICK_D_R = (315 << 32) | (0xFF << 24)

# Right stick: intensity 0xFF in bits 44-51, angle in degrees from bit 52
RSTICK_CENTER = 0
RSTICK_R = (0 << 52) | (0xFF << 44)
RSTICK_U_R = (45 << 52) | (0xFF << 44)
RSTICK_U = (90 << 52) | (0xFF << 44)
RSTICK_U_L = (135 << 52) | (0xFF << 44)
RSTICK_L = (180 << 52) | (0xFF << 44)
RSTICK_D_L = (225 << 52) | (0xFF << 44)
RSTICK_D = (270 << 52) | (0xFF << 44)
RSTICK_D_R = (315 << 52) | (0xFF << 44)

# Command word with nothing pressed and everything centered
NO_INPUT = BTN_NONE | DPAD_CENTER | LSTICK_CENTER | RSTICK_CENTER

# Commands to send to MCU
COMMAND_NOP = 0x00
COMMAND_SYNC_1 = 0x33
COMMAND_SYNC_2 = 0xCC
COMMAND_SYNC_START = 0xFF

# Responses from MCU
RESP_USB_ACK = 0x90
RESP_UPDATE_ACK = 0x91
RESP_UPDATE_NACK = 0x92
RESP_SYNC_START = 0xFF
RESP_SYNC_1 = 0xCC
RESP_SYNC_OK = 0x33
# Compute x and y based on angle and intensity
def angle(angle, intensity):
    """Convert a polar stick position into an ``(x, y)`` pair around 0x80.

    ``angle`` is in degrees; ``intensity`` is scaled so that 0xFF gives the
    full 0x7F deflection from the 0x80 centre.
    """
    theta = math.radians(angle)
    dx = int(math.cos(theta) * 0x7F * intensity / 0xFF)
    dy = int(math.sin(theta) * 0x7F * intensity / 0xFF)
    # y is subtracted because on the Y input, UP = 0 and DOWN = 255
    return 0x80 + dx, 0x80 - dy
def lstick_angle(angle, intensity):
    """Pack a left-stick angle/intensity pair into a command word.

    The intensity lands at bit 24 and the angle at bit 32.
    """
    return (angle << 32) + (intensity << 24)
def rstick_angle(angle, intensity):
    """Pack a right-stick angle/intensity pair into a command word.

    The intensity lands at bit 44 and the angle at bit 52.
    """
    return (angle << 52) + (intensity << 44)
# Precision wait
def p_wait(waitTime):
    """Busy-wait on ``time.perf_counter`` until ``waitTime`` seconds have passed."""
    began = time.perf_counter()
    while time.perf_counter() - began < waitTime:
        pass
# Wait for data to be available on the serial port
def wait_for_data(timeout = 1.0, sleepTime = 0.1):
    """Poll the serial port until input is pending or ``timeout`` expires.

    The function always waits at least ``sleepTime`` seconds before it looks
    at the result, then keeps polling every ``sleepTime`` seconds while the
    input buffer is empty.  Previously ``timeout`` was accepted but never
    used, so a silent device made this loop forever.

    Returns True when data is pending, False when the timeout expired first.
    """
    start = time.perf_counter()
    elapsed = 0.0
    pending = ser.in_waiting
    while elapsed < sleepTime or (pending == 0 and elapsed < timeout):
        time.sleep(sleepTime)
        pending = ser.in_waiting
        elapsed = time.perf_counter() - start
    return pending != 0
# Read X bytes from the serial port (returns list)
def read_bytes(size):
    """Read up to ``size`` bytes from ``ser`` and return them as a list of ints."""
    return list(ser.read(size))
# Read 1 byte from the serial port (returns int)
def read_byte():
    """Return the next byte from the serial port, or 0 when nothing was read."""
    received = read_bytes(1)
    return received[0] if len(received) != 0 else 0
# Discard all incoming bytes and read the last (latest) (returns int)
def read_byte_latest():
    """Drain the pending serial input and return its most recent byte.

    When nothing is pending, a single-byte read is attempted instead (it
    waits according to the port's own read timeout).  Returns 0 if no byte
    was received.  The previous code returned the first (oldest) byte of
    the drained data, contradicting the documented behaviour.
    """
    pending = ser.in_waiting
    bytes_in = read_bytes(pending if pending != 0 else 1)
    return bytes_in[-1] if bytes_in else 0
# Write bytes to the serial port
def write_bytes(bytes_out):
    """Send a sequence of byte values (ints 0-255) over ``ser``."""
    payload = bytearray(bytes_out)
    ser.write(payload)
# Write byte to the serial port
def write_byte(byte_out):
    """Send a single byte value by wrapping it in a one-element list."""
    single = [byte_out]
    write_bytes(single)
# Compute CRC8
# https://www.microchip.com/webdoc/AVRLibcReferenceManual/group__util__crc_1gab27eaaef6d7fd096bd7d57bf3f9ba083.html
def crc8_ccitt(old_crc, new_data):
    """Fold one data byte into a running CRC-8 (polynomial 0x07, MSB first).

    Returns the updated CRC truncated to 8 bits.
    """
    crc = old_crc ^ new_data
    for _ in range(8):
        top_bit_set = crc & 0x80
        crc <<= 1
        if top_bit_set:
            crc ^= 0x07
    return crc & 0xff
# Send a raw packet and wait for a response (CRC will be added automatically)
def send_packet(packet=None, debug=False):
    """Send ``packet`` plus its CRC-8 to the MCU and report whether it was ACKed.

    packet: iterable of byte values; when omitted, a neutral report is sent
        (no buttons, DPAD value 0x08, all four stick axes at 0x80).  The
        default is created per call instead of being a shared mutable list.
    debug: when true nothing is written or read and True is returned.

    Returns True if the MCU answered with RESP_USB_ACK, otherwise False.
    """
    if packet is None:
        packet = [0x00,0x00,0x08,0x80,0x80,0x80,0x80,0x00]
    if debug:
        return True

    bytes_out = list(packet)

    # Compute CRC over the payload and append it as the final byte
    crc = 0
    for d in bytes_out:
        crc = crc8_ccitt(crc, d)
    bytes_out.append(crc)
    write_bytes(bytes_out)

    # Wait for USB ACK or UPDATE NACK
    return read_byte() == RESP_USB_ACK
# Convert DPAD value to actual DPAD value used by Switch
def decrypt_dpad(dpad):
    """Translate a DIR_* bit combination into the matching A_DPAD_* value.

    Any value that is not one of the eight known directions maps to
    A_DPAD_CENTER.
    """
    translation = {
        DIR_U: A_DPAD_U,
        DIR_R: A_DPAD_R,
        DIR_D: A_DPAD_D,
        DIR_L: A_DPAD_L,
        DIR_U_R: A_DPAD_U_R,
        DIR_U_L: A_DPAD_U_L,
        DIR_D_R: A_DPAD_D_R,
        DIR_D_L: A_DPAD_D_L,
    }
    return translation.get(dpad, A_DPAD_CENTER)
# Convert CMD to a packet
def cmd_to_packet(command):
    """Unpack a command word into the 8-byte packet payload.

    Field layout of ``command`` (least significant bit first): 8 bits low
    buttons, 8 bits high buttons, 8 bits DPAD, 8 bits left intensity,
    12 bits left angle, 8 bits right intensity, 12 bits right angle.

    Returns ``[high, low, dpad, left_x, left_y, right_x, right_y, 0x00]``.
    """
    low = command & 0xFF
    high = (command >> 8) & 0xFF
    dpad_bits = (command >> 16) & 0xFF
    left_intensity = (command >> 24) & 0xFF
    left_angle = (command >> 32) & 0xFFF
    right_intensity = (command >> 44) & 0xFF
    right_angle = (command >> 52) & 0xFFF

    hat = decrypt_dpad(dpad_bits)
    left_x, left_y = angle(left_angle, left_intensity)
    right_x, right_y = angle(right_angle, right_intensity)
    return [high, low, hat, left_x, left_y, right_x, right_y, 0x00]
# Send a formatted controller command to the MCU
def send_cmd(command=NO_INPUT):
    """Convert ``command`` to a packet, send it, and return send_packet's result."""
    packet = cmd_to_packet(command)
    return send_packet(packet)
#Test all buttons except for home and capture
def testbench_btn():
    """Press each listed button for 0.5 s, releasing for 1 ms in between."""
    buttons = (BTN_A, BTN_B, BTN_X, BTN_Y,
               BTN_PLUS, BTN_MINUS, BTN_LCLICK, BTN_RCLICK)
    for button in buttons:
        send_cmd(button)
        p_wait(0.5)
        send_cmd()
        p_wait(0.001)
# Test DPAD U / R / D / L
def testbench_dpad():
    """Hold each cardinal DPAD direction for 0.5 s, then release for 1 ms."""
    for direction in (DPAD_U, DPAD_R, DPAD_D, DPAD_L):
        send_cmd(direction)
        p_wait(0.5)
        send_cmd()
        p_wait(0.001)
# Test DPAD Diagonals - Does not register on switch due to dpad buttons
def testbench_dpad_diag():
    """Hold each diagonal DPAD direction for 0.5 s, then release for 1 ms."""
    for direction in (DPAD_U_R, DPAD_D_R, DPAD_D_L, DPAD_U_L):
        send_cmd(direction)
        p_wait(0.5)
        send_cmd()
        p_wait(0.001)
# Test Left Analog Stick
def testbench_lstick():
    """Exercise the left stick: click, cardinal directions, then two sweeps."""
    # Stick click
    send_cmd(BTN_LCLICK)
    p_wait(0.5)
    send_cmd()
    p_wait(0.001)

    # Test U/R/D/L, back to U, then centre
    for position in (LSTICK_U, LSTICK_R, LSTICK_D, LSTICK_L, LSTICK_U, LSTICK_CENTER):
        send_cmd(position)
        p_wait(0.5)

    # Sweep 721 one-degree steps starting at 90 degrees: first at full
    # intensity, then at partial intensity, re-centering after each sweep
    for intensity in (0xFF, 0x80):
        for step in range(721):
            send_cmd(lstick_angle(step + 90, intensity))
            p_wait(0.001)
        send_cmd(LSTICK_CENTER)
        p_wait(0.5)
# Test Right Analog Stick
def testbench_rstick():
    """Exercise the right stick: click, cardinal directions, then two sweeps."""
    # Stick click
    send_cmd(BTN_RCLICK)
    p_wait(0.5)
    send_cmd()
    p_wait(0.001)

    # Test U/R/D/L, back to U, then centre
    for position in (RSTICK_U, RSTICK_R, RSTICK_D, RSTICK_L, RSTICK_U, RSTICK_CENTER):
        send_cmd(position)
        p_wait(0.5)

    # Sweep 721 one-degree steps starting at 90 degrees: first at full
    # intensity, then at partial intensity, re-centering after each sweep
    for intensity in (0xFF, 0x80):
        for step in range(721):
            send_cmd(rstick_angle(step + 90, intensity))
            p_wait(0.001)
        send_cmd(RSTICK_CENTER)
        p_wait(0.5)
# Test Packet Speed
def testbench_packet_speed(count=100, debug=False):
    """Send ``count`` default packets and print round-trip timing statistics.

    count: number of packets to send (exactly ``count`` are sent; the old
        loop sent ``count + 1`` and divided the total by the loop index).
    debug: forwarded to ``send_packet`` so a dry run performs no serial I/O
        (it used to be accepted and ignored).

    Prints minimum, maximum and average seconds per packet plus the number
    of packets that were not acknowledged.  Returns None.
    """
    total = 0.0
    fastest = 999
    slowest = 0
    errors = 0

    for _ in range(count):
        # Send packet and check time
        t0 = time.perf_counter()
        status = send_packet(debug=debug)
        delta = time.perf_counter() - t0

        # Count errors
        if not status:
            errors += 1
            print('Packet Error!')

        # Track extremes and running total
        if delta < fastest:
            fastest = delta
        if delta > slowest:
            slowest = delta
        total += delta

    # Guard the division so count <= 0 reports an average of 0 instead of raising
    avg = total / count if count > 0 else 0.0
    print('Min =', '{:.3f}'.format(fastest), 'Max =', '{:.3f}'.format(slowest), 'Avg =', '{:.3f}'.format(avg), 'Errors =', errors)
def testbench():
    """Run the button, DPAD, both stick and packet-speed test benches in order."""
    benches = (
        testbench_btn,
        testbench_dpad,
        testbench_lstick,
        testbench_rstick,
        testbench_packet_speed,
    )
    for bench in benches:
        bench()
# Force MCU to sync
def force_sync():
    """Run the three-step sync handshake; return True only if every step matched."""
    # Send 9x 0xFF's to fully flush out buffer on device
    # Device will send back 0xFF (RESP_SYNC_START) when it is ready to sync
    write_bytes([0xFF] * 9)

    # Wait for serial data and read the last byte sent
    wait_for_data()
    if read_byte_latest() != RESP_SYNC_START:
        return False

    # Step 1: SYNC_1 must be answered with RESP_SYNC_1
    write_byte(COMMAND_SYNC_1)
    if read_byte() != RESP_SYNC_1:
        return False

    # Step 2: SYNC_2 must be answered with RESP_SYNC_OK
    write_byte(COMMAND_SYNC_2)
    return read_byte() == RESP_SYNC_OK
# Start MCU syncing process
def sync():
    """Return True once a packet is acknowledged, forcing a resync if needed."""
    # Try sending a packet
    if send_packet():
        return True
    # Not in sync: force resync and send a packet
    if not force_sync():
        return False
    return send_packet()
# ------------------------------------------------------------------------- | |
# Open the serial link to the MCU at 19200 baud with a 1 second read timeout.
# Alternative baud rates that were left commented out: 31250, 40000, 62500.
print("Verbinde ...")
ser = serial.Serial(
    port=args.port,
    baudrate=19200,
    timeout=1,
)
def s(data, t=0.1):
    """Send ``data`` as the controller state and busy-wait ``t`` seconds.

    The input is not released afterwards; callers send a neutral command
    themselves (the release calls here were commented out).
    """
    send_cmd(data)
    p_wait(t)
# Short alias for p_wait
def w(t):
    """Busy-wait for ``t`` seconds."""
    return p_wait(t)
def p(data, t=.2):
    """Press ``data`` for 0.1 s, release it, then pause ``t`` seconds."""
    # Press: same as s(data) with its default 0.1 s hold
    send_cmd(data)
    p_wait(0.1)
    # Release and pause
    send_cmd()
    p_wait(t)
print("Try to Sync ...")
# Attempt to sync with the MCU, retrying every 5 seconds until it succeeds
while True:
    if sync():
        break
    p_wait(5)
print("Synced!")
#for i in range(0,721): | |
# cmd = lstick_angle(i + 45, 0xFF) | |
# send_cmd(cmd) | |
# p_wait(0.001) | |
#s(BTN_L + BTN_R) | |
#s(0) | |
#exit() | |
# | |
# s(BTN_A) | |
#s(0) | |
#exit() | |
from calendar import monthrange | |
# Date bookkeeping used by change_date(); the counter stays at -1 until the
# first run replaces these placeholders with values read from the screen.
year, month, day = 2020, 1, 1
date_changer_counter = -1
def change_date():
    """Replay a fixed HOME-menu input sequence and update the tracked date.

    NOTE(review): the block nesting below was reconstructed from a paste that
    lost its indentation, and the on-screen meaning of each input (presumably
    navigating to the console's date settings and moving the day forward by
    one) cannot be confirmed from this file -- verify on hardware.

    Side effects: sends controller input through ``s``; on the first call
    (``date_changer_counter == -1``) reads day/month/year from the captured
    frame via ``get_text`` and rebinds the module-level ``year``, ``month``,
    ``day`` and ``date_changer_counter``.

    Raises ValueError if the OCR text cannot be parsed by ``int`` even after
    the letter-to-digit clean-up.
    """
    global year, month, day, date_changer_counter
    s(BTN_HOME)
    s(0)
    p_wait(0.45)
    s(lstick_angle(270, 0xFF))
    p_wait(0.15)
    s(lstick_angle(0, 0xFF))
    p_wait(0.38)
    s(BTN_A)
    p_wait(0.7)
    s(lstick_angle(270, 0xFF))
    p_wait(1.40)
    s(BTN_A)
    p_wait(0.15)
    s(lstick_angle(270, 0xFF))
    p_wait(0.45)
    s(BTN_A)
    s(lstick_angle(270, 0xFF))
    p_wait(0.45)
    s(BTN_A)
    p_wait(0.60)
    if(date_changer_counter == -1):
        # first run, lets get the day, month, year data
        p_wait(1.5)
        # OCR clean-up: letters that are commonly misread digits (o, l, i, b)
        # are mapped to 0, 1, 1, 8 before the text is converted with int()
        day = int(get_text(437, 499, 188, 261).strip().lower().replace("o", "0").replace("l", "1").replace("i", "1").replace("b", "8"))
        month = int(get_text(437, 499, 320, 388).strip().lower().replace("o", "0").replace("l", "1").replace("i", "1").replace("b", "8"))
        year = int(get_text(437, 499, 462, 581).strip().lower().replace("o", "0").replace("l", "1").replace("i", "1").replace("b", "8"))
        # monthrange(...)[1] is the number of days in the month, so the
        # counter holds how many days remain before the month ends
        date_changer_counter = monthrange(year, month)[1] - day
        print(f"{day}.{month}.{year} - {date_changer_counter}")
    s(lstick_angle(90, 0xFF))
    s(0)
    if(date_changer_counter == 0):
        # Counter exhausted: extra right/up inputs and month bookkeeping
        s(LSTICK_R)
        s(0)
        s(LSTICK_U)
        s(0)
        month += 1
        if(month == 13):
            # Month wrapped past 12: same extra inputs again plus a year bump
            month = 1
            year += 1
            s(LSTICK_R)
            s(0)
            s(LSTICK_U)
            s(0)
        date_changer_counter = monthrange(year, month)[1]
    s(lstick_angle(0, 0xFF))
    p_wait(0.45)
    s(BTN_A)
    p_wait(0.1)
    s(BTN_HOME)
    s(0)
    p_wait(0.80)
    s(BTN_HOME)
    s(0)
    p_wait(.75)
    date_changer_counter -= 1
def watt_farm():
    """Endless macro: run ``change_date`` and then press A followed by B presses.

    NOTE(review): loop nesting was reconstructed from an indentation-stripped
    paste, and the in-game effect (presumably collecting Watts, judging only
    by the name) is not verifiable from this file.  Never returns.
    """
    while True:
        change_date()
        s(BTN_A)
        s(0)
        # 25 B presses, each followed by a neutral report and a 50 ms pause
        for i in range(0, 25):
            s(BTN_B)
            send_cmd(0)
            p_wait(0.05)
        s(0)
        p_wait(0.1)
def pkm_fangen():
    """Endless movement/button macro ("pkm fangen" is German for "catch pkm").

    NOTE(review): loop nesting was reconstructed from an indentation-stripped
    paste; what the inputs achieve in-game cannot be confirmed from this
    file.  Never returns.
    """
    #s(BTN_PLUS)
    # Three B presses, then a one second pause before the main loop
    s(BTN_B)
    s(0)
    s(BTN_B)
    s(0)
    s(BTN_B)
    s(0)
    w(1.0)
    while True:
        s(LSTICK_D_R)
        w(1.5)
        s(BTN_LCLICK)
        s(0)
        # Move up-left and back down-right six times; the stick click is
        # repeated on iterations 0 and 5 (i % 5 == 0)
        for i in range(0, 6):
            if(i%5 == 0):
                s(BTN_LCLICK)
                s(0)
                w(0.3)
            # for i in range(0,int(721/2)):
            # cmd = lstick_angle(i*2 + 90, 0xFF)
            # send_cmd(cmd)
            # w(0.0005)
            s(LSTICK_U_L)
            w(1.3)
            s(LSTICK_D_R)
            w(1.3)
        for i in range(5):
            s(BTN_B, 0.1)
            send_cmd()
            w(0.1)
        s(LSTICK_U)
        s(LSTICK_CENTER)
        w(0.1)
        s(BTN_X)
        w(0.5)
        s(BTN_A)
        # 240 rounds of alternating B and A presses, 50 ms each
        for i in range(240):
            s(BTN_B, 0.05)
            send_cmd()
            s(BTN_A, 0.05)
            send_cmd()
        for i in range(20):
            s(BTN_B, 0.1)
            send_cmd()
        send_cmd(RSTICK_CENTER) ; w(0.5)
# Per-prize counters updated by farm_mballs(); every counter starts at zero
lotto_got = dict.fromkeys(("Milch", "ap-plus", "ap-top", "candy", "mball"), 0)
def farm_mballs():
    """Endless macro: change the date, press through dialogs, tally the prize.

    Each round calls ``change_date``, sends a fixed series of A/B presses,
    OCRs the screen region rows 584-646 / columns 229-1044 via ``get_text``,
    increments the matching ``lotto_got`` counter based on a keyword in the
    text, and redraws the statistics overlay with ``draw_text``.

    NOTE(review): loop nesting was reconstructed from an indentation-stripped
    paste; the in-game meaning of the inputs is not verifiable from this
    file.  Never returns.
    """
    global lotto_got
    count = 0
    start = time.time()
    while(True):
        change_date()
        p(BTN_A, 0.4)
        p(BTN_A, 0.4)
        p(LSTICK_D)
        p(BTN_A, 0.4)
        for i in range(32):
            s(BTN_A, 0.05)
            s(0, 0.05)
        for i in range(76):
            s(BTN_B, 0.05)
            s(0, 0.05)
        # Classify the OCR text by keyword; unmatched text is not counted
        get = get_text(584, 646, 229, 1044).strip().lower()
        if("milch" in get):
            lotto_got["Milch"] += 1
        elif("plus" in get):
            lotto_got["ap-plus"] += 1
        elif("top" in get):
            lotto_got["ap-top"] += 1
        elif("sonder" in get):
            lotto_got["candy"] += 1
        elif("meister" in get):
            lotto_got["mball"] += 1
        for i in range(30):
            s(BTN_B, 0.05)
            s(0, 0.05)
        # Rates are extrapolated to one hour from the elapsed wall-clock time
        t = time.time() - start
        count += 1
        per_hour = round((count / t) * 3600, 4)
        txt = f"Alles: {count}: ({per_hour}/h)"
        for item in lotto_got:
            i_per_hour = round((lotto_got[item] / t) * 3600, 4)
            txt += f"\n{item}: {lotto_got[item]} ({i_per_hour}/h)"
        draw_text(txt)
def suprise_trade():
    """Endless OCR-driven trading macro.

    Each round OCRs a status region of the captured frame and, depending on
    the text, either walks through menus selecting the slot stored in
    ``pos`` (skipping trainer IDs already listed in ``all_trainer``) or waits
    for a status containing "tau" and confirms with Y/A presses.  Progress
    (``pos`` and ``all_trainer``) is written to ``st_data.json``.

    NOTE(review): the block nesting -- in particular which ``if`` the final
    ``else`` belongs to and where the inner ``while`` loops end -- was
    reconstructed from an indentation-stripped paste and must be confirmed
    against the original.  The in-game meaning of the inputs is not
    verifiable from this file.  Never returns.
    """
    while(True):
        not_online_counter = 0
        #frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # Debug output: identity of the current frame object before/after OCR
        print(id(frame))
        search_status = get_text(677, 699, 107, 227).lower()
        print(id(frame))
        if(("suche" not in search_status or not search_status) and not "tau" in search_status):
            print("ETWAS GEFUNDEN !")
            s(BTN_Y)
            s(0)
            p_wait(2)
            print("Y PRESSED !!!!")
            # NOTE(review): assigns a function-local name; the module-level
            # just_started is not changed because there is no global statement
            just_started = False
            print("NEUER FRAME !!!")
            online_status = get_text(0, 50, 1280-225, 1280-25)
            print(online_status)
            # Retry while the status text lacks "online"/"0nline" or contains "lokal"
            while(("online" not in online_status.lower() and "0nline" not in online_status.lower()) or "lokal" in online_status.lower()):
                print("NICHT ONLINE ... NUN GEHEN WIR ONLINE!")
                s(BTN_PLUS)
                s(0)
                s(BTN_A)
                s(0)
                p_wait(20)
                s(BTN_A)
                s(0)
                p_wait(10)
                online_status = get_text(0, 50, 1280-225, 1280-25)
                not_online_counter += 1
                if(not_online_counter >= 2):
                    # Give up after two attempts: press B 100 times and leave the loop
                    for i in range(100):
                        s(BTN_B)
                        s(0)
                    break
            if(not_online_counter):
                # A reconnect attempt happened: restart the round from the top
                continue
            print("WAEHLE ZAUBERTAUSCH ....")
            s(LSTICK_D)
            s(0)
            p_wait(1)
            s(BTN_A)
            s(0, 4.0)
            print("WEAHLE PKM")
            # for i in range(pos[2]):
            # s(BTN_R, 0.06)
            # s(0, 0.4)
            # p_wait(0.2)
            # Move right pos[0] times and down pos[1] times to reach the slot
            for i in range(pos[0]):
                s(LSTICK_R, 0.05)
                s(0, 0.4)
            p_wait(0.2)
            for i in range(pos[1]):
                s(LSTICK_D, 0.05)
                s(0, 0.4)
            p_wait(0.01)
            s(BTN_A)
            s(0)
            p_wait(1.5)
            s(LSTICK_D)
            s(0)
            s(BTN_A)
            s(0)
            p_wait(5)
            print("CHECK ID ....")
            while(True):
                # OCR the ID field, map "o" to "0" and drop the first character
                trainer_id = get_text(228, 258, 288, 412).strip().lower().replace("o", "0")[1:].strip()
                print(trainer_id)
                if(trainer_id in all_trainer):
                    print(f"DOUBLE TRAINER ID ! {pos}")
                    s(BTN_B)
                    s(0)
                    w(2.5)
                    for i in range(75):
                        s(BTN_A)
                        s(0)
                    # inc_pos()
                    p_wait(5)
                    break
                else:
                    print(f"UNIQUE TRAINER ID ! {pos}")
                    all_trainer.append(trainer_id)
                    trainer_id = "0"
                    s(LSTICK_D)
                    s(0)
                    inc_pos()
                    w(1.8)
                    if(pos[0] == 0 and pos[1] == 0):
                        # pos wrapped back to the first slot: extra B/R/A navigation
                        s(BTN_B)
                        s(0, 2.5)
                        s(BTN_R)
                        s(0, 2.0)
                        s(BTN_A)
                        s(0, 2.0)
                        s(LSTICK_D)
                        s(0, 0.3)
                        s(BTN_A)
                        s(0, 4.0)
                    # Persist progress so a later run can resume
                    with open("st_data.json", "w") as f:
                        json.dump({"pos": pos, "trainer": all_trainer}, f)
        else:
            print("FOUND?!")
            # Poll the status region every 0.5 s until it contains "tau"
            while "tau" not in search_status:
                search_status = get_text(670, 700, 95, 227).lower()
                print(search_status)
                w(0.5)
            print("GEFUNDEN !!!")
            s(BTN_Y)
            s(0)
            for i in range(100):
                s(BTN_A)
                s(0)
                w(0.1)
print("import ...") | |
from tesserocr import PyTessBaseAPI, RIL | |
from PIL import Image | |
import cv2 | |
import numpy as np | |
import _thread as thread | |
import json | |
# Overlay image pasted onto the preview by refresh_frame(); None until
# draw_text() renders one. The module-level ``global`` statement below has no
# effect and is kept unchanged.
global frame, lock, text_img
text_img = None
def draw_text(txt):
    """Render ``txt`` as white text on a black image stored in ``text_img``.

    txt: text to draw; newline characters separate the lines.

    The canvas is sized from the widest line.  Previously the width of the
    last measured line was used (``max_width`` was computed but ignored), so
    any earlier, longer line was clipped.  Rebinds the module-level
    ``text_img``; returns None.
    """
    global text_img
    font_scale = 1.2
    font = cv2.FONT_HERSHEY_COMPLEX_SMALL
    line_gap = 10  # vertical spacing added below every line

    lines = txt.split("\n")

    # get the width and height of every line
    sizes = [
        cv2.getTextSize(line, font, fontScale=font_scale, thickness=1)[0]
        for line in lines
    ]
    max_width = max(width for width, _ in sizes)
    max_height = max(height for _, height in sizes)
    all_height = sum(height + line_gap for _, height in sizes)

    # make a black image large enough for every line plus padding
    text_img = np.zeros((all_height + 30, max_width + 80, 3))
    for idx, line in enumerate(lines):
        baseline_y = (max_height + line_gap) * idx + 30
        cv2.putText(text_img, line, (5, baseline_y), font, fontScale=font_scale, color=(255, 255, 255), thickness=1)
# Render a placeholder overlay and start with an empty frame placeholder
draw_text("TEST1\nTEST2\nTEST3")
frame = {}
# Initial inputs: B, B, A (each followed by a neutral report), then wait 1 s
for _btn in (BTN_B, BTN_B, BTN_A):
    s(_btn)
    s(0)
p_wait(1.0)
def get_text(y1, y2, x1, x2):
    """OCR the region rows ``y1:y2`` / columns ``x1:x2`` of the current frame.

    The frame is copied while holding ``lock``; the crop is then handed to
    the Tesseract ``api`` object.  Returns the recognized text, stripped.
    """
    with lock:
        snapshot = np.array(frame, copy=True)
    region = Image.fromarray(snapshot[y1:y2, x1:x2])
    api.SetImage(region)
    return api.GetUTF8Text().strip()
# Slot cursor: pos[0] counts steps to the right, pos[1] steps down, and
# pos[2] is bumped whenever both wrap (presumably the box index -- confirm)
pos = [0, 0, 0]
def inc_pos():
    """Advance ``pos`` in place: pos[0] wraps after 5, pos[1] wraps after 4."""
    if pos[0] != 5:
        pos[0] += 1
        return
    pos[0] = 0
    if pos[1] != 4:
        pos[1] += 1
        return
    pos[1] = 0
    pos[2] += 1
# Module-level state for the trading macro
all_trainer = []                  # trainer IDs already seen
searching = just_started = False  # status flags, both initially off
trainer_id = "0"                  # placeholder ID
def refresh_frame():
    """Capture loop: grab camera frames, show a preview, publish a gray frame.

    Meant to run in a background thread.  Every iteration reads one frame
    from capture device 0, pastes the ``text_img`` overlay onto a copy for
    the preview window, and stores the grayscale conversion in the
    module-level ``frame`` while holding ``lock``.

    Changes from the previous version:
    - the capture result is checked before the frame is used (a failed read
      yields no image, so the overlay step would fail before the check);
    - the 8-bit mask is applied to the ``waitKey`` result rather than to
      ``ord('q')``;
    - the numeric property ids 3 and 4 are spelled with their cv2 names.

    NOTE(review): ``exit()`` raises SystemExit, which in a thread started via
    ``_thread`` presumably ends only this thread -- confirm that is intended.
    NOTE(review): the extent of the ``with lock`` block was reconstructed
    from an indentation-stripped paste.
    """
    global frame, lock, text_img
    cap = cv2.VideoCapture(0)
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1024)
    p_wait(3)
    while True:
        with lock:
            ret, captured = cap.read()
            if not ret:
                print("Can't receive frame (stream end?). Exiting ...")
                exit()
            # Preview = colour frame with the text overlay in the top-left corner
            to_draw = np.array(captured, copy=True)
            to_draw[0:text_img.shape[0], 0:text_img.shape[1]] = text_img
            cv2.imshow('frame', to_draw)
            # OCR consumers read the grayscale version
            frame = cv2.cvtColor(captured, cv2.COLOR_BGR2GRAY)
        if cv2.waitKey(50) & 0xff == ord('q'):
            exit()
        p_wait(0.01)
# Restore the saved slot position and known trainer IDs from st_data.json.
# The file is only written by the trading macro, so on a first run it does
# not exist yet; fall back to empty state instead of crashing at start-up.
try:
    with open("st_data.json", "r") as f:
        d = json.load(f)
except FileNotFoundError:
    d = {}
pos = d.get("pos", [0,0,0])
pos[2] = 0  # the third position component is always reset on start-up
all_trainer = d.get("trainer", [])
# Main entry: start OCR and the capture thread, then run the selected macro.
# The try/finally guarantees the serial port is closed on errors and on
# Ctrl-C; the old trailing ``ser.close`` lacked parentheses and never ran.
# NOTE(review): nesting reconstructed from an indentation-stripped paste.
try:
    with PyTessBaseAPI(lang='deu') as api:
        lock = thread.allocate_lock()
        thread.start_new_thread(refresh_frame, ())
        # Press B 50 times (about 10 s) before the macro starts
        for i in range(50):
            s(BTN_B)
            s(0)
        start = time.time()
        count = 0
        farm_mballs()
    if not send_cmd():
        print('Packet Error!')
    # testbench()
    # testbench_packet_speed(1000)
finally:
    ser.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment