Skip to content

Instantly share code, notes, and snippets.

@Forgo7ten
Created August 14, 2025 09:12
Show Gist options
  • Select an option

  • Save Forgo7ten/5a84efd2b68c6385d8eeecf4dfae55b4 to your computer and use it in GitHub Desktop.

Select an option

Save Forgo7ten/5a84efd2b68c6385d8eeecf4dfae55b4 to your computer and use it in GitHub Desktop.
简单实现adbd协议的server
import socket
import struct
import threading
import time
class Mock:
"""Mock类,包含所有模拟设备输出的函数"""
def __init__(self):
# 设备属性
self.device_props = {
'ro.product.model': 'Python_Device',
'ro.product.device': 'python_device',
'ro.build.version.software': 'null',
'ro.build.type': 'user',
# 用户可以在这里添加更多属性
}
# 可扩展的命令处理器
self.service_handlers = {
'getprop': self.handle_getprop,
'ls': self.handle_ls,
'pwd': self.handle_pwd,
'whoami': self.handle_whoami,
'echo': self.handle_echo,
'cat': self.handle_cat,
'ps': self.handle_ps,
'df': self.handle_df,
# 用户可以在这里添加更多命令处理器
}
def handle_getprop(self, args):
"""处理getprop命令"""
if not args:
# 返回所有属性
output = ""
for key, value in self.device_props.items():
output += f"[{key}]: [{value}]\n"
return output
else:
# 返回特定属性
prop_name = args.strip()
if prop_name in self.device_props:
return self.device_props[prop_name] + "\n"
else:
return ""
def handle_ls(self, args):
"""处理ls命令"""
path = args.strip() if args else "/"
# 模拟文件系统
filesystem = {
"/": ["sdcard", "data", "system", "vendor", "dev", "proc"],
"/sdcard": ["Download", "DCIM", "Pictures", "Documents"],
"/data": ["app", "data", "local"],
"/system": ["bin", "lib", "etc", "framework"],
}
if path in filesystem:
files = filesystem[path]
output = " ".join(f"{file}" for file in files) + "\n"
print(f"LS输出: {repr(output)}") # 调试输出
return output
else:
return f"ls: {path}: No such file or directory\n"
def handle_pwd(self, args):
"""处理pwd命令"""
return "/\n"
def handle_whoami(self, args):
"""处理whoami命令"""
return "shell\n"
def handle_echo(self, args):
"""处理echo命令"""
return args + "\n" if args else "\n"
def handle_cat(self, args):
"""处理cat命令"""
filename = args.strip()
# 模拟一些文件内容
files = {
"/proc/version": "Linux version 4.14.186 (Python ADB Server)\n",
"/system/build.prop": "ro.build.version.release=11\nro.product.model=Python_Device\n",
}
if filename in files:
return files[filename]
else:
return f"cat: {filename}: No such file or directory\n"
def handle_ps(self, args):
"""处理ps命令"""
return "USER PID PPID VSIZE RSS WCHAN PC NAME\n" \
"root 1 0 1024 512 0 00000000 init\n" \
"shell 123 1 2048 1024 0 00000000 sh\n"
def handle_df(self, args):
"""处理df命令"""
return "Filesystem 1K-blocks Used Available Use% Mounted on\n" \
"/dev/root 10485760 5242880 5242880 50% /\n" \
"/dev/sdcard 20971520 2097152 18874368 10% /sdcard\n"
class ADBServer:
# ADB协议常量
ADB_VERSION = 0x01000000 # ADB协议版本
ADB_MAX_DATA = 4096 # 最大数据包大小
# ADB命令常量
CMD_CNXN = 0x4e584e43 # 'CNXN' - 连接命令
CMD_AUTH = 0x48545541 # 'AUTH' - 认证命令
CMD_OPEN = 0x4e45504f # 'OPEN' - 打开命令
CMD_OKAY = 0x59414b4f # 'OKAY' - 确认命令
CMD_CLSE = 0x45534c43 # 'CLSE' - 关闭命令
CMD_WRTE = 0x45545257 # 'WRTE' - 写入命令
def __init__(self, host='0.0.0.0', port=5555):
self.host = host
self.port = port
self.server_socket = None
self.running = False
self.device_name = "Python_ADB_Device"
self.device_model = "Python_Custom_Device"
self.device_device = "python_device"
self.connections = {} # 存储连接状态
# 创建Mock实例
self.mock = Mock()
def calculate_checksum(self, data):
"""计算ADB数据校验和"""
checksum = 0
for byte in data:
checksum += byte
return checksum & 0xffffffff
def create_message(self, command, arg0, arg1, data=b''):
"""创建ADB消息包"""
magic = command ^ 0xffffffff
if data:
checksum = self.calculate_checksum(data)
else:
checksum = 0
# ADB消息格式:命令(4) + arg0(4) + arg1(4) + 数据长度(4) + 校验和(4) + 魔数(4)
header = struct.pack('<IIIIII',
command, arg0, arg1,
len(data), checksum, magic)
return header + data
def send_response(self, client_socket, local_id, remote_id, output):
"""发送响应数据"""
try:
# 确保output是字符串
if output is None:
output = ""
elif not isinstance(output, str):
output = str(output)
# 发送数据(如果有)
if output:
write_msg = self.create_message(
self.CMD_WRTE,
local_id,
remote_id,
output.encode('utf-8')
)
client_socket.send(write_msg)
print(f"发送数据: {output.strip()[:50]}{'...' if len(output) > 50 else ''}")
# 等待客户端确认
time.sleep(0.1)
except Exception as e:
print(f"发送响应时出错: {e}")
def close_connection(self, client_socket, local_id, remote_id):
"""关闭连接"""
try:
# 发送CLSE关闭连接
close_msg = self.create_message(self.CMD_CLSE, local_id, remote_id)
client_socket.send(close_msg)
print("发送CLSE关闭连接")
except Exception as e:
print(f"关闭连接时出错: {e}")
def send_response_and_close(self, client_socket, local_id, remote_id, output):
"""发送响应数据并关闭连接(保持向后兼容)"""
self.send_response(client_socket, local_id, remote_id, output)
self.close_connection(client_socket, local_id, remote_id)
def handle_service(self, client_socket, local_id, remote_id, service):
"""处理服务请求"""
output = None
try:
if service == 'shell:':
# 交互式shell - 特殊处理
print("启动交互式shell会话(未实现,退出响应)")
self.send_response_and_close(client_socket, local_id, remote_id, "exit\n")
return
elif service.startswith('shell:') or service.startswith('exec:'):
# 处理shell命令
cmd_line = service[service.find(':')+1:].strip() # 去掉 前缀
if not cmd_line:
output = ""
else:
# 解析命令和参数
parts = cmd_line.split(' ', 1)
cmd = parts[0]
args = parts[1] if len(parts) > 1 else ""
# 查找对应的处理器
if cmd in self.mock.service_handlers:
output = self.mock.service_handlers[cmd](args)
else:
# 未知命令
output = f"{cmd}: command not found\n"
else:
# 其他未知服务
output = f"Unknown service: {service}\n"
except Exception as e:
print(f"处理服务时出错: {e}")
import traceback
traceback.print_exc()
output = f"Error: {str(e)}\n"
finally:
# 统一处理输出和关闭连接
if output is not None: # 只有非交互式命令才发送响应并关闭
self.send_response_and_close(client_socket, local_id, remote_id, output)
def handle_client(self, client_socket, client_address):
"""处理客户端连接"""
print(f"客户端连接: {client_address}")
local_id_counter = 1
open_channels = {} # 记录打开的通道
try:
while True:
# 接收消息头
header_data = client_socket.recv(24)
if not header_data or len(header_data) < 24:
break
# 解析消息头
command, arg0, arg1, data_len, checksum, magic = struct.unpack('<IIIIII', header_data)
# 接收数据部分
payload = b''
if data_len > 0:
received = 0
while received < data_len:
chunk = client_socket.recv(min(data_len - received, 4096))
if not chunk:
break
payload += chunk
received += len(chunk)
# 处理不同的命令
if command == self.CMD_CNXN:
# 处理连接请求
print("收到CNXN连接请求")
self.handle_connect(client_socket, arg0, arg1, payload)
elif command == self.CMD_OPEN:
# 处理打开请求
service = payload.decode('utf-8', errors='ignore').strip('\x00')
print(f"收到OPEN请求: {service}")
# 分配本地ID
local_id = local_id_counter
local_id_counter += 1
remote_id = arg0
# 记录通道信息
open_channels[local_id] = {
'remote_id': remote_id,
'service': service
}
# 发送OKAY响应
okay_msg = self.create_message(self.CMD_OKAY, local_id, remote_id)
client_socket.send(okay_msg)
print(f"为服务 '{service}' 发送OKAY响应 (local_id: {local_id}, remote_id: {remote_id})")
# 处理不同的服务
self.handle_service(client_socket, local_id, remote_id, service)
elif command == self.CMD_OKAY:
# 处理确认
print(f"收到OKAY确认 (local: {arg0}, remote: {arg1})")
elif command == self.CMD_CLSE:
# 处理关闭
print(f"收到CLSE关闭请求 (local: {arg0}, remote: {arg1})")
# 回复CLSE
close_msg = self.create_message(self.CMD_CLSE, arg1, arg0)
client_socket.send(close_msg)
# 清理通道
if arg1 in open_channels:
del open_channels[arg1]
elif command == self.CMD_WRTE:
# 处理写入
print(f"收到WRTE写入请求: {payload}")
# 回复OKAY
okay_msg = self.create_message(self.CMD_OKAY, arg1, arg0)
client_socket.send(okay_msg)
except Exception as e:
print(f"处理客户端时出错: {e}")
import traceback
traceback.print_exc()
finally:
client_socket.close()
print(f"客户端断开连接: {client_address}")
def handle_connect(self, client_socket, arg0, arg1, payload):
"""处理连接请求"""
# 构建设备信息字符串
device_info = f"device::{self.device_name};model:{self.device_model};device:{self.device_device};\x00"
# 发送CNXN响应
response = self.create_message(
self.CMD_CNXN,
self.ADB_VERSION,
self.ADB_MAX_DATA,
device_info.encode('utf-8')
)
client_socket.send(response)
print("发送CNXN响应")
def start(self):
"""启动ADB服务器"""
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
self.running = True
print(f"ADB服务器启动在 {self.host}:{self.port}")
print(f"可以使用 'adb connect {self.host}:{self.port}' 连接")
while self.running:
try:
client_socket, client_address = self.server_socket.accept()
# 为每个客户端创建新线程
client_thread = threading.Thread(
target=self.handle_client,
args=(client_socket, client_address)
)
client_thread.daemon = True
client_thread.start()
except KeyboardInterrupt:
break
except Exception as e:
print(f"接受连接时出错: {e}")
except Exception as e:
print(f"启动服务器失败: {e}")
finally:
self.stop()
def stop(self):
"""停止ADB服务器"""
self.running = False
if self.server_socket:
self.server_socket.close()
print("ADB服务器已停止")
def main():
# 创建并启动ADB服务器
server = ADBServer(host='0.0.0.0', port=7777)
try:
server.start()
except KeyboardInterrupt:
print("\n正在关闭服务器...")
server.stop()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment