Created
November 23, 2024 04:09
-
-
Save garywill/f49056b6ee9953befac055018afaab67 to your computer and use it in GitHub Desktop.
Use MCU as simple TCP UART middle server (简单无线串口中间服务器). Tested on ESP32-C3 with MicroPython 1.24.0
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # On Linux PC | |
| # socat pty,link=/tmp/ttyV0,raw tcp:192.168.X.X:2000 | |
| # picocom /tmp/ttyV0 | |
| # esptool.py --before no_reset --after no_reset --port /tmp/ttyV0 | |
| # rshell -p /tmp/ttyV0 --buffer-size 5000 | |
| showTCP=False | |
| showUART=False | |
| try: | |
| tcp_buffersize | |
| except: | |
| tcp_buffersize=6_000 | |
| try: | |
| custom_timeout | |
| except NameError: | |
| custom_timeout = 0.1 | |
| import socket | |
| import machine | |
| from machine import UART | |
| from machine import Pin | |
| import errno | |
| import time | |
| p9 = Pin(9, mode=Pin.IN) | |
| try: | |
| client_socket.close() | |
| except: | |
| pass | |
| try: | |
| server_socket.close() | |
| except: | |
| pass | |
| try: | |
| uart1.irq(None) | |
| except: | |
| pass | |
| uart1 = None | |
| server_socket = None | |
| client_socket = None | |
| def stop_tcpuart(): | |
| global uart1, server_socket, client_socket | |
| if client_socket: | |
| client_socket.close() | |
| if server_socket: | |
| server_socket.close() | |
| if uart1: | |
| uart1.irq(None) | |
| uart1.deinit() | |
| def start_tcpuart(n=1): | |
| global uart1, server_socket, client_socket | |
| stop_tcpuart() | |
| if n == 1: | |
| tcpport = 2000 | |
| txpin = 4 | |
| rxpin = 5 | |
| elif n == 2: | |
| tcpport = 2001 | |
| txpin = 7 | |
| rxpin = 6 | |
| print(f"tcpport:{tcpport} txpin:{txpin} rxpin:{rxpin}") | |
| uart1 = machine.UART(1, baudrate=115200, tx=txpin, rx=rxpin) | |
| addr = socket.getaddrinfo('0.0.0.0', tcpport)[0][-1] | |
| server_socket = socket.socket() | |
| server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
| server_socket.bind(addr) | |
| server_socket.listen(5) | |
| print('Listening on', addr) | |
| server_socket.setsockopt(socket.SOL_SOCKET, 20, accept_handler) | |
| def accept_handler(server_socket): | |
| global uart1, client_socket | |
| client_socket, client_addr = server_socket.accept() | |
| print('Client connected from', client_addr) | |
| client_socket.setblocking(False) | |
| client_socket.setsockopt(socket.SOL_SOCKET, 20, tcp_handler) | |
| uart1.irq( uart_rx_cb, UART.IRQ_RX) | |
| def tcp_handler(client_socket): | |
| client_socket.settimeout(custom_timeout) | |
| data_tcprec = b'' | |
| data_tcprec = client_socket.recv(tcp_buffersize) | |
| if len(data_tcprec) > 0: | |
| if showTCP: | |
| print(data_tcprec) | |
| uart1.write(data_tcprec) | |
| def uart_rx_cb(*self): | |
| data = uart1.read() | |
| if data: | |
| if showUART: | |
| print(data) | |
| if client_socket: | |
| client_socket.send(data) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment