@Zhong-Zhang
Last active November 4, 2025 11:01
mini-gui-agent
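adb_utils.py  (filename inferred from `from adb_utils import setup_device` in the entry script below)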
import subprocess
import datetime
import urllib.parse
import logging
import os
from typing import List, Dict, Any, Optional
import io
import PIL.Image as Image
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s: %(message)s")
# ---------------------------------------------------------------------------
# Low‑level helpers
# ---------------------------------------------------------------------------
def _run(cmd: List[str], timeout: int = 30) -> bytes:
"""Run a shell command and return raw stdout (raises on non‑zero exit)."""
logger.debug("$ %s", " ".join(cmd))
return subprocess.check_output(cmd, stderr=subprocess.STDOUT, timeout=timeout)
def _adb_prefix(serial: str | None) -> List[str]:
return ["adb", "-s", serial] if serial else ["adb"]
def _resize_pillow(origin_img, max_line_res: int = 1120):
"""Resize PIL image so that longest edge ≤ `max_line_res` using Lanczos."""
w, h = origin_img.size
if max_line_res is not None:
max_line = max_line_res
if h > max_line:
w = int(w * max_line / h)
h = max_line
if w > max_line:
h = int(h * max_line / w)
w = max_line
return origin_img.resize((w, h), resample=Image.Resampling.LANCZOS)
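# Example (illustrative): a 1080x2400 portrait screenshot with max_line_res=1120
# comes out as 504x1120, since 1080 * 1120 / 2400 = 504.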
def _encode_text_for_adb(text: str) -> str:
"""Encode text for adb shell input. URL‑encode spaces as %s."""
def _esc(ch: str) -> str:
if ord(ch) < 128 and ch != " ":
return ch
if ch == " ":
return "%s"
return f"\\u{ord(ch):04x}"
return "".join(_esc(c) for c in text)
def _encode_ascii_for_adb(text: str) -> str:
"""Encode ASCII‑only string for `adb shell input text …` (spaces→%s)."""
return text.replace(" ", "%s")
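# Example (illustrative): _encode_ascii_for_adb("hello world") -> "hello%sworld",
# which `adb shell input text` then types as "hello world".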
# ---------------------------------------------------------------------------
# AndroidDevice class
# ---------------------------------------------------------------------------
class AndroidDevice:
"""Encapsulates a single, already‑connected Android handset."""
_yadb_pushed: bool = False
_yadb_local: str = os.path.join(os.path.dirname(__file__), "yadb/yadb")
def __init__(self, serial: str | None):
self.serial: str | None = serial
self.width: int = 0
self.height: int = 0
self.last_req_time: datetime.datetime = datetime.datetime.now()
# ---------- internal ----------
def _adb(self, *args: str, timeout: int = 30) -> bytes:
return _run(_adb_prefix(self.serial) + list(args), timeout)
def _ensure_yadb(self):
if AndroidDevice._yadb_pushed:
return
if not os.path.exists(AndroidDevice._yadb_local):
raise FileNotFoundError(f"yadb helper not found: {AndroidDevice._yadb_local}")
self._adb("push", AndroidDevice._yadb_local, "/data/local/tmp")
AndroidDevice._yadb_pushed = True
logger.info("yadb pushed to device for Unicode input support")
# ---------- public API ----------
def refresh_resolution(self) -> None:
"""Query and cache `wm size` (sets .width / .height)."""
raw = self._adb("shell", "wm", "size").decode()
try:
size_line = raw.split("Physical size: ")[1].splitlines()[0]
self.width, self.height = map(int, size_line.split("x"))
logger.info("Device %s resolution: %dx%d", self.serial or "<default>",
self.width, self.height)
except Exception as exc:
raise RuntimeError(f"Failed to parse wm size output: {raw}") from exc
# -------------------------------------------------------------------
# Step: execute user action
# -------------------------------------------------------------------
    def step(self, data: Dict[str, Any]) -> bool:
        """Execute a control step (tap/swipe/key/text/clear); return True when STATUS reports finish or impossible."""
logger.debug("Step: %s", data)
if "POINT" in data:
self._handle_point(data)
if "PRESS" in data:
self._handle_press(data["PRESS"])
if "TYPE" in data:
self._handle_type(data["TYPE"])
if "CLEAR" in data:
self._adb("shell", "input", "keyevent", "KEYCODE_CLEAR")
self.last_req_time = datetime.datetime.now()
if ("STATUS", "finish") in data.items() or ("STATUS", "impossible") in data.items():
logger.info("Task finished")
return True
return False
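    # Illustrative step() payloads (coordinates are 0-1000 values relative to screen size,
    # see _handle_point below):
    #   device.step({"POINT": [500, 500]})              # tap the screen centre
    #   device.step({"POINT": [500, 800], "to": "up"})  # swipe up from (50%, 80%)
    #   device.step({"TYPE": "hello%20world"})          # type URL-encoded text
    #   device.step({"STATUS": "finish"})               # returns True: task complete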
# -------------------------------------------------------------------
# State snapshot
# -------------------------------------------------------------------
def state(self) -> Dict[str, Any]:
return {
"width": self.width,
"height": self.height,
"last_req_time": self.last_req_time.isoformat(),
"screenshot": self.screenshot(),
}
# --- Device state ---------------------------------------------------
def screenshot(self, max_side: Optional[int] = None) -> Image.Image:
"""Grab screen; return Pillow Image. Optionally down‑scale with user rule."""
png_bytes = self._adb("exec-out", "screencap", "-p")
img = Image.open(io.BytesIO(png_bytes))
if max_side is not None:
img = _resize_pillow(img, max_side)
return img
# =================== private helpers ===================
def _handle_point(self, data: Dict[str, Any]) -> None:
x, y = data["POINT"]
x = int(x / 1000 * self.width)
y = int(y / 1000 * self.height)
if "to" in data: # swipe
if isinstance(data["to"], list):
x2, y2 = data["to"]
x2 = int(x2 / 1000 * self.width)
y2 = int(y2 / 1000 * self.height)
else: # directional swipe (up/down/left/right)
dirs = {
"up": (0, -0.15),
"down": (0, 0.15),
"left": (-0.15, 0),
"right": (0.15, 0),
}
if data["to"] not in dirs:
raise ValueError(f"Invalid swipe direction: {data['to']}")
dx_ratio, dy_ratio = dirs[data["to"]]
x2 = int(max(min(x + dx_ratio * self.width, self.width), 0))
y2 = int(max(min(y + dy_ratio * self.height, self.height), 0))
dur = str(data.get("duration", 150))
self._adb("shell", "input", "swipe", str(x), str(y), str(x2), str(y2), dur)
else: # simple tap
self._adb("shell", "input", "tap", str(x), str(y))
def _handle_press(self, key: str) -> None:
KEYS = {
"HOME": "KEYCODE_HOME",
"BACK": "KEYCODE_BACK",
"MENU": "KEYCODE_MENU",
"ENTER": "KEYCODE_ENTER",
"APPSELECT": "KEYCODE_APP_SWITCH",
"power": "KEYCODE_POWER",
"volume_up": "KEYCODE_VOLUME_UP",
"volume_down": "KEYCODE_VOLUME_DOWN",
"volume_mute": "KEYCODE_VOLUME_MUTE",
}
if key not in KEYS:
raise ValueError(f"Unknown PRESS value: {key}")
self._adb("shell", "input", "keyevent", KEYS[key])
# def _handle_type(self, raw):
# decoded = urllib.parse.unquote(raw)
# self._adb("shell", "am", "broadcast", '-a', 'ADB_INPUT_TEXT', '--es msg' , decoded)
# # self._adb("shell", "input", "text", decoded)
def _handle_type(self, raw):
text = urllib.parse.unquote(raw)
if all(ord(c) < 128 for c in text): # quick ASCII path
self._adb("shell", "input", "text", _encode_ascii_for_adb(text))
return
# Unicode → yadb
self._ensure_yadb()
safe = text.replace("'", "'\\''") # escape single quotes for sh
cmd = (
"app_process -Djava.class.path=/data/local/tmp/yadb /data/local/tmp "
"com.ysbing.yadb.Main -keyboard '%s'" % safe
)
self._adb("shell", cmd)
# ---------------------------------------------------------------------------
# Public utility function
# ---------------------------------------------------------------------------
def setup_device() -> AndroidDevice:
"""Detect the first connected & authorised Android phone and return an object."""
lines = _run(["adb", "devices"]).decode().strip().splitlines()[1:]
serials = [l.split()[0] for l in lines if l.strip() and "device" in l]
if not serials:
raise RuntimeError("No authorised Android device found. Plug in & check adb.")
if len(serials) > 1:
logger.warning("Multiple devices detected; defaulting to the first (%s).", serials[0])
dev = AndroidDevice(serials[0])
dev.refresh_resolution()
return dev
# ---------------------------------------------------------------------------
# Demo – run this file directly to test
# ---------------------------------------------------------------------------
if __name__ == "__main__":
device = setup_device()
logger.info("Device ready: serial=%s (%dx%d)", device.serial, device.width, device.height)
# Example: tap centre, take screenshot
device.step({"POINT": [500, 500]})
png = device.screenshot()
target = os.path.join(os.path.dirname(__file__), "screencap.png")
logger.info("Screenshot saved → %s (%d bytes)", target, len(png))
import abc
import base64
import io
import os
import time
from typing import Any, Optional
import google.generativeai as genai
from google.generativeai import types
from google.generativeai.types import answer_types
from google.generativeai.types import content_types
from google.generativeai.types import generation_types
from google.generativeai.types import safety_types
import numpy as np
from PIL import Image
import requests
import json
from jsonschema import Draft7Validator
ERROR_CALLING_LLM = "Error calling LLM"
END_POINT = "http://localhost:8000/v1/chat/completions"
# Absolute path of the current file and its directory
current_file_path = os.path.abspath(__file__)
current_dir = os.path.dirname(current_file_path)
def compact_json_dumps(obj):
return json.dumps(obj, indent=None, separators=(",", ":"), ensure_ascii=False)
ACTION_SCHEMA = json.load(
open(os.path.join(current_dir, "schema_thought.json"), encoding="utf-8")
)
items = list(ACTION_SCHEMA.items())
insert_index = 3  # position in the schema dict at which to insert the "required" key
items.insert(insert_index, ("required", ["thought"]))
# items.insert(insert_index, ("optional", ["thought"]))
ACTION_SCHEMA = dict(items)
SYSTEM_PROMPT = f"""# Role
你是一名熟悉安卓系统触屏GUI操作的智能体,将根据用户的问题,分析当前界面的GUI元素和布局,生成相应的操作。
# Task
针对用户问题,根据输入的当前屏幕截图,输出下一步的操作。
# Rule
- 以紧凑JSON格式输出
- 输出操作必须遵循Schema约束
# Schema
{json.dumps(ACTION_SCHEMA, indent=None, ensure_ascii=False, separators=(',', ':'))}"""
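# English gloss of the Chinese system prompt above (the literal is left untranslated since
# it is the exact text sent to the model): "Role: you are an agent familiar with Android
# touch-screen GUI operation. Task: given the user's question and the current screenshot,
# output the next action. Rules: output compact JSON; the action must follow the schema."
# An illustrative, schema-conforming response would be:
#   {"thought":"...", "POINT":[500,300]}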
EXTRACT_SCHEMA = json.load(
open(os.path.join(current_dir, "schema_for_extraction.json"), encoding="utf-8")
)
validator = Draft7Validator(EXTRACT_SCHEMA)
def array_to_jpeg_bytes(image: np.ndarray) -> bytes:
"""Converts a numpy array into a byte string for a JPEG image."""
image = Image.fromarray(image)
return image_to_jpeg_bytes(image)
def image_to_jpeg_bytes(image: Image.Image) -> bytes:
    in_mem_file = io.BytesIO()
    # JPEG cannot store an alpha channel, so convert to RGB first.
    image.convert("RGB").save(in_mem_file, format="JPEG")
    # Reset file pointer to start
    in_mem_file.seek(0)
    return in_mem_file.read()
class LlmWrapper(abc.ABC):
"""Abstract interface for (text only) LLM."""
@abc.abstractmethod
def predict(
self,
text_prompt: str,
) -> tuple[str, Optional[bool], Any]:
"""Calling multimodal LLM with a prompt and a list of images.
Args:
text_prompt: Text prompt.
Returns:
Text output, is_safe, and raw output.
"""
class MultimodalLlmWrapper(abc.ABC):
"""Abstract interface for Multimodal LLM."""
@abc.abstractmethod
def predict_mm(
self, text_prompt: str, images: list[np.ndarray]
) -> tuple[str, Optional[bool], Any]:
"""Calling multimodal LLM with a prompt and a list of images.
Args:
text_prompt: Text prompt.
images: List of images as numpy ndarray.
Returns:
          Text output, is_safe, and raw output.
"""
SAFETY_SETTINGS_BLOCK_NONE = {
types.HarmCategory.HARM_CATEGORY_HARASSMENT: (types.HarmBlockThreshold.BLOCK_NONE),
types.HarmCategory.HARM_CATEGORY_HATE_SPEECH: (types.HarmBlockThreshold.BLOCK_NONE),
types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: (
types.HarmBlockThreshold.BLOCK_NONE
),
types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: (
types.HarmBlockThreshold.BLOCK_NONE
),
}
class MiniCPMWrapper(LlmWrapper, MultimodalLlmWrapper):
RETRY_WAITING_SECONDS = 20
def __init__(
self,
model_name: str,
max_retry: int = 3,
temperature: float = 0.1,
use_history: bool = False,
        history_size: int = 10,  # keep at most the most recent `history_size` turns
):
if max_retry <= 0:
max_retry = 3
print("Max_retry must be positive. Reset it to 3")
self.max_retry = min(max_retry, 5)
self.temperature = temperature
self.model = model_name
        # ---------- conversation history (optional) ----------
self.use_history = use_history
self.history_size = max(history_size, 1)
        # history is stored one message at a time: [{'role': .., 'content': ..}, ...]
self.history: list[dict] = []
@classmethod
def encode_image(cls, image: np.ndarray) -> str:
return base64.b64encode(array_to_jpeg_bytes(image)).decode("utf-8")
def _push_history(self, role: str, content: Any):
"""把一条消息写入历史,并自动裁剪长度。"""
if not self.use_history:
return
self.history.append({"role": role, "content": content})
        # each dialogue turn contributes two messages: user + assistant
max_msgs = self.history_size * 2
if len(self.history) > max_msgs:
self.history = self.history[-max_msgs:]
def clear_history(self):
"""外部可手动清空记忆。"""
self.history.clear()
    def extract_and_validate_json(self, input_string):
        try:
            json_obj = json.loads(input_string)
            # The schema is already bound to the validator, so only the instance is passed.
            validator.validate(json_obj)
            return json_obj
        except json.JSONDecodeError:
            print("Error: response is not valid JSON.")
            return input_string
        except Exception as e:
            print(f"Error: JSON does not conform to the schema: {input_string}", e)
            return input_string
def predict(
self,
text_prompt: str,
) -> tuple[str, Optional[bool], Any]:
return self.predict_mm(text_prompt, [])
def predict_mm(
self, text_prompt: str, images: list[np.ndarray]
    ) -> tuple[str, Optional[bool], Any, Any]:
assert len(images) == 1
        # -------- build the messages list --------
messages: list[dict] = [
{
"role": "system",
"content": [{"type": "text", "text": SYSTEM_PROMPT}],
}
]
        # 1) prepend conversation history
if self.use_history and self.history:
messages.extend(self.history)
        # 2) current user message
user_content = [
{
"type": "text",
"text": f"<Question>{text_prompt}</Question>\n当前屏幕截图:(<image>./</image>)",
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{self.encode_image(images[0])}"
},
},
]
messages.append({"role": "user", "content": user_content})
payload = {
"model": self.model,
"temperature": self.temperature,
"messages": messages,
"max_tokens": 2048,
}
headers = {
"Content-Type": "application/json",
}
        counter = self.max_retry
        wait_seconds = self.RETRY_WAITING_SECONDS
        while counter > 0:
            counter -= 1
            try:
                response = requests.post(
                    END_POINT,
                    headers=headers,
                    json=payload,
                )
                if response.ok and "choices" in response.json():
                    assistant_msg = response.json()["choices"][0]["message"]
                    assistant_text = assistant_msg["content"]
                    action = self.extract_and_validate_json(assistant_text)
                    # -------- write the turn back to history --------
                    self._push_history("user", user_content)
                    self._push_history("assistant", assistant_msg["content"])
                    return assistant_text, None, response, action
                print(
                    "Error calling the chat completions endpoint: "
                    + response.json().get("error", {}).get("message", response.text)
                )
            except Exception as e:  # pylint: disable=broad-exception-caught
                # Catch anything raised during the LLM call so the loop can retry.
                print("Error calling LLM, will retry soon...")
                print(e)
            time.sleep(wait_seconds)
            wait_seconds *= 2
        return ERROR_CALLING_LLM, None, None, None
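
Entry script (filename not shown in the gist): the screenshot → predict → step loop.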
import time
from adb_utils import setup_device
import logging
import os
from agent_wrapper import MiniCPMWrapper
import numpy as np
from PIL import Image
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s: %(message)s")
def run_task(query):
device = setup_device()
minicpm = MiniCPMWrapper(model_name='AgentCPM-GUI', temperature=1, use_history=True, history_size=2)
is_finish = False
while not is_finish:
text_prompt = query
screenshot = device.screenshot(1120)
response = minicpm.predict_mm(text_prompt, [np.array(screenshot)])
action = response[3]
print(action)
is_finish = device.step(action)
time.sleep(2.5)
return is_finish
if __name__ == "__main__":
run_task("去哔哩哔哩看李子柒的最新视频,并且点赞。")
{
"type": "object",
"description": "执行操作并决定当前任务状态",
"additionalProperties": false,
"properties": {
"thought": {
"type": "string"
},
"POINT": {
"description": "点击屏幕上的指定位置",
"$ref": "#/$defs/Location"
},
"to": {
"description": "移动,组合手势参数",
"oneOf": [
{
"enum": [
"up",
"down",
"left",
"right"
],
"description": "结合 POINT 操作,实现向上下左右滑动"
},
{
"$ref": "#/$defs/Location",
"description": "移动到某个位置"
}
]
},
"duration": {
"type": "integer",
"description": "动作执行的时间或等待时间,毫秒",
"minimum": 0,
"default": 200
},
"PRESS": {
"type": "string",
"description": "触发特殊按键,HOME为回到主页按钮,BACK为返回按钮,ENTER为回车按钮,APPSELECT为查看已打开APP列表按钮",
"enum": [
"HOME",
"BACK",
"ENTER",
"APPSELECT"
]
},
"TYPE": {
"type": "string",
"description": "输入文本"
},
"DEEP_LINK": {
"type": "null",
"description": "跳转到最近打开的 APP"
},
"CLEAR": {
"type": "null",
"description": "清空输入框的内容"
},
"STATUS": {
"type": "string",
"description": "当前任务的状态。特殊情况:satisfied,无需操作;impossible,任务无法完成;interrupt,任务中断;need_feedback,需要用户反馈;",
"enum": [
"continue",
"start",
"finish",
"satisfied",
"impossible",
"interrupt",
"need_feedback"
],
"default": "continue"
}
},
"$defs": {
"Location": {
"type": "array",
"description": "坐标为相对于屏幕左上角位原点的相对位置,并且按照宽高比例缩放到 0~1000,数组第一个元素为横坐标 x,第二个元素为纵坐标 y",
"items": {
"type": "integer",
"minimum": 0,
"maximum": 1000
},
"minItems": 2,
"maxItems": 2
}
},
"allOf": [
{
"if": {
"required": ["to"]
},
"then": {
"required": ["POINT"]
}
},
{
"if": {
"anyOf": [
{ "not": { "required": ["STATUS"] } },
{ "properties": { "STATUS": { "enum": ["continue", "start"] } } }
]
},
"then": {
"anyOf": [
{ "required": ["POINT"] },
{ "required": ["PRESS"] },
{ "required": ["TYPE"] },
{ "required": ["DEEP_LINK"] },
{ "required": ["CLEAR"] },
{ "required": ["duration"] }
]
}
},
{
"oneOf": [
{
"required": ["POINT"],
"not": {
"anyOf": [
{ "required": ["PRESS"] },
{ "required": ["TYPE"] },
{ "required": ["DEEP_LINK"] },
{ "required": ["CLEAR"] }
]
}
},
{
"required": ["PRESS"],
"not": {
"anyOf": [
{ "required": ["POINT"] },
{ "required": ["TYPE"] },
{ "required": ["DEEP_LINK"] },
{ "required": ["CLEAR"] }
]
}
},
{
"required": ["TYPE"],
"not": {
"anyOf": [
{ "required": ["POINT"] },
{ "required": ["PRESS"] },
{ "required": ["DEEP_LINK"] },
{ "required": ["CLEAR"] }
]
}
},
{
"required": ["DEEP_LINK"],
"not": {
"anyOf": [
{ "required": ["POINT"] },
{ "required": ["PRESS"] },
{ "required": ["TYPE"] },
{ "required": ["CLEAR"] }
]
}
},
{
"required": ["CLEAR"],
"not": {
"anyOf": [
{ "required": ["POINT"] },
{ "required": ["PRESS"] },
{ "required": ["TYPE"] },
{ "required": ["DEEP_LINK"] }
]
}
},
{
"not": {
"anyOf": [
{ "required": ["POINT"] },
{ "required": ["PRESS"] },
{ "required": ["TYPE"] },
{ "required": ["DEEP_LINK"] },
{ "required": ["CLEAR"] }
]
}
}
]
}
]
}
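
Second schema file (JSON): a compact variant without the `allOf` constraints, which appears to be the schema embedded in the system prompt.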
{
"type": "object",
"description": "执行操作并决定当前任务状态",
"additionalProperties": false,
"properties": {
"thought": {
"type": "string",
"description": "智能体的思维过程"
},
"POINT": {
"$ref": "#/$defs/Location",
"description": "点击屏幕上的指定位置"
},
"to": {
"description": "移动,组合手势参数",
"oneOf": [
{
"enum": [
"up",
"down",
"left",
"right"
],
"description": "从当前点(POINT)出发,执行滑动手势操作,方向包括向上、向下、向左、向右"
},
{
"$ref": "#/$defs/Location",
"description": "移动到某个位置"
}
]
},
"duration": {
"type": "integer",
"description": "动作执行的时间或等待时间,毫秒",
"minimum": 0,
"default": 200
},
"PRESS": {
"type": "string",
"description": "触发特殊按键,HOME为回到主页按钮,BACK为返回按钮,ENTER为回撤按钮",
"enum": [
"HOME",
"BACK",
"ENTER"
]
},
"TYPE": {
"type": "string",
"description": "输入文本"
},
"STATUS": {
"type": "string",
"description": "当前任务的状态。特殊情况:satisfied,无需操作;impossible,任务无法完成;interrupt,任务中断;need_feedback,需要用户反馈;",
"enum": [
"continue",
"finish",
"satisfied",
"impossible",
"interrupt",
"need_feedback"
],
"default": "continue"
}
},
"$defs": {
"Location": {
"type": "array",
"description": "坐标为相对于屏幕左上角位原点的相对位置,并且按照宽高比例缩放到0~1000,数组第一个元素为横坐标x,第二个元素为纵坐标y",
"items": {
"type": "integer",
"minimum": 0,
"maximum": 1000
},
"minItems": 2,
"maxItems": 2
}
}
}
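
For reference, here is a minimal sketch (not part of the gist) of checking a model response against one of these schemas with jsonschema, mirroring extract_and_validate_json above; the schema path and the sample action are illustrative assumptions.

import json
from jsonschema import Draft7Validator

# Load whichever schema file you want to validate against (path is illustrative).
with open("schema_for_extraction.json", encoding="utf-8") as f:
    schema = json.load(f)

validator = Draft7Validator(schema)

# A sample action: tap a point and keep the task going.
sample = {"thought": "Tap the search box", "POINT": [500, 120], "STATUS": "continue"}

errors = sorted(validator.iter_errors(sample), key=lambda e: list(e.path))
if errors:
    for err in errors:
        print("schema violation:", err.message)
else:
    print("action conforms to the schema")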
@Yifei-Bi

Solved: the model has to be changed to AgentCPM-GUI, and self.openai_api_key = OPENAI_KEY has to be commented out.

It looks like quite a few places need to be changed?

@17603127956

Solved: the model has to be changed to AgentCPM-GUI, and self.openai_api_key = OPENAI_KEY has to be commented out.

How exactly should it be changed?

Change model_name='minicpm' in the code to AgentCPM-GUI.

@sh308178685

Where is the schema_for_extraction.json file?

@Zhong-Zhang (Author)

The code has been updated.

@manmushanhe

When testing a high-level instruction, does the user have to re-enter the instruction at every step? Testing this way, I find that the outputs of successive steps are all identical.
