Skip to content

Instantly share code, notes, and snippets.

@tito
Created August 30, 2015 14:32
Show Gist options
  • Save tito/a1d3f0b1f308fed88cc7 to your computer and use it in GitHub Desktop.
Save tito/a1d3f0b1f308fed88cc7 to your computer and use it in GitHub Desktop.
Kivy Automation
# coding=utf-8
"""
Automation client for automatesocket.py
=======================================
"""
import socket
import struct
import json
import base64
import sys
from time import time, sleep
_connect_info = ("localhost", 7777)
def _dprint(msg):
sys.stdout.write(msg)
sys.stdout.flush()
def _recvall(sock, size):
msg = ""
while len(msg) < size:
packet = sock.recv(size - len(msg))
if not packet:
return None
msg += packet
return msg
def interact(*args):
"""Manually send a command to the connected socket
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(_connect_info)
try:
msg = json.dumps(args)
size = len(msg)
sock.sendall(struct.pack("I", size))
sock.sendall(msg)
msgsize = _recvall(sock, struct.calcsize("I"))
if not msgsize:
return None
size = struct.unpack("I", msgsize)[0]
msgdata = _recvall(sock, size)
return json.loads(msgdata)
finally:
sock.close()
def connect(host="localhost", port=7777, timeout=10):
global _connect_info
_dprint("Connecting to {}:{}... ".format(host, port))
_connect_info = (host, port)
start = time()
while True:
try:
if interact("ping") is True:
_dprint("ok\n")
return
if time() - start > timeout:
_dprint("timeout\n", fatal=True)
raise Exception("Connection timeout")
except:
sleep(.1)
def wait(cond, timeout=-1):
_dprint("Wait condition '{}'... ".format(cond))
ret = interact("wait", cond, timeout)
_dprint("ok (result={})\n".format(ret))
return ret
def screenshot():
_dprint("Screenshot... ")
width, height, data = interact("screenshot")
data = base64.decodestring(data)
with open("screenshot.png", "wb") as fd:
fd.write(data)
_dprint("ok\n")
def quit():
_dprint("Quit the application... ")
interact("quit")
_dprint("ok\n")
def execute(cmd):
return interact("execute", cmd)
# coding=utf-8
"""
Automation server
=================
Right now, just import this file from your main.py, if you want to use the automateclient.py.
No configuration needed.
"""
import socket
import struct
import sys
import threading
import json
import base64
import os
from time import time, sleep
COMMAND_SIZE = struct.calcsize("I")
COMMANDS = {}
def command(f):
COMMANDS[f.__name__] = f
@command
def screenshot(*args):
"""Do a screenshot of the window, and return it as a PNG:
(width, height, "RGBA", base64-data)
"""
from kivy.clock import mainthread
_result = []
_event = threading.Event()
@mainthread
def _real_screenshot():
import tempfile
from kivy.core.window import Window
try:
fd = tempfile.NamedTemporaryFile(suffix=".png", delete=False)
# XXX hack, Window.screenshot doesn't respect our filename
screenshot_fn = fd.name.rsplit(".")[-2] + "0001.png"
fd.close()
width, height = Window.size
Window.screenshot(name=fd.name)
with open(screenshot_fn, "rb") as fd:
data = fd.read()
_result[:] = [width, height, base64.encodestring(data)]
except:
import traceback
traceback.print_exc()
finally:
try:
os.unlink(screenshot_fn)
os.unlink(fd.name)
except:
pass
_event.set()
_real_screenshot()
_event.wait()
return _result
@command
def execute(cmd):
"""Execute a random string in the app context
"""
from kivy.clock import mainthread
_result = []
_event = threading.Event()
@mainthread
def _real_execute():
from kivy.app import App
app = App.get_running_app()
idmap = {"app": app}
exec cmd in idmap, idmap
_event.set()
_event.wait()
return _result
@command
def wait(cond, timeout=-1):
"""Wait a condition unless timeout happen
"""
from kivy.app import App
from kivy.clock import Clock
_result = [None]
_event = threading.Event()
_start = time()
def _real_wait(*args):
try:
app = App.get_running_app()
idmap = {"app": app}
if eval(cond, idmap):
_result[:] = [True]
_event.set()
return False
except:
pass
if timeout == -1:
return
if time() - _start > timeout:
_result[:] = [False]
_event.set()
return False
Clock.schedule_interval(_real_wait, .1)
_event_timeout = None if timeout == -1 else timeout
try:
_event.wait(_event_timeout)
except:
pass
Clock.unschedule(_real_wait)
return _result[0]
@command
def ping():
return True
@command
def quit():
from kivy.app import App
from kivy.clock import Clock
def _real_quit(*args):
App.get_running_app().stop()
Clock.schedule_once(_real_quit, 0)
return True
def recvall(sock, size):
msg = ""
while len(msg) < size:
print size, len(msg)
packet = sock.recv(size - len(msg))
if not packet:
return None
msg += packet
return msg
def run_debug_socket():
netloc = ('', 7777)
servsock = socket.socket()
servsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
servsock.bind(netloc)
servsock.listen(5)
while True:
sock, _ = servsock.accept()
# size of the command
try:
size = struct.unpack("I", recvall(sock, struct.calcsize("I")))[0]
data = recvall(sock, size)
args = json.loads(data)
cmd, args = args[0], args[1:]
result = COMMANDS[cmd](*args)
data = json.dumps(result)
sock.sendall(struct.pack("I", len(data)))
sock.sendall(data)
except:
import traceback
traceback.print_exc()
finally:
sock.close()
debug_thread = threading.Thread(target=run_debug_socket)
debug_thread.daemon = True
debug_thread.start()
from automateclient import connect, wait, screenshot, quit
connect()
wait("app.screen_manager.current == 'welcome'")
wait("app.screen_manager.current_screen.transition_progress == 1")
screenshot()
quit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment