Created
August 14, 2025 09:12
-
-
Save Forgo7ten/5a84efd2b68c6385d8eeecf4dfae55b4 to your computer and use it in GitHub Desktop.
简单实现adbd协议的server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket
import struct
import threading
import time
import traceback
class Mock:
    """Canned responses that imitate shell commands of an Android device.

    ``device_props`` maps property names to the values reported by
    ``getprop``. ``service_handlers`` maps a command name to the bound
    method that produces its output. Both are plain dictionaries so callers
    can add entries. Every handler takes the raw argument string that
    followed the command name and returns the text to send back.
    """

    def __init__(self):
        # Device properties reported by getprop; add more entries as needed.
        self.device_props = {
            'ro.product.model': 'Python_Device',
            'ro.product.device': 'python_device',
            'ro.build.version.software': 'null',
            'ro.build.type': 'user',
        }
        # Command dispatch table; add more handlers as needed.
        self.service_handlers = {
            'getprop': self.handle_getprop,
            'ls': self.handle_ls,
            'pwd': self.handle_pwd,
            'whoami': self.handle_whoami,
            'echo': self.handle_echo,
            'cat': self.handle_cat,
            'ps': self.handle_ps,
            'df': self.handle_df,
        }

    def handle_getprop(self, args):
        """Return every property when *args* is empty, else one property.

        A known property yields its value followed by a newline; an unknown
        one yields an empty string.
        """
        if not args:
            return "".join(
                f"[{key}]: [{value}]\n"
                for key, value in self.device_props.items()
            )
        value = self.device_props.get(args.strip())
        return "" if value is None else value + "\n"

    def handle_ls(self, args):
        """List a directory of the simulated file system.

        *args* is the path to list; an empty value lists ``/``. A trailing
        slash on the path is ignored, so ``/sdcard/`` and ``/sdcard`` list
        the same directory. Unknown paths produce an ``ls``-style error
        line that echoes the path as it was requested.
        """
        path = args.strip() if args else "/"
        # Simulated file system: directory path -> entries.
        filesystem = {
            "/": ["sdcard", "data", "system", "vendor", "dev", "proc"],
            "/sdcard": ["Download", "DCIM", "Pictures", "Documents"],
            "/data": ["app", "data", "local"],
            "/system": ["bin", "lib", "etc", "framework"],
        }
        # Drop trailing slashes for the lookup; a bare "/" (or an
        # all-slash path) must still resolve to the root directory.
        lookup = path.rstrip("/") or "/"
        entries = filesystem.get(lookup)
        if entries is None:
            return f"ls: {path}: No such file or directory\n"
        output = " ".join(entries) + "\n"
        print(f"LS输出: {repr(output)}")  # debug output
        return output

    def handle_pwd(self, args):
        """Return the fixed working directory ``/``."""
        return "/\n"

    def handle_whoami(self, args):
        """Return the fixed user name ``shell``."""
        return "shell\n"

    def handle_echo(self, args):
        """Return *args* followed by a newline (just a newline if empty)."""
        return args + "\n" if args else "\n"

    def handle_cat(self, args):
        """Return the content of a simulated file, or a ``cat`` error line."""
        filename = args.strip()
        # Simulated file contents.
        files = {
            "/proc/version": "Linux version 4.14.186 (Python ADB Server)\n",
            "/system/build.prop": "ro.build.version.release=11\nro.product.model=Python_Device\n",
        }
        if filename in files:
            return files[filename]
        return f"cat: {filename}: No such file or directory\n"

    def handle_ps(self, args):
        """Return a fixed two-process ``ps`` listing."""
        return "USER PID PPID VSIZE RSS WCHAN PC NAME\n" \
               "root 1 0 1024 512 0 00000000 init\n" \
               "shell 123 1 2048 1024 0 00000000 sh\n"

    def handle_df(self, args):
        """Return a fixed ``df`` table with two file systems."""
        return "Filesystem 1K-blocks Used Available Use% Mounted on\n" \
               "/dev/root 10485760 5242880 5242880 50% /\n" \
               "/dev/sdcard 20971520 2097152 18874368 10% /sdcard\n"
class ADBServer:
    """Small TCP server that speaks enough of the adbd wire protocol to
    answer ``adb connect`` and simple ``adb shell <cmd>`` requests.

    Every message is a 24-byte little-endian header (command, arg0, arg1,
    payload length, payload checksum, magic) followed by the payload.
    Command output is produced by the ``Mock`` instance in ``self.mock``.
    """

    # ADB protocol constants
    ADB_VERSION = 0x01000000  # protocol version sent as arg0 of CNXN
    ADB_MAX_DATA = 4096  # max payload size sent as arg1 of CNXN

    # ADB command identifiers (four ASCII characters, little-endian)
    CMD_CNXN = 0x4e584e43  # 'CNXN' - connect
    CMD_AUTH = 0x48545541  # 'AUTH' - authenticate
    CMD_OPEN = 0x4e45504f  # 'OPEN' - open a stream
    CMD_OKAY = 0x59414b4f  # 'OKAY' - acknowledge
    CMD_CLSE = 0x45534c43  # 'CLSE' - close a stream
    CMD_WRTE = 0x45545257  # 'WRTE' - write stream data

    # Seconds to pause after each WRTE so the client has time to acknowledge.
    ACK_DELAY = 0.1

    # Header layout: command, arg0, arg1, data length, checksum, magic.
    _HEADER = struct.Struct('<IIIIII')

    def __init__(self, host='0.0.0.0', port=5555):
        """Store the listen address and create the command mock.

        No socket is opened until ``start()`` is called.
        """
        self.host = host
        self.port = port
        self.server_socket = None
        self.running = False
        self.device_name = "Python_ADB_Device"
        self.device_model = "Python_Custom_Device"
        self.device_device = "python_device"
        self.connections = {}  # connection state storage
        # Provider of the simulated command output.
        self.mock = Mock()

    @staticmethod
    def _recv_exact(sock, size):
        """Read exactly *size* bytes from *sock*.

        A stream socket may return fewer bytes than requested, so keep
        reading until the full amount has arrived. Returns ``None`` when the
        peer closes the connection before *size* bytes were received.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = sock.recv(min(remaining, 4096))
            if not chunk:
                return None
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def calculate_checksum(self, data):
        """Return the sum of all payload bytes, truncated to 32 bits."""
        return sum(data) & 0xffffffff

    def create_message(self, command, arg0, arg1, data=b''):
        """Build one ADB packet: 24-byte header followed by *data*.

        The magic field is the bitwise complement of *command*; the checksum
        is 0 for an empty payload.
        """
        magic = command ^ 0xffffffff
        checksum = self.calculate_checksum(data) if data else 0
        header = self._HEADER.pack(command, arg0, arg1,
                                   len(data), checksum, magic)
        return header + data

    def send_response(self, client_socket, local_id, remote_id, output):
        """Send *output* to the client as one or more WRTE packets.

        ``None`` is treated as empty output and non-string values are
        converted with ``str``. Nothing is sent for empty output. The
        encoded text is split so that no packet carries more than
        ``ADB_MAX_DATA`` bytes. Errors are logged, not raised.
        """
        try:
            if output is None:
                output = ""
            elif not isinstance(output, str):
                output = str(output)
            if not output:
                return
            data = output.encode('utf-8')
            for start in range(0, len(data), self.ADB_MAX_DATA):
                chunk = data[start:start + self.ADB_MAX_DATA]
                # sendall: plain send() may transmit only part of the packet.
                client_socket.sendall(self.create_message(
                    self.CMD_WRTE, local_id, remote_id, chunk))
                # Give the client time to acknowledge the write.
                time.sleep(self.ACK_DELAY)
            print(f"发送数据: {output.strip()[:50]}{'...' if len(output) > 50 else ''}")
        except Exception as e:
            # Best effort: a failed send must not take down the session loop.
            print(f"发送响应时出错: {e}")

    def close_connection(self, client_socket, local_id, remote_id):
        """Send CLSE for the given stream; errors are logged, not raised."""
        try:
            close_msg = self.create_message(self.CMD_CLSE, local_id, remote_id)
            client_socket.sendall(close_msg)
            print("发送CLSE关闭连接")
        except Exception as e:
            print(f"关闭连接时出错: {e}")

    def send_response_and_close(self, client_socket, local_id, remote_id, output):
        """Send *output* and then close the stream (kept for compatibility)."""
        self.send_response(client_socket, local_id, remote_id, output)
        self.close_connection(client_socket, local_id, remote_id)

    def handle_service(self, client_socket, local_id, remote_id, service):
        """Run the service named in an OPEN request and close the stream.

        ``shell:`` with no command (interactive shell) is not implemented
        and is answered with ``exit``. ``shell:<cmd>`` and ``exec:<cmd>``
        are dispatched through ``self.mock.service_handlers``; unknown
        commands and unknown services get an error line. An exception in a
        handler is reported to the client as ``Error: ...``.
        """
        try:
            if service == 'shell:':
                print("启动交互式shell会话（未实现，退出响应）")
                output = "exit\n"
            elif service.startswith(('shell:', 'exec:')):
                # Everything after the first ':' is the command line.
                cmd_line = service.partition(':')[2].strip()
                if not cmd_line:
                    output = ""
                else:
                    # First word is the command, the rest its arguments.
                    cmd, _, args = cmd_line.partition(' ')
                    handler = self.mock.service_handlers.get(cmd)
                    if handler is not None:
                        output = handler(args)
                    else:
                        output = f"{cmd}: command not found\n"
            else:
                output = f"Unknown service: {service}\n"
        except Exception as e:
            print(f"处理服务时出错: {e}")
            traceback.print_exc()
            output = f"Error: {str(e)}\n"
        # Every service is one-shot: deliver the output, then close.
        self.send_response_and_close(client_socket, local_id, remote_id, output)

    def handle_client(self, client_socket, client_address):
        """Serve one client connection until it disconnects or fails.

        Reads packets in a loop and reacts to CNXN, OPEN, OKAY, CLSE and
        WRTE; other commands are ignored. The socket is always closed on
        exit.
        """
        print(f"客户端连接: {client_address}")
        local_id_counter = 1
        open_channels = {}  # local id -> stream info
        try:
            while True:
                header_data = self._recv_exact(client_socket, self._HEADER.size)
                if header_data is None:
                    break
                command, arg0, arg1, data_len, _checksum, _magic = \
                    self._HEADER.unpack(header_data)
                payload = b''
                if data_len > 0:
                    payload = self._recv_exact(client_socket, data_len)
                    if payload is None:
                        # Peer vanished mid-packet; nothing sensible to parse.
                        break
                if command == self.CMD_CNXN:
                    print("收到CNXN连接请求")
                    self.handle_connect(client_socket, arg0, arg1, payload)
                elif command == self.CMD_OPEN:
                    service = payload.decode('utf-8', errors='ignore').strip('\x00')
                    print(f"收到OPEN请求: {service}")
                    # Allocate our id for the stream; arg0 is the peer's id.
                    local_id = local_id_counter
                    local_id_counter += 1
                    remote_id = arg0
                    open_channels[local_id] = {
                        'remote_id': remote_id,
                        'service': service
                    }
                    okay_msg = self.create_message(self.CMD_OKAY, local_id, remote_id)
                    client_socket.sendall(okay_msg)
                    print(f"为服务 '{service}' 发送OKAY响应 (local_id: {local_id}, remote_id: {remote_id})")
                    self.handle_service(client_socket, local_id, remote_id, service)
                elif command == self.CMD_OKAY:
                    print(f"收到OKAY确认 (local: {arg0}, remote: {arg1})")
                elif command == self.CMD_CLSE:
                    print(f"收到CLSE关闭请求 (local: {arg0}, remote: {arg1})")
                    # Answer with CLSE, ids swapped to our point of view.
                    close_msg = self.create_message(self.CMD_CLSE, arg1, arg0)
                    client_socket.sendall(close_msg)
                    open_channels.pop(arg1, None)
                elif command == self.CMD_WRTE:
                    print(f"收到WRTE写入请求: {payload}")
                    okay_msg = self.create_message(self.CMD_OKAY, arg1, arg0)
                    client_socket.sendall(okay_msg)
        except Exception as e:
            print(f"处理客户端时出错: {e}")
            traceback.print_exc()
        finally:
            client_socket.close()
            print(f"客户端断开连接: {client_address}")

    def handle_connect(self, client_socket, arg0, arg1, payload):
        """Answer a CNXN request with our own CNXN and device banner."""
        device_info = f"device::{self.device_name};model:{self.device_model};device:{self.device_device};\x00"
        response = self.create_message(
            self.CMD_CNXN,
            self.ADB_VERSION,
            self.ADB_MAX_DATA,
            device_info.encode('utf-8')
        )
        client_socket.sendall(response)
        print("发送CNXN响应")

    def start(self):
        """Bind, listen and serve clients until stopped.

        Each accepted client is handled in its own daemon thread. The
        server is stopped on the way out, whatever ends the accept loop.
        """
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            self.server_socket.bind((self.host, self.port))
            self.server_socket.listen(5)
            self.running = True
            print(f"ADB服务器启动在 {self.host}:{self.port}")
            print(f"可以使用 'adb connect {self.host}:{self.port}' 连接")
            while self.running:
                try:
                    client_socket, client_address = self.server_socket.accept()
                    client_thread = threading.Thread(
                        target=self.handle_client,
                        args=(client_socket, client_address)
                    )
                    client_thread.daemon = True
                    client_thread.start()
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    print(f"接受连接时出错: {e}")
        except Exception as e:
            print(f"启动服务器失败: {e}")
        finally:
            self.stop()

    def stop(self):
        """Stop accepting clients and close the listening socket."""
        self.running = False
        if self.server_socket:
            self.server_socket.close()
        print("ADB服务器已停止")
def main():
    """Run an ADB server on all interfaces, port 7777, until interrupted."""
    adb_server = ADBServer(host='0.0.0.0', port=7777)
    try:
        adb_server.start()
    except KeyboardInterrupt:
        # Ctrl-C that escaped start(): announce the shutdown and stop.
        print("\n正在关闭服务器...")
        adb_server.stop()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment