Created
August 30, 2015 14:32
-
-
Save tito/a1d3f0b1f308fed88cc7 to your computer and use it in GitHub Desktop.
Kivy Automation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf-8 | |
""" | |
Automation client for automatesocket.py | |
======================================= | |
""" | |
import socket | |
import struct | |
import json | |
import base64 | |
import sys | |
from time import time, sleep | |
# (host, port) pair that interact() connects to for every command.
# connect() rebinds it to the host/port it was called with.
_connect_info = ("localhost", 7777)
def _dprint(msg, fatal=False):
    """Write *msg* to stdout and flush immediately.

    No newline is appended, so a progress line can be started with one call
    ("Doing x... ") and completed by a later one ("ok\\n").

    :param msg: text to write.
    :param fatal: accepted for callers that flag a message as fatal; it does
        not change where or how the message is written.
    """
    stream = sys.stdout
    stream.write(msg)
    # Flush so partial progress lines show up before the next blocking call.
    stream.flush()
def _recvall(sock, size):
    """Read exactly *size* bytes from *sock*.

    :param sock: object with a socket-style ``recv(n)`` method.
    :param size: number of bytes to read.
    :returns: the bytes read, or None if ``recv`` returns an empty packet
        (peer closed the connection) before *size* bytes arrived.
    """
    chunks = []
    remaining = size
    while remaining > 0:
        packet = sock.recv(remaining)
        if not packet:
            return None
        chunks.append(packet)
        remaining -= len(packet)
    # Join as bytes: recv() yields bytes, and b"" is a plain str on Python 2.
    return b"".join(chunks)
def interact(*args):
    """Manually send a command to the connected socket.

    The arguments are JSON-encoded as a list and sent with a native
    unsigned-int length prefix; the reply uses the same framing.

    :param args: command name followed by its arguments.
    :returns: the JSON-decoded reply, or None if the server closed the
        connection before a complete reply was received.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # connect() is inside the try so the socket is closed on failure too.
        sock.connect(_connect_info)
        # Encode before measuring: the length prefix counts bytes on the wire.
        payload = json.dumps(args).encode("utf-8")
        sock.sendall(struct.pack("I", len(payload)) + payload)

        header = _recvall(sock, struct.calcsize("I"))
        if not header:
            return None
        size = struct.unpack("I", header)[0]
        body = _recvall(sock, size)
        if body is None:
            # Truncated reply: report it like a missing reply.
            return None
        return json.loads(body.decode("utf-8"))
    finally:
        sock.close()
def connect(host="localhost", port=7777, timeout=10):
    """Wait until the automation server at *host*:*port* answers a ping.

    The target is stored in the module-level ``_connect_info`` so that later
    interact() calls use it. The ping is retried every 0.1 seconds.

    :param host: server host name.
    :param port: server TCP port.
    :param timeout: seconds to keep retrying before giving up.
    :raises Exception: "Connection timeout" if no ping succeeded in time.
    """
    global _connect_info
    _dprint("Connecting to {}:{}... ".format(host, port))
    _connect_info = (host, port)
    start = time()
    while True:
        try:
            if interact("ping") is True:
                _dprint("ok\n")
                return
        except Exception:
            # The server may not be listening yet; any failure means "retry".
            pass
        # The timeout check lives outside the try block so that the
        # exception raised here is not swallowed by the retry handler.
        if time() - start > timeout:
            _dprint("timeout\n")
            raise Exception("Connection timeout")
        sleep(.1)
def wait(cond, timeout=-1):
    """Ask the server to wait for the condition string *cond*.

    Sends the "wait" command with *cond* and *timeout*, prints progress to
    stdout, and returns whatever the server replied.
    """
    _dprint("Wait condition '{}'... ".format(cond))
    outcome = interact("wait", cond, timeout)
    _dprint("ok (result={})\n".format(outcome))
    return outcome
def screenshot():
    """Request a screenshot from the server and save it as screenshot.png.

    The reply is expected to be a (width, height, base64-data) triple; the
    decoded data is written to "screenshot.png" in the current directory.

    :raises TypeError, ValueError: if the reply cannot be unpacked into
        three values (for example when no reply was received).
    """
    _dprint("Screenshot... ")
    width, height, data = interact("screenshot")
    # base64.decodestring was removed in Python 3.9. b64decode exists on
    # Python 2 and 3 and skips any newlines present in the encoded payload.
    png = base64.b64decode(data)
    with open("screenshot.png", "wb") as fd:
        fd.write(png)
    _dprint("ok\n")
def quit():
    """Send the "quit" command to the server, printing progress to stdout."""
    _dprint("Quit the application... ")
    interact("quit")
    _dprint("ok\n")
def execute(cmd):
    """Send *cmd* to the server's "execute" command and return its reply."""
    reply = interact("execute", cmd)
    return reply
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf-8 | |
""" | |
Automation server | |
================= | |
Import this file from your main.py to start the server; it can then be
driven with automateclient.py. No configuration is needed.
""" | |
import socket | |
import struct | |
import sys | |
import threading | |
import json | |
import base64 | |
import os | |
from time import time, sleep | |
# Size in bytes of the native unsigned-int length prefix of each message.
COMMAND_SIZE = struct.calcsize("I")

# Registry of remotely callable commands: function name -> function.
COMMANDS = {}


def command(f):
    """Decorator registering *f* in COMMANDS under its own name.

    Returns *f* unchanged so the decorated module-level name still refers to
    the function instead of being rebound to None.
    """
    COMMANDS[f.__name__] = f
    return f
@command
def screenshot(*args):
    """Take a screenshot of the window and return it as a PNG.

    The capture function is wrapped with Kivy's ``mainthread`` decorator;
    the calling thread blocks on an event until the capture has finished.

    :returns: ``[width, height, base64-data]``, or an empty list if the
        capture failed (the traceback is printed).
    """
    from kivy.clock import mainthread
    _result = []
    _event = threading.Event()

    @mainthread
    def _real_screenshot():
        import tempfile
        from kivy.core.window import Window
        tmp_fn = None
        screenshot_fn = None
        try:
            tmp = tempfile.NamedTemporaryFile(suffix=".png", delete=False)
            tmp_fn = tmp.name
            tmp.close()
            # XXX hack, Window.screenshot doesn't respect our filename: the
            # image is expected at "<name without extension>0001.png".
            # splitext only strips the final extension, so dots elsewhere in
            # the temp directory path do not corrupt the result.
            screenshot_fn = os.path.splitext(tmp_fn)[0] + "0001.png"
            width, height = Window.size
            Window.screenshot(name=tmp_fn)
            with open(screenshot_fn, "rb") as png:
                data = png.read()
            # b64encode replaces encodestring (removed in Python 3.9); the
            # result is decoded to text so it can be JSON-serialized.
            encoded = base64.b64encode(data).decode("ascii")
            _result[:] = [width, height, encoded]
        except Exception:
            import traceback
            traceback.print_exc()
        finally:
            # Remove both files independently so that a failure on one does
            # not leave the other behind.
            for fn in (screenshot_fn, tmp_fn):
                if fn is None:
                    continue
                try:
                    os.unlink(fn)
                except OSError:
                    pass
            # Always release the waiting thread, even on failure.
            _event.set()

    _real_screenshot()
    _event.wait()
    return _result
@command
def execute(cmd):
    """Execute an arbitrary string of Python code in the app context.

    The code runs with a namespace containing ``app`` (the running App). The
    executing function is wrapped with Kivy's ``mainthread`` decorator; the
    calling thread blocks until it has finished. Errors raised by the code
    are printed, not propagated.

    NOTE(security): *cmd* comes straight from the network socket and is
    passed to ``exec``; only enable this server in trusted environments.

    :param cmd: Python source to execute.
    :returns: an empty list (no result is collected from the executed code).
    """
    from kivy.clock import mainthread
    _result = []
    _event = threading.Event()

    @mainthread
    def _real_execute():
        from kivy.app import App
        try:
            app = App.get_running_app()
            idmap = {"app": app}
            exec(cmd, idmap, idmap)
        except Exception:
            import traceback
            traceback.print_exc()
        finally:
            # Always release the waiting thread, even if the code raised.
            _event.set()

    # Trigger the execution; without this call the wait below never returns.
    _real_execute()
    _event.wait()
    return _result
@command
def wait(cond, timeout=-1):
    """Wait until the expression *cond* is true, or until *timeout* expires.

    *cond* is evaluated every 0.1 seconds through the Kivy Clock, with a
    namespace containing ``app`` (the running App). Errors raised while
    evaluating are ignored and the condition is simply retried.

    NOTE(security): *cond* comes straight from the network socket and is
    passed to ``eval``; only enable this server in trusted environments.

    :param cond: Python expression to evaluate.
    :param timeout: seconds to wait; -1 waits indefinitely.
    :returns: True if the condition became true, False on timeout.
    """
    from kivy.app import App
    from kivy.clock import Clock
    _event = threading.Event()

    def _real_wait(*args):
        try:
            app = App.get_running_app()
            if eval(cond, {"app": app}):
                _event.set()
                # Returning False stops the interval from firing again.
                return False
        except Exception:
            # The condition may reference state that does not exist yet.
            pass

    Clock.schedule_interval(_real_wait, .1)
    try:
        # The waiting thread alone decides on the timeout, so the result no
        # longer depends on whether a clock tick raced with the expiry.
        _event.wait(None if timeout == -1 else timeout)
    finally:
        Clock.unschedule(_real_wait)
    return _event.is_set()
@command
def ping():
    """Liveness probe: always answers True."""
    return True
@command
def quit():
    """Schedule a stop of the running app and answer True right away."""
    from kivy.clock import Clock
    from kivy.app import App

    def _stop_app(*_unused):
        # Looked up at call time, when the scheduled callback actually runs.
        running = App.get_running_app()
        running.stop()

    Clock.schedule_once(_stop_app, 0)
    return True
def recvall(sock, size):
    """Read exactly *size* bytes from *sock*.

    :param sock: object with a socket-style ``recv(n)`` method.
    :param size: number of bytes to read.
    :returns: the bytes read, or None if ``recv`` returns an empty packet
        (peer closed the connection) before *size* bytes arrived.
    """
    chunks = []
    remaining = size
    while remaining > 0:
        packet = sock.recv(remaining)
        if not packet:
            return None
        chunks.append(packet)
        remaining -= len(packet)
    # Join as bytes: recv() yields bytes, and b"" is a plain str on Python 2.
    return b"".join(chunks)
def run_debug_socket():
    """Serve automation commands on TCP port 7777, one per connection.

    Each connection carries a single request: a native unsigned-int length
    prefix followed by a JSON list ``[command, arg, ...]``. The command is
    looked up in COMMANDS, called with the arguments, and its JSON-encoded
    result is sent back with the same framing. Errors are printed and the
    connection is closed; the loop never returns.

    NOTE(security): the socket is bound to all interfaces and the received
    command name and arguments are dispatched without any authentication.
    Only enable this server in trusted environments.
    """
    netloc = ('', 7777)
    servsock = socket.socket()
    servsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
    servsock.bind(netloc)
    servsock.listen(5)
    while True:
        sock, _ = servsock.accept()
        try:
            # size of the command
            header = recvall(sock, COMMAND_SIZE)
            if header is None:
                # Peer disconnected without sending a request.
                continue
            size = struct.unpack("I", header)[0]
            data = recvall(sock, size)
            if data is None:
                continue
            args = json.loads(data.decode("utf-8"))
            cmd, args = args[0], args[1:]
            result = COMMANDS[cmd](*args)
            # Encode before measuring: the prefix counts bytes on the wire.
            reply = json.dumps(result).encode("utf-8")
            sock.sendall(struct.pack("I", len(reply)) + reply)
        except Exception:
            import traceback
            traceback.print_exc()
        finally:
            sock.close()
# Start the automation server as a side effect of importing this module.
# The thread is marked as a daemon before it is started.
debug_thread = threading.Thread(target=run_debug_socket)
debug_thread.daemon = True
debug_thread.start()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Example session: connect, wait for the welcome screen and for its
# transition to finish, take a screenshot, then quit the application.
from automateclient import connect, wait, screenshot, quit

connect()
for condition in (
    "app.screen_manager.current == 'welcome'",
    "app.screen_manager.current_screen.transition_progress == 1",
):
    wait(condition)
screenshot()
quit()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment