Skip to content

Instantly share code, notes, and snippets.

@ms1995
Created August 1, 2019 04:31
Show Gist options
  • Save ms1995/47bd5e8b78e7f221103e0b029f3a11e1 to your computer and use it in GitHub Desktop.
Save ms1995/47bd5e8b78e7f221103e0b029f3a11e1 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Python wrapper for Android uiautomator tool."""
import sys
import os
import subprocess
import time
import itertools
import json
import hashlib
import socket
import re
import collections
import xml.dom.minidom
DEVICE_PORT = int(os.environ.get('UIAUTOMATOR_DEVICE_PORT', '9008'))
LOCAL_PORT = int(os.environ.get('UIAUTOMATOR_LOCAL_PORT', '9008'))
if 'localhost' not in os.environ.get('no_proxy', ''):
os.environ['no_proxy'] = "localhost,%s" % os.environ.get('no_proxy', '')
try:
import urllib2
except ImportError:
import urllib.request as urllib2
try:
from httplib import HTTPException
except:
from http.client import HTTPException
try:
if os.name == 'nt':
import urllib3
except: # to fix python setup error on Windows.
pass
__author__ = "Xiaocong He"
__all__ = ["device", "Device", "rect", "point", "Selector", "JsonRPCError"]
def U(x):
if sys.version_info.major == 2:
return x.decode('utf-8') if type(x) is str else x
elif sys.version_info.major == 3:
return x
def param_to_property(*props, **kwprops):
if props and kwprops:
raise SyntaxError("Can not set both props and kwprops at the same time.")
class Wrapper(object):
def __init__(self, func):
self.func = func
self.kwargs, self.args = {}, []
def __getattr__(self, attr):
if kwprops:
for prop_name, prop_values in kwprops.items():
if attr in prop_values and prop_name not in self.kwargs:
self.kwargs[prop_name] = attr
return self
elif attr in props:
self.args.append(attr)
return self
raise AttributeError("%s parameter is duplicated or not allowed!" % attr)
def __call__(self, *args, **kwargs):
if kwprops:
kwargs.update(self.kwargs)
self.kwargs = {}
return self.func(*args, **kwargs)
else:
new_args, self.args = self.args + list(args), []
return self.func(*new_args, **kwargs)
return Wrapper
class JsonRPCError(Exception):
def __init__(self, code, message):
self.code = int(code)
self.message = message
def __str__(self):
return "JsonRPC Error code: %d, Message: %s" % (self.code, self.message)
class JsonRPCMethod(object):
if os.name == 'nt':
try:
pool = urllib3.PoolManager()
except:
pass
def __init__(self, url, method, timeout=30):
self.url, self.method, self.timeout = url, method, timeout
def __call__(self, *args, **kwargs):
if args and kwargs:
raise SyntaxError("Could not accept both *args and **kwargs as JSONRPC parameters.")
data = {"jsonrpc": "2.0", "method": self.method, "id": self.id()}
if args:
data["params"] = args
elif kwargs:
data["params"] = kwargs
jsonresult = {"result": ""}
if os.name == "nt":
res = self.pool.urlopen("POST",
self.url,
headers={"Content-Type": "application/json"},
body=json.dumps(data).encode("utf-8"),
timeout=self.timeout)
jsonresult = json.loads(res.data.decode("utf-8"))
else:
result = None
try:
req = urllib2.Request(self.url,
json.dumps(data).encode("utf-8"),
{"Content-type": "application/json"})
result = urllib2.urlopen(req, timeout=self.timeout)
jsonresult = json.loads(result.read().decode("utf-8"))
finally:
if result is not None:
result.close()
if "error" in jsonresult and jsonresult["error"]:
raise JsonRPCError(
jsonresult["error"]["code"],
"%s: %s" % (jsonresult["error"]["data"]["exceptionTypeName"], jsonresult["error"]["message"])
)
return jsonresult["result"]
def id(self):
m = hashlib.md5()
m.update(("%s at %f" % (self.method, time.time())).encode("utf-8"))
return m.hexdigest()
class JsonRPCClient(object):
def __init__(self, url, timeout=30, method_class=JsonRPCMethod):
self.url = url
self.timeout = timeout
self.method_class = method_class
def __getattr__(self, method):
return self.method_class(self.url, method, timeout=self.timeout)
class Selector(dict):
"""The class is to build parameters for UiSelector passed to Android device.
"""
__fields = {
"text": (0x01, None), # MASK_TEXT,
"textContains": (0x02, None), # MASK_TEXTCONTAINS,
"textMatches": (0x04, None), # MASK_TEXTMATCHES,
"textStartsWith": (0x08, None), # MASK_TEXTSTARTSWITH,
"className": (0x10, None), # MASK_CLASSNAME
"classNameMatches": (0x20, None), # MASK_CLASSNAMEMATCHES
"description": (0x40, None), # MASK_DESCRIPTION
"descriptionContains": (0x80, None), # MASK_DESCRIPTIONCONTAINS
"descriptionMatches": (0x0100, None), # MASK_DESCRIPTIONMATCHES
"descriptionStartsWith": (0x0200, None), # MASK_DESCRIPTIONSTARTSWITH
"checkable": (0x0400, False), # MASK_CHECKABLE
"checked": (0x0800, False), # MASK_CHECKED
"clickable": (0x1000, False), # MASK_CLICKABLE
"longClickable": (0x2000, False), # MASK_LONGCLICKABLE,
"scrollable": (0x4000, False), # MASK_SCROLLABLE,
"enabled": (0x8000, False), # MASK_ENABLED,
"focusable": (0x010000, False), # MASK_FOCUSABLE,
"focused": (0x020000, False), # MASK_FOCUSED,
"selected": (0x040000, False), # MASK_SELECTED,
"packageName": (0x080000, None), # MASK_PACKAGENAME,
"packageNameMatches": (0x100000, None), # MASK_PACKAGENAMEMATCHES,
"resourceId": (0x200000, None), # MASK_RESOURCEID,
"resourceIdMatches": (0x400000, None), # MASK_RESOURCEIDMATCHES,
"index": (0x800000, 0), # MASK_INDEX,
"instance": (0x01000000, 0) # MASK_INSTANCE,
}
__mask, __childOrSibling, __childOrSiblingSelector = "mask", "childOrSibling", "childOrSiblingSelector"
def __init__(self, **kwargs):
super(Selector, self).__setitem__(self.__mask, 0)
super(Selector, self).__setitem__(self.__childOrSibling, [])
super(Selector, self).__setitem__(self.__childOrSiblingSelector, [])
for k in kwargs:
self[k] = kwargs[k]
def __setitem__(self, k, v):
if k in self.__fields:
super(Selector, self).__setitem__(U(k), U(v))
super(Selector, self).__setitem__(self.__mask, self[self.__mask] | self.__fields[k][0])
else:
raise ReferenceError("%s is not allowed." % k)
def __delitem__(self, k):
if k in self.__fields:
super(Selector, self).__delitem__(k)
super(Selector, self).__setitem__(self.__mask, self[self.__mask] & ~self.__fields[k][0])
def clone(self):
kwargs = dict((k, self[k]) for k in self
if k not in [self.__mask, self.__childOrSibling, self.__childOrSiblingSelector])
selector = Selector(**kwargs)
for v in self[self.__childOrSibling]:
selector[self.__childOrSibling].append(v)
for s in self[self.__childOrSiblingSelector]:
selector[self.__childOrSiblingSelector].append(s.clone())
return selector
def child(self, **kwargs):
self[self.__childOrSibling].append("child")
self[self.__childOrSiblingSelector].append(Selector(**kwargs))
return self
def sibling(self, **kwargs):
self[self.__childOrSibling].append("sibling")
self[self.__childOrSiblingSelector].append(Selector(**kwargs))
return self
child_selector, from_parent = child, sibling
def rect(top=0, left=0, bottom=100, right=100):
return {"top": top, "left": left, "bottom": bottom, "right": right}
def intersect(rect1, rect2):
top = rect1["top"] if rect1["top"] > rect2["top"] else rect2["top"]
bottom = rect1["bottom"] if rect1["bottom"] < rect2["bottom"] else rect2["bottom"]
left = rect1["left"] if rect1["left"] > rect2["left"] else rect2["left"]
right = rect1["right"] if rect1["right"] < rect2["right"] else rect2["right"]
return left, top, right, bottom
def point(x=0, y=0):
return {"x": x, "y": y}
class Adb(object):
def __init__(self, serial=None, adb_server_host=None, adb_server_port=None):
self.__adb_cmd = None
self.default_serial = serial if serial else os.environ.get("ANDROID_SERIAL", None)
self.adb_server_host = str(adb_server_host if adb_server_host else 'localhost')
self.adb_server_port = str(adb_server_port if adb_server_port else '5037')
self.adbHostPortOptions = []
if self.adb_server_host not in ['localhost', '127.0.0.1']:
self.adbHostPortOptions += ["-H", self.adb_server_host]
if self.adb_server_port != '5037':
self.adbHostPortOptions += ["-P", self.adb_server_port]
def adb(self):
if self.__adb_cmd is None:
if "ANDROID_HOME" in os.environ:
filename = "adb.exe" if os.name == 'nt' else "adb"
adb_cmd = os.path.join(os.environ["ANDROID_HOME"], "platform-tools", filename)
if not os.path.exists(adb_cmd):
raise EnvironmentError(
"Adb not found in $ANDROID_HOME path: %s." % os.environ["ANDROID_HOME"])
else:
import distutils
if "spawn" not in dir(distutils):
import distutils.spawn
adb_cmd = distutils.spawn.find_executable("adb")
if adb_cmd:
adb_cmd = os.path.realpath(adb_cmd)
else:
raise EnvironmentError("$ANDROID_HOME environment not set.")
self.__adb_cmd = adb_cmd
return self.__adb_cmd
def cmd(self, *args, **kwargs):
'''adb command, add -s serial by default. return the subprocess.Popen object.'''
serial = self.device_serial()
if serial:
if " " in serial: # TODO how to include special chars on command line
serial = "'%s'" % serial
return self.raw_cmd(*["-s", serial] + list(args))
else:
return self.raw_cmd(*args)
def raw_cmd(self, *args):
'''adb command. return the subprocess.Popen object.'''
cmd_line = [self.adb()] + self.adbHostPortOptions + list(args)
if os.name != "nt":
cmd_line = [" ".join(cmd_line)]
return subprocess.Popen(cmd_line, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
def device_serial(self):
if not self.default_serial:
devices = self.devices()
if devices:
if len(devices) is 1:
self.default_serial = list(devices.keys())[0]
else:
raise EnvironmentError("Multiple devices attached but default android serial not set.")
else:
raise EnvironmentError("Device not attached.")
return self.default_serial
def devices(self):
'''get a dict of attached devices. key is the device serial, value is device name.'''
out = self.raw_cmd("devices").communicate()[0].decode("utf-8")
match = "List of devices attached"
index = out.find(match)
if index < 0:
raise EnvironmentError("adb is not working.")
return dict([s.split("\t") for s in out[index + len(match):].strip().splitlines() if s.strip()])
def forward(self, local_port, device_port):
'''adb port forward. return 0 if success, else non-zero.'''
return self.cmd("forward", "tcp:%d" % local_port, "tcp:%d" % device_port).wait()
def forward_list(self):
'''adb forward --list'''
version = self.version()
if int(version[1]) <= 1 and int(version[2]) <= 0 and int(version[3]) < 31:
raise EnvironmentError("Low adb version.")
lines = self.raw_cmd("forward", "--list").communicate()[0].decode("utf-8").strip().splitlines()
return [line.strip().split() for line in lines]
def version(self):
'''adb version'''
match = re.search(r"(\d+)\.(\d+)\.(\d+)", self.raw_cmd("version").communicate()[0].decode("utf-8"))
return [match.group(i) for i in range(4)]
_init_local_port = LOCAL_PORT - 1
def next_local_port(adbHost=None):
def is_port_listening(port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = s.connect_ex((str(adbHost) if adbHost else '127.0.0.1', port))
s.close()
return result == 0
global _init_local_port
_init_local_port = _init_local_port + 1 if _init_local_port < 32764 else LOCAL_PORT
# Quick hack for remote ADB port forwarding problem
# while is_port_listening(_init_local_port):
# _init_local_port += 1
return _init_local_port
class NotFoundHandler(object):
'''
Handler for UI Object Not Found exception.
It's a replacement of UiAutomator watcher on device side.
'''
def __init__(self):
self.__handlers = collections.defaultdict(lambda: {'on': True, 'handlers': []})
def __get__(self, instance, type):
return self.__handlers[instance.adb.device_serial()]
class AutomatorServer(object):
"""start and quit rpc server on device.
"""
__jar_files = {
"bundle.jar": "libs/bundle.jar",
"uiautomator-stub.jar": "libs/uiautomator-stub.jar"
}
__apk_files = ["libs/app-uiautomator.apk", "libs/app-uiautomator-test.apk"]
__sdk = 0
handlers = NotFoundHandler() # handler UI Not Found exception
def __init__(self, serial=None, local_port=None, device_port=None, adb_server_host=None, adb_server_port=None):
self.uiautomator_process = None
self.adb = Adb(serial=serial, adb_server_host=adb_server_host, adb_server_port=adb_server_port)
self.device_port = int(device_port) if device_port else DEVICE_PORT
self.local_port = LOCAL_PORT
# if local_port:
# self.local_port = local_port
# else:
# try: # first we will try to use the local port already adb forwarded
# for s, lp, rp in self.adb.forward_list():
# if s == self.adb.device_serial() and rp == 'tcp:%d' % self.device_port:
# self.local_port = int(lp[4:])
# break
# else:
# self.local_port = next_local_port(adb_server_host)
# except:
# self.local_port = next_local_port(adb_server_host)
self.stop()
self.start()
def push(self):
base_dir = os.path.dirname(__file__)
for jar, url in self.__jar_files.items():
filename = os.path.join(base_dir, url)
self.adb.cmd("push", filename, "/data/local/tmp/").wait()
return list(self.__jar_files.keys())
def install(self):
base_dir = os.path.dirname(__file__)
for apk in self.__apk_files:
self.adb.cmd("install", "-r", "-t", os.path.join(base_dir, apk)).wait()
@property
def jsonrpc(self):
return self.jsonrpc_wrap(timeout=int(os.environ.get("jsonrpc_timeout", 90)))
def jsonrpc_wrap(self, timeout):
server = self
ERROR_CODE_BASE = -32000
def _JsonRPCMethod(url, method, timeout, restart=True):
_method_obj = JsonRPCMethod(url, method, timeout)
def wrapper(*args, **kwargs):
URLError = urllib3.exceptions.HTTPError if os.name == "nt" else urllib2.URLError
try:
return _method_obj(*args, **kwargs)
except (URLError, socket.error, HTTPException) as e:
if restart:
server.stop()
server.start(timeout=30)
return _JsonRPCMethod(url, method, timeout, False)(*args, **kwargs)
else:
raise
except JsonRPCError as e:
if e.code >= ERROR_CODE_BASE - 1:
server.stop()
server.start()
return _method_obj(*args, **kwargs)
elif e.code == ERROR_CODE_BASE - 2 and self.handlers['on']: # Not Found
try:
self.handlers['on'] = False
# any handler returns True will break the left handlers
any(handler(self.handlers.get('device', None)) for handler in self.handlers['handlers'])
finally:
self.handlers['on'] = True
return _method_obj(*args, **kwargs)
raise
return wrapper
return JsonRPCClient(self.rpc_uri,
timeout=timeout,
method_class=_JsonRPCMethod)
def __jsonrpc(self):
return JsonRPCClient(self.rpc_uri, timeout=int(os.environ.get("JSONRPC_TIMEOUT", 90)))
def sdk_version(self):
'''sdk version of connected device.'''
if self.__sdk == 0:
try:
self.__sdk = int(self.adb.cmd("shell", "getprop", "ro.build.version.sdk").communicate()[0].decode("utf-8").strip())
except:
pass
return self.__sdk
def start(self, timeout=5):
if self.sdk_version() < 18:
files = self.push()
cmd = list(itertools.chain(
["shell", "uiautomator", "runtest"],
files,
["-c", "com.github.uiautomatorstub.Stub"]
))
else:
self.install()
cmd = ["shell", "am", "instrument", "-w",
"com.github.uiautomator.test/android.support.test.runner.AndroidJUnitRunner"]
self.uiautomator_process = self.adb.cmd(*cmd)
self.adb.forward(self.local_port, self.device_port)
while not self.alive and timeout > 0:
time.sleep(0.1)
timeout -= 0.1
if not self.alive:
# raise IOError("RPC server not started!")
print "RPC server not started! Try again."
self.stop()
self.start()
def ping(self):
try:
return self.__jsonrpc().ping()
except:
return None
@property
def alive(self):
'''Check if the rpc server is alive.'''
return self.ping() == "pong"
def stop(self):
'''Stop the rpc server.'''
package_names = ["com.github.uiautomator","com.github.uiautomator.test"]
for pkg in package_names:
self.adb.cmd("uninstall", pkg).wait()
return
if self.uiautomator_process and self.uiautomator_process.poll() is None:
res = None
try:
res = urllib2.urlopen(self.stop_uri)
self.uiautomator_process.wait()
except:
self.uiautomator_process.kill()
finally:
if res is not None:
res.close()
self.uiautomator_process = None
try:
out = self.adb.cmd("shell", "ps", "-C", "uiautomator").communicate()[0].decode("utf-8").strip().splitlines()
if out:
index = out[0].split().index("PID")
for line in out[1:]:
if len(line.split()) > index:
self.adb.cmd("shell", "kill", "-9", line.split()[index]).wait()
except:
pass
@property
def stop_uri(self):
return "http://%s:%d/stop" % (self.adb.adb_server_host, self.local_port)
@property
def rpc_uri(self):
return "http://%s:%d/jsonrpc/0" % (self.adb.adb_server_host, self.local_port)
@property
def screenshot_uri(self):
return "http://%s:%d/screenshot/0" % (self.adb.adb_server_host, self.local_port)
def screenshot(self, filename=None, scale=1.0, quality=100):
if self.sdk_version() >= 18:
try:
req = urllib2.Request("%s?scale=%f&quality=%f" % (self.screenshot_uri, scale, quality))
result = urllib2.urlopen(req, timeout=30)
if filename:
with open(filename, 'wb') as f:
f.write(result.read())
return filename
else:
return result.read()
except:
pass
return None
class AutomatorDevice(object):
'''uiautomator wrapper of android device'''
__orientation = ( # device orientation
(0, "natural", "n", 0),
(1, "left", "l", 90),
(2, "upsidedown", "u", 180),
(3, "right", "r", 270)
)
__alias = {
"width": "displayWidth",
"height": "displayHeight"
}
def __init__(self, serial=None, local_port=None, adb_server_host=None, adb_server_port=None):
self.server = AutomatorServer(
serial=serial,
local_port=local_port,
adb_server_host=adb_server_host,
adb_server_port=adb_server_port
)
def __call__(self, **kwargs):
return AutomatorDeviceObject(self, Selector(**kwargs))
def __getattr__(self, attr):
'''alias of fields in info property.'''
info = self.info
if attr in info:
return info[attr]
elif attr in self.__alias:
return info[self.__alias[attr]]
else:
raise AttributeError("%s attribute not found!" % attr)
@property
def info(self):
'''Get the device info.'''
return self.server.jsonrpc.deviceInfo()
def click(self, x, y):
'''click at arbitrary coordinates.'''
return self.server.jsonrpc.click(x, y)
def long_click(self, x, y):
'''long click at arbitrary coordinates.'''
return self.swipe(x, y, x + 1, y + 1)
def swipe(self, sx, sy, ex, ey, steps=100):
return self.server.jsonrpc.swipe(sx, sy, ex, ey, steps)
def swipePoints(self, points, steps=100):
ppoints = []
for p in points:
ppoints.append(p[0])
ppoints.append(p[1])
return self.server.jsonrpc.swipePoints(ppoints, steps)
def drag(self, sx, sy, ex, ey, steps=100):
'''Swipe from one point to another point.'''
return self.server.jsonrpc.drag(sx, sy, ex, ey, steps)
def dump(self, filename=None, compressed=True, pretty=True):
'''dump device window and pull to local file.'''
content = self.server.jsonrpc.dumpWindowHierarchy(compressed, None)
if filename:
with open(filename, "wb") as f:
f.write(content.encode("utf-8"))
if pretty and "\n " not in content:
xml_text = xml.dom.minidom.parseString(content.encode("utf-8"))
content = U(xml_text.toprettyxml(indent=' '))
return content
def screenshot(self, filename, scale=1.0, quality=100):
'''take screenshot.'''
result = self.server.screenshot(filename, scale, quality)
if result:
return result
device_file = self.server.jsonrpc.takeScreenshot("screenshot.png",
scale, quality)
if not device_file:
return None
p = self.server.adb.cmd("pull", device_file, filename)
p.wait()
self.server.adb.cmd("shell", "rm", device_file).wait()
return filename if p.returncode is 0 else None
def freeze_rotation(self, freeze=True):
'''freeze or unfreeze the device rotation in current status.'''
self.server.jsonrpc.freezeRotation(freeze)
@property
def orientation(self):
'''
orienting the devie to left/right or natural.
left/l: rotation=90 , displayRotation=1
right/r: rotation=270, displayRotation=3
natural/n: rotation=0 , displayRotation=0
upsidedown/u: rotation=180, displayRotation=2
'''
return self.__orientation[self.info["displayRotation"]][1]
@orientation.setter
def orientation(self, value):
'''setter of orientation property.'''
for values in self.__orientation:
if value in values:
# can not set upside-down until api level 18.
self.server.jsonrpc.setOrientation(values[1])
break
else:
raise ValueError("Invalid orientation.")
@property
def last_traversed_text(self):
'''get last traversed text. used in webview for highlighted text.'''
return self.server.jsonrpc.getLastTraversedText()
def clear_traversed_text(self):
'''clear the last traversed text.'''
self.server.jsonrpc.clearLastTraversedText()
@property
def open(self):
'''
Open notification or quick settings.
Usage:
d.open.notification()
d.open.quick_settings()
'''
@param_to_property(action=["notification", "quick_settings"])
def _open(action):
if action == "notification":
return self.server.jsonrpc.openNotification()
else:
return self.server.jsonrpc.openQuickSettings()
return _open
@property
def handlers(self):
obj = self
class Handlers(object):
def on(self, fn):
if fn not in obj.server.handlers['handlers']:
obj.server.handlers['handlers'].append(fn)
obj.server.handlers['device'] = obj
return fn
def off(self, fn):
if fn in obj.server.handlers['handlers']:
obj.server.handlers['handlers'].remove(fn)
return Handlers()
@property
def watchers(self):
obj = self
class Watchers(list):
def __init__(self):
for watcher in obj.server.jsonrpc.getWatchers():
self.append(watcher)
@property
def triggered(self):
return obj.server.jsonrpc.hasAnyWatcherTriggered()
def remove(self, name=None):
if name:
obj.server.jsonrpc.removeWatcher(name)
else:
for name in self:
obj.server.jsonrpc.removeWatcher(name)
def reset(self):
obj.server.jsonrpc.resetWatcherTriggers()
return self
def run(self):
obj.server.jsonrpc.runWatchers()
return self
return Watchers()
def watcher(self, name):
obj = self
class Watcher(object):
def __init__(self):
self.__selectors = []
@property
def triggered(self):
return obj.server.jsonrpc.hasWatcherTriggered(name)
def remove(self):
obj.server.jsonrpc.removeWatcher(name)
def when(self, **kwargs):
self.__selectors.append(Selector(**kwargs))
return self
def click(self, **kwargs):
obj.server.jsonrpc.registerClickUiObjectWatcher(name, self.__selectors, Selector(**kwargs))
@property
def press(self):
@param_to_property(
"home", "back", "left", "right", "up", "down", "center",
"search", "enter", "delete", "del", "recent", "volume_up",
"menu", "volume_down", "volume_mute", "camera", "power")
def _press(*args):
obj.server.jsonrpc.registerPressKeyskWatcher(name, self.__selectors, args)
return _press
return Watcher()
@property
def press(self):
'''
press key via name or key code. Supported key name includes:
home, back, left, right, up, down, center, menu, search, enter,
delete(or del), recent(recent apps), volume_up, volume_down,
volume_mute, camera, power.
Usage:
d.press.back() # press back key
d.press.menu() # press home key
d.press(89) # press keycode
'''
@param_to_property(
key=["home", "back", "left", "right", "up", "down", "center",
"menu", "search", "enter", "delete", "del", "recent",
"volume_up", "volume_down", "volume_mute", "camera", "power"]
)
def _press(key, meta=None):
if isinstance(key, int):
return self.server.jsonrpc.pressKeyCode(key, meta) if meta else self.server.jsonrpc.pressKeyCode(key)
else:
return self.server.jsonrpc.pressKey(str(key))
return _press
def wakeup(self):
'''turn on screen in case of screen off.'''
self.server.jsonrpc.wakeUp()
def sleep(self):
'''turn off screen in case of screen on.'''
self.server.jsonrpc.sleep()
@property
def screen(self):
'''
Turn on/off screen.
Usage:
d.screen.on()
d.screen.off()
d.screen == 'on' # Check if the screen is on, same as 'd.screenOn'
d.screen == 'off' # Check if the screen is off, same as 'not d.screenOn'
'''
devive_self = self
class _Screen(object):
def on(self):
return devive_self.wakeup()
def off(self):
return devive_self.sleep()
def __call__(self, action):
if action == "on":
return self.on()
elif action == "off":
return self.off()
else:
raise AttributeError("Invalid parameter: %s" % action)
def __eq__(self, value):
info = devive_self.info
if "screenOn" not in info:
raise EnvironmentError("Not supported on Android 4.3 and belows.")
if value in ["on", "On", "ON"]:
return info["screenOn"]
elif value in ["off", "Off", "OFF"]:
return not info["screenOn"]
raise ValueError("Invalid parameter. It can only be compared with on/off.")
def __ne__(self, value):
return not self.__eq__(value)
return _Screen()
@property
def wait(self):
'''
Waits for the current application to idle or window update event occurs.
Usage:
d.wait.idle(timeout=1000)
d.wait.update(timeout=1000, package_name="com.android.settings")
'''
@param_to_property(action=["idle", "update"])
def _wait(action, timeout=1000, package_name=None):
if timeout / 1000 + 5 > int(os.environ.get("JSONRPC_TIMEOUT", 90)):
http_timeout = timeout / 1000 + 5
else:
http_timeout = int(os.environ.get("JSONRPC_TIMEOUT", 90))
if action == "idle":
return self.server.jsonrpc_wrap(timeout=http_timeout).waitForIdle(timeout)
elif action == "update":
return self.server.jsonrpc_wrap(timeout=http_timeout).waitForWindowUpdate(package_name, timeout)
return _wait
def exists(self, **kwargs):
'''Check if the specified ui object by kwargs exists.'''
return self(**kwargs).exists
Device = AutomatorDevice
class AutomatorDeviceUiObject(object):
'''Represent a UiObject, on which user can perform actions, such as click, set text
'''
__alias = {'description': "contentDescription"}
def __init__(self, device, selector):
self.device = device
self.jsonrpc = device.server.jsonrpc
self.selector = selector
@property
def exists(self):
'''check if the object exists in current window.'''
return self.jsonrpc.exist(self.selector)
def __getattr__(self, attr):
'''alias of fields in info property.'''
info = self.info
if attr in info:
return info[attr]
elif attr in self.__alias:
return info[self.__alias[attr]]
else:
raise AttributeError("%s attribute not found!" % attr)
@property
def info(self):
'''ui object info.'''
return self.jsonrpc.objInfo(self.selector)
def set_text(self, text):
'''set the text field.'''
if text in [None, ""]:
return self.jsonrpc.clearTextField(self.selector) # TODO no return
else:
return self.jsonrpc.setText(self.selector, text)
def clear_text(self):
'''clear text. alias for set_text(None).'''
self.set_text(None)
@property
def click(self):
'''
click on the ui object.
Usage:
d(text="Clock").click() # click on the center of the ui object
d(text="OK").click.wait(timeout=3000) # click and wait for the new window update
d(text="John").click.topleft() # click on the topleft of the ui object
d(text="John").click.bottomright() # click on the bottomright of the ui object
'''
@param_to_property(action=["tl", "topleft", "br", "bottomright", "wait"])
def _click(action=None, timeout=3000):
if action is None:
return self.jsonrpc.click(self.selector)
elif action in ["tl", "topleft", "br", "bottomright"]:
return self.jsonrpc.click(self.selector, action)
else:
return self.jsonrpc.clickAndWaitForNewWindow(self.selector, timeout)
return _click
@property
def long_click(self):
'''
Perform a long click action on the object.
Usage:
d(text="Image").long_click() # long click on the center of the ui object
d(text="Image").long_click.topleft() # long click on the topleft of the ui object
d(text="Image").long_click.bottomright() # long click on the topleft of the ui object
'''
@param_to_property(corner=["tl", "topleft", "br", "bottomright"])
def _long_click(corner=None):
info = self.info
if info["longClickable"]:
if corner:
return self.jsonrpc.longClick(self.selector, corner)
else:
return self.jsonrpc.longClick(self.selector)
else:
bounds = info.get("visibleBounds") or info.get("bounds")
if corner in ["tl", "topleft"]:
x = (5 * bounds["left"] + bounds["right"]) / 6
y = (5 * bounds["top"] + bounds["bottom"]) / 6
elif corner in ["br", "bottomright"]:
x = (bounds["left"] + 5 * bounds["right"]) / 6
y = (bounds["top"] + 5 * bounds["bottom"]) / 6
else:
x = (bounds["left"] + bounds["right"]) / 2
y = (bounds["top"] + bounds["bottom"]) / 2
return self.device.long_click(x, y)
return _long_click
@property
def drag(self):
'''
Drag the ui object to other point or ui object.
Usage:
d(text="Clock").drag.to(x=100, y=100) # drag to point (x,y)
d(text="Clock").drag.to(text="Remove") # drag to another object
'''
def to(obj, *args, **kwargs):
if len(args) >= 2 or "x" in kwargs or "y" in kwargs:
drag_to = lambda x, y, steps=100: self.jsonrpc.dragTo(self.selector, x, y, steps)
else:
drag_to = lambda steps=100, **kwargs: self.jsonrpc.dragTo(self.selector, Selector(**kwargs), steps)
return drag_to(*args, **kwargs)
return type("Drag", (object,), {"to": to})()
def gesture(self, start1, start2, *args, **kwargs):
'''
perform two point gesture.
Usage:
d().gesture(startPoint1, startPoint2).to(endPoint1, endPoint2, steps)
d().gesture(startPoint1, startPoint2, endPoint1, endPoint2, steps)
'''
def to(obj_self, end1, end2, steps=100):
ctp = lambda pt: point(*pt) if type(pt) == tuple else pt # convert tuple to point
s1, s2, e1, e2 = ctp(start1), ctp(start2), ctp(end1), ctp(end2)
return self.jsonrpc.gesture(self.selector, s1, s2, e1, e2, steps)
obj = type("Gesture", (object,), {"to": to})()
return obj if len(args) == 0 else to(None, *args, **kwargs)
def gestureM(self, start1, start2, start3, *args, **kwargs):
'''
perform 3 point gesture.
Usage:
d().gestureM((100,200),(100,300),(100,400),(100,400),(100,400),(100,400))
d().gestureM((100,200),(100,300),(100,400)).to((100,400),(100,400),(100,400))
'''
def to(obj_self, end1, end2, end3, steps=100):
ctp = lambda pt: point(*pt) if type(pt) == tuple else pt # convert tuple to point
s1, s2, s3, e1, e2, e3 = ctp(start1), ctp(start2), ctp(start3), ctp(end1), ctp(end2), ctp(end3)
return self.jsonrpc.gesture(self.selector, s1, s2, s3, e1, e2, e3, steps)
obj = type("Gesture", (object,), {"to": to})()
return obj if len(args) == 0 else to(None, *args, **kwargs)
@property
def pinch(self):
'''
Perform two point gesture from edge to center(in) or center to edge(out).
Usages:
d().pinch.In(percent=100, steps=10)
d().pinch.Out(percent=100, steps=100)
'''
@param_to_property(in_or_out=["In", "Out"])
def _pinch(in_or_out="Out", percent=100, steps=50):
if in_or_out in ["Out", "out"]:
return self.jsonrpc.pinchOut(self.selector, percent, steps)
elif in_or_out in ["In", "in"]:
return self.jsonrpc.pinchIn(self.selector, percent, steps)
return _pinch
@property
def swipe(self):
'''
Perform swipe action. if device platform greater than API 18, percent can be used and value between 0 and 1
Usages:
d().swipe.right()
d().swipe.left(steps=10)
d().swipe.up(steps=10)
d().swipe.down()
d().swipe("right", steps=20)
d().swipe("right", steps=20, percent=0.5)
'''
@param_to_property(direction=["up", "down", "right", "left"])
def _swipe(direction="left", steps=10, percent=1):
if percent == 1:
return self.jsonrpc.swipe(self.selector, direction, steps)
else:
return self.jsonrpc.swipe(self.selector, direction, percent, steps)
return _swipe
@property
def wait(self):
'''
Wait until the ui object gone or exist.
Usage:
d(text="Clock").wait.gone() # wait until it's gone.
d(text="Settings").wait.exists() # wait until it appears.
'''
@param_to_property(action=["exists", "gone"])
def _wait(action, timeout=3000):
if timeout / 1000 + 5 > int(os.environ.get("JSONRPC_TIMEOUT", 90)):
http_timeout = timeout / 1000 + 5
else:
http_timeout = int(os.environ.get("JSONRPC_TIMEOUT", 90))
method = self.device.server.jsonrpc_wrap(
timeout=http_timeout
).waitUntilGone if action == "gone" else self.device.server.jsonrpc_wrap(timeout=http_timeout).waitForExists
return method(self.selector, timeout)
return _wait
class AutomatorDeviceNamedUiObject(AutomatorDeviceUiObject):
def __init__(self, device, name):
super(AutomatorDeviceNamedUiObject, self).__init__(device, name)
def child(self, **kwargs):
return AutomatorDeviceNamedUiObject(
self.device,
self.jsonrpc.getChild(self.selector, Selector(**kwargs))
)
def sibling(self, **kwargs):
return AutomatorDeviceNamedUiObject(
self.device,
self.jsonrpc.getFromParent(self.selector, Selector(**kwargs))
)
class AutomatorDeviceObject(AutomatorDeviceUiObject):
'''Represent a generic UiObject/UiScrollable/UiCollection,
on which user can perform actions, such as click, set text
'''
def __init__(self, device, selector):
super(AutomatorDeviceObject, self).__init__(device, selector)
def child(self, **kwargs):
'''set childSelector.'''
return AutomatorDeviceObject(
self.device,
self.selector.clone().child(**kwargs)
)
def sibling(self, **kwargs):
'''set fromParent selector.'''
return AutomatorDeviceObject(
self.device,
self.selector.clone().sibling(**kwargs)
)
child_selector, from_parent = child, sibling
def child_by_text(self, txt, **kwargs):
if "allow_scroll_search" in kwargs:
allow_scroll_search = kwargs.pop("allow_scroll_search")
name = self.jsonrpc.childByText(
self.selector,
Selector(**kwargs),
txt,
allow_scroll_search
)
else:
name = self.jsonrpc.childByText(
self.selector,
Selector(**kwargs),
txt
)
return AutomatorDeviceNamedUiObject(self.device, name)
def child_by_description(self, txt, **kwargs):
if "allow_scroll_search" in kwargs:
allow_scroll_search = kwargs.pop("allow_scroll_search")
name = self.jsonrpc.childByDescription(
self.selector,
Selector(**kwargs),
txt,
allow_scroll_search
)
else:
name = self.jsonrpc.childByDescription(
self.selector,
Selector(**kwargs),
txt
)
return AutomatorDeviceNamedUiObject(self.device, name)
def child_by_instance(self, inst, **kwargs):
return AutomatorDeviceNamedUiObject(
self.device,
self.jsonrpc.childByInstance(self.selector, Selector(**kwargs), inst)
)
@property
def count(self):
return self.jsonrpc.count(self.selector)
def __len__(self):
return self.count
def __getitem__(self, index):
count = self.count
if index >= count:
raise IndexError()
elif count == 1:
return self
else:
selector = self.selector.clone()
selector["instance"] = index
return AutomatorDeviceObject(self.device, selector)
def __iter__(self):
obj, length = self, self.count
class Iter(object):
def __init__(self):
self.index = -1
def next(self):
self.index += 1
if self.index < length:
return obj[self.index]
else:
raise StopIteration()
__next__ = next
return Iter()
def right(self, **kwargs):
def onrightof(rect1, rect2):
left, top, right, bottom = intersect(rect1, rect2)
return rect2["left"] - rect1["right"] if top < bottom else -1
return self.__view_beside(onrightof, **kwargs)
def left(self, **kwargs):
def onleftof(rect1, rect2):
left, top, right, bottom = intersect(rect1, rect2)
return rect1["left"] - rect2["right"] if top < bottom else -1
return self.__view_beside(onleftof, **kwargs)
def up(self, **kwargs):
def above(rect1, rect2):
left, top, right, bottom = intersect(rect1, rect2)
return rect1["top"] - rect2["bottom"] if left < right else -1
return self.__view_beside(above, **kwargs)
def down(self, **kwargs):
def under(rect1, rect2):
left, top, right, bottom = intersect(rect1, rect2)
return rect2["top"] - rect1["bottom"] if left < right else -1
return self.__view_beside(under, **kwargs)
def __view_beside(self, onsideof, **kwargs):
bounds = self.info["bounds"]
min_dist, found = -1, None
for ui in AutomatorDeviceObject(self.device, Selector(**kwargs)):
dist = onsideof(bounds, ui.info["bounds"])
if dist >= 0 and (min_dist < 0 or dist < min_dist):
min_dist, found = dist, ui
return found
@property
def fling(self):
'''
Perform fling action.
Usage:
d().fling() # default vertically, forward
d().fling.horiz.forward()
d().fling.vert.backward()
d().fling.toBeginning(max_swipes=100) # vertically
d().fling.horiz.toEnd()
'''
@param_to_property(
dimention=["vert", "vertically", "vertical", "horiz", "horizental", "horizentally"],
action=["forward", "backward", "toBeginning", "toEnd"]
)
def _fling(dimention="vert", action="forward", max_swipes=1000):
vertical = dimention in ["vert", "vertically", "vertical"]
if action == "forward":
return self.jsonrpc.flingForward(self.selector, vertical)
elif action == "backward":
return self.jsonrpc.flingBackward(self.selector, vertical)
elif action == "toBeginning":
return self.jsonrpc.flingToBeginning(self.selector, vertical, max_swipes)
elif action == "toEnd":
return self.jsonrpc.flingToEnd(self.selector, vertical, max_swipes)
return _fling
@property
def scroll(self):
'''
Perfrom scroll action.
Usage:
d().scroll(steps=50) # default vertically and forward
d().scroll.horiz.forward(steps=100)
d().scroll.vert.backward(steps=100)
d().scroll.horiz.toBeginning(steps=100, max_swipes=100)
d().scroll.vert.toEnd(steps=100)
d().scroll.horiz.to(text="Clock")
'''
def __scroll(vertical, forward, steps=100):
method = self.jsonrpc.scrollForward if forward else self.jsonrpc.scrollBackward
return method(self.selector, vertical, steps)
def __scroll_to_beginning(vertical, steps=100, max_swipes=1000):
return self.jsonrpc.scrollToBeginning(self.selector, vertical, max_swipes, steps)
def __scroll_to_end(vertical, steps=100, max_swipes=1000):
return self.jsonrpc.scrollToEnd(self.selector, vertical, max_swipes, steps)
def __scroll_to(vertical, **kwargs):
return self.jsonrpc.scrollTo(self.selector, Selector(**kwargs), vertical)
@param_to_property(
dimention=["vert", "vertically", "vertical", "horiz", "horizental", "horizentally"],
action=["forward", "backward", "toBeginning", "toEnd", "to"])
def _scroll(dimention="vert", action="forward", **kwargs):
vertical = dimention in ["vert", "vertically", "vertical"]
if action in ["forward", "backward"]:
return __scroll(vertical, action == "forward", **kwargs)
elif action == "toBeginning":
return __scroll_to_beginning(vertical, **kwargs)
elif action == "toEnd":
return __scroll_to_end(vertical, **kwargs)
elif action == "to":
return __scroll_to(vertical, **kwargs)
return _scroll
device = AutomatorDevice()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment