Skip to content

Instantly share code, notes, and snippets.

@gary23w
Last active December 21, 2022 22:49
Show Gist options
  • Save gary23w/2c188f753a9f6a0a6a2a869232aa8348 to your computer and use it in GitHub Desktop.
Save gary23w/2c188f753a9f6a0a6a2a869232aa8348 to your computer and use it in GitHub Desktop.
keypresses and window focus
## An attempt to listen for key presses and to monitor window focus (Windows only: relies on ctypes.windll).
import collections
import contextlib
import ctypes
import enum
import functools
import importlib
import itertools
import logging
import random
import re
import sys
import threading
import time
import unicodedata
from ctypes import (
    windll,
    wintypes)

import pyrect
import six
from six.moves import queue
class AbstractListener(threading.Thread):
    """Base class for listener threads that forward events to callbacks.

    Every keyword argument other than ``suppress`` is treated as a callback
    and stored as an attribute of the same name. Each callback is wrapped so
    that returning ``False`` from it raises :class:`StopException`; ``None``
    callbacks are replaced by a no-op.

    Subclasses must implement :meth:`_run` and :meth:`_stop_platform`.

    :param suppress: value exposed through the :attr:`suppress` property.
    """

    class StopException(Exception):
        """Raised by a wrapped callback that returned ``False``."""

    #: Exception types that ``_emitter`` re-raises without logging them or
    #: passing them through the queue.
    _HANDLED_EXCEPTIONS = tuple()

    def __init__(self, suppress=False, **kwargs):
        super(AbstractListener, self).__init__()

        def wrapper(f):
            def inner(*args):
                if f(*args) is False:
                    raise self.StopException()
            return inner

        # ``_emitter`` reports callback failures through ``self._log``.
        # Nothing else in this class creates it, so without this the error
        # handler itself failed with AttributeError.  A logger installed by
        # a subclass before calling this constructor is kept.
        if getattr(self, '_log', None) is None:
            self._log = logging.getLogger('{}.{}'.format(
                self.__class__.__module__, self.__class__.__name__))

        self._suppress = suppress
        self._running = False
        self._thread = threading.current_thread()
        self._condition = threading.Condition()
        self._ready = False

        # Bounded queue carrying either None or ``sys.exc_info()`` tuples
        # from the listener thread to ``join``.
        self._queue = queue.Queue(10)

        self.daemon = True

        for name, callback in kwargs.items():
            setattr(self, name, wrapper(callback or (lambda *a: None)))

    @property
    def suppress(self):
        """The ``suppress`` value passed to the constructor."""
        return self._suppress

    @property
    def running(self):
        """``True`` from the moment ``run`` starts until ``stop`` is called."""
        return self._running

    def stop(self):
        """Stop the listener; calls after the first one are no-ops."""
        if self._running:
            self._running = False
            self._queue.put(None)
            self._stop_platform()

    def __enter__(self):
        self.start()
        self.wait()
        return self

    def __exit__(self, exc_type, value, traceback):
        self.stop()

    def wait(self):
        """Block until ``_mark_ready`` has been called."""
        # ``with`` releases the condition even if the wait is interrupted.
        with self._condition:
            while not self._ready:
                self._condition.wait()

    def run(self):
        """Thread entry point: run ``_run`` and then unblock ``join``."""
        self._running = True
        self._thread = threading.current_thread()
        self._run()

        # Make sure ``join`` always finds something in the queue.
        self._queue.put(None)

    @classmethod
    def _emitter(cls, f):
        """Decorate method *f* so that its exceptions reach ``join``.

        An exception that is not one of ``_HANDLED_EXCEPTIONS`` is logged
        (unless it is a ``StopException``), queued (``None`` is queued for a
        ``StopException``) and stops the listener.  The exception is always
        re-raised to the caller.
        """
        @functools.wraps(f)
        def inner(self, *args, **kwargs):
            try:
                return f(self, *args, **kwargs)
            except Exception as e:
                if not isinstance(e, self._HANDLED_EXCEPTIONS):
                    if not isinstance(e, AbstractListener.StopException):
                        self._log.exception(
                            'Unhandled exception in listener callback')
                    self._queue.put(
                        None if isinstance(e, cls.StopException)
                        else sys.exc_info())
                    self.stop()
                raise

        return inner

    def _mark_ready(self):
        """Wake up a thread blocked in ``wait``."""
        with self._condition:
            self._ready = True
            self._condition.notify()

    def _run(self):
        """Platform-specific listener body; must call ``_mark_ready``."""
        raise NotImplementedError()

    def _stop_platform(self):
        """Platform-specific way of making ``_run`` return."""
        raise NotImplementedError()

    def join(self, *args):
        """Join the thread and re-raise an exception queued by ``_emitter``."""
        super(AbstractListener, self).join(*args)

        try:
            exc_type, exc_value, exc_traceback = self._queue.get()
        except TypeError:
            # The queued item was None: nothing to re-raise.
            return
        six.reraise(exc_type, exc_value, exc_traceback)
def prefix(base, cls):
    """Return the option prefix for *cls*, or ``None`` if there is none.

    The ancestors of *cls* that are subclasses of *base* are visited in MRO
    order.  When *base* itself is a direct entry, the prefix is the last
    dotted component of ``cls.__module__`` with its first character dropped
    and ``'_'`` appended.  Otherwise the search recurses into the ancestor.
    """
    for ancestor in cls.__mro__[1:]:
        if not issubclass(ancestor, base):
            continue
        if ancestor is base:
            module_tail = cls.__module__.rsplit('.', 1)[-1]
            return module_tail[1:] + '_'
        found = prefix(base, ancestor)
        if found is not None:
            return found
    return None
class BKeyCode(object):
    """Description of a single key: a virtual key code and/or a character.

    :param vk: the virtual key code, or ``None``.
    :param char: the character, or ``None``; stored as text.
    :param is_dead: whether this is a dead key.  A dead key gets a
        ``combining`` attribute holding the Unicode character named
        ``'COMBINING ' + <name of char>``.
    :param kwargs: values for the names in ``_PLATFORM_EXTENSIONS``.
    :raises ValueError: if *kwargs* contains any other name.
    """

    #: Names of the extra keyword arguments accepted by the constructor.
    #: Only iterated, so an immutable tuple avoids shared mutable state.
    _PLATFORM_EXTENSIONS = ()

    def __init__(self, vk=None, char=None, is_dead=False, **kwargs):
        self.vk = vk
        self.char = six.text_type(char) if char is not None else None
        self.is_dead = is_dead

        if self.is_dead:
            try:
                self.combining = unicodedata.lookup(
                    'COMBINING ' + unicodedata.name(self.char))
            except (KeyError, ValueError):
                # KeyError: no combining variant exists.  ValueError: the
                # character has no Unicode name at all.  Either way the key
                # cannot act as a dead key.
                self.is_dead = False
                self.combining = None
            if self.is_dead and not self.combining:
                raise KeyError(char)
        else:
            self.combining = None

        for key in self._PLATFORM_EXTENSIONS:
            setattr(self, key, kwargs.pop(key, None))
        if kwargs:
            raise ValueError(kwargs)

    def __repr__(self):
        if self.is_dead:
            return '[%s]' % repr(self.char)
        if self.char is not None:
            return repr(self.char)
        # %s instead of %d: a key code created without vk and without char
        # must still be printable and hashable (``__hash__`` uses repr).
        return '<%s>' % (self.vk,)

    def __str__(self):
        return repr(self)

    def __eq__(self, other):
        if not isinstance(other, self.__class__):
            return False
        if self.char is not None and other.char is not None:
            return self.char == other.char and self.is_dead == other.is_dead
        return self.vk == other.vk and all(
            getattr(self, f) == getattr(other, f)
            for f in self._PLATFORM_EXTENSIONS)

    def __hash__(self):
        return hash(repr(self))

    def join(self, key):
        """Combine this dead key with *key*.

        :param key: the key pressed after this dead key.
        :return: a key code for the combined character; for a space or for
            the same dead key again, a key code for this key's own character.
        :raises ValueError: if this key is not dead, or the keys cannot be
            combined.
        """
        if not self.is_dead:
            raise ValueError(self)

        if key.char == ' ' or self == key:
            return self.from_char(self.char)

        if key.char is not None:
            combined = unicodedata.normalize(
                'NFC',
                key.char + self.combining)
            if combined:
                return self.from_char(combined)

        raise ValueError(key)

    @classmethod
    def from_vk(cls, vk, **kwargs):
        """Create a key code from a virtual key code."""
        return cls(vk=vk, **kwargs)

    @classmethod
    def from_char(cls, char, **kwargs):
        """Create a key code from a character."""
        return cls(char=char, **kwargs)

    @classmethod
    def from_dead(cls, char, **kwargs):
        """Create a dead key code from a character."""
        return cls(char=char, is_dead=True, **kwargs)
class BKey(enum.Enum):
    """Names of the special keys.

    Every name is assigned the value ``0``, so this enum only declares the
    names; all of them are aliases of the first member.
    """

    # Modifier keys
    alt = 0
    alt_l = 0
    alt_r = 0
    alt_gr = 0

    backspace = 0
    caps_lock = 0

    cmd = 0
    cmd_l = 0
    cmd_r = 0

    ctrl = 0
    ctrl_l = 0
    ctrl_r = 0

    delete = 0
    down = 0
    end = 0
    enter = 0
    esc = 0

    # Function keys
    f1 = 0
    f2 = 0
    f3 = 0
    f4 = 0
    f5 = 0
    f6 = 0
    f7 = 0
    f8 = 0
    f9 = 0
    f10 = 0
    f11 = 0
    f12 = 0
    f13 = 0
    f14 = 0
    f15 = 0
    f16 = 0
    f17 = 0
    f18 = 0
    f19 = 0
    f20 = 0

    home = 0
    left = 0
    page_down = 0
    page_up = 0
    right = 0

    shift = 0
    shift_l = 0
    shift_r = 0

    space = 0
    tab = 0
    up = 0

    # Media keys
    media_play_pause = 0
    media_volume_mute = 0
    media_volume_down = 0
    media_volume_up = 0
    media_previous = 0
    media_next = 0

    insert = 0
    menu = 0
    num_lock = 0
    pause = 0
    print_screen = 0
    scroll_lock = 0
class BController(object):
    """Base class for sending key events.

    Tracks pressed modifiers, the caps lock state and a pending dead key,
    and delegates the actual event delivery to :meth:`_handle`, which
    subclasses must implement.
    """

    _KeyCode = BKeyCode
    _Key = BKey

    class InvalidKeyException(Exception):
        """An exception raised when an invalid key is used."""

    class InvalidCharacterException(Exception):
        """An exception raised when an invalid character is used."""

    def __init__(self):
        self._modifiers_lock = threading.RLock()
        self._modifiers = set()
        self._caps_lock = False
        self._dead_key = None

    def press(self, key):
        """Press *key*.

        :param key: a one-character string, a member of ``_Key`` or an
            instance of ``_KeyCode``.
        :raises InvalidKeyException: if *key* cannot be resolved.
        :raises ValueError: if *key* is a string whose length is not 1.
        """
        resolved = self._resolve(key)
        if resolved is None:
            raise self.InvalidKeyException(key)
        self._update_modifiers(resolved, True)

        # Toggle the tracked caps lock state.
        if resolved == self._Key.caps_lock.value:
            self._caps_lock = not self._caps_lock

        # A pending dead key is joined with this key; if they cannot be
        # combined, the dead key is sent on its own first.
        original = resolved
        if self._dead_key:
            try:
                resolved = self._dead_key.join(resolved)
            except ValueError:
                self._handle(self._dead_key, True)
                self._handle(self._dead_key, False)

        # A dead key is only remembered until the next key press.
        if resolved.is_dead:
            self._dead_key = resolved
            return

        try:
            self._handle(resolved, True)
        except self.InvalidKeyException:
            if resolved != original:
                self._handle(self._dead_key, True)
                self._handle(self._dead_key, False)
                self._handle(original, True)

        self._dead_key = None

    def release(self, key):
        """Release *key*; see :meth:`press` for accepted values and errors."""
        resolved = self._resolve(key)
        if resolved is None:
            raise self.InvalidKeyException(key)
        self._update_modifiers(resolved, False)

        # Ignore released dead keys
        if resolved.is_dead:
            return

        self._handle(resolved, False)

    def tap(self, key):
        """Press and immediately release *key*."""
        self.press(key)
        self.release(key)

    def touch(self, key, is_press):
        """Call :meth:`press` if *is_press* is true, else :meth:`release`."""
        if is_press:
            self.press(key)
        else:
            self.release(key)

    @contextlib.contextmanager
    def pressed(self, *args):
        """Keep *args* pressed inside the ``with`` block.

        The keys are released in reverse order when the block exits.
        """
        for key in args:
            self.press(key)

        try:
            yield
        finally:
            for key in reversed(args):
                self.release(key)

    def type(self, string):
        """Press and release a key for every character of *string*.

        :raises InvalidCharacterException: with the index and the character
            when a character cannot be typed.
        """
        control_codes = self._control_codes()
        for i, character in enumerate(string):
            key = control_codes.get(character, character)
            try:
                self.press(key)
                self.release(key)
            except (ValueError, self.InvalidKeyException):
                raise self.InvalidCharacterException(i, character)

    @property
    @contextlib.contextmanager
    def modifiers(self):
        """Context manager yielding the set of normalised pressed modifiers.

        The modifier lock is held for the duration of the ``with`` block.
        """
        with self._modifiers_lock:
            yield set(
                self._as_modifier(modifier)
                for modifier in self._modifiers)

    @property
    def alt_pressed(self):
        with self.modifiers as modifiers:
            return self._Key.alt in modifiers

    @property
    def alt_gr_pressed(self):
        with self.modifiers as modifiers:
            return self._Key.alt_gr in modifiers

    @property
    def ctrl_pressed(self):
        with self.modifiers as modifiers:
            return self._Key.ctrl in modifiers

    @property
    def shift_pressed(self):
        if self._caps_lock:
            return True

        with self.modifiers as modifiers:
            return self._Key.shift in modifiers

    def _resolve(self, key):
        """Convert *key* to a ``_KeyCode``; ``None`` if it is unsupported."""
        # Members of the key enum resolve to their value.
        if key in (k for k in self._Key):
            return key.value

        # Strings must be single characters.
        if isinstance(key, six.string_types):
            if len(key) != 1:
                raise ValueError(key)
            return self._KeyCode.from_char(key)

        # Key codes are upper-cased while shift is considered pressed.
        if isinstance(key, self._KeyCode):
            if key.char is not None and self.shift_pressed:
                return self._KeyCode(vk=key.vk, char=key.char.upper())
            else:
                return key

    def _update_modifiers(self, key, is_press):
        """Add or remove *key* from the modifier set if it is a modifier."""
        if self._as_modifier(key):
            with self._modifiers_lock:
                if is_press:
                    self._modifiers.add(key)
                else:
                    try:
                        self._modifiers.remove(key)
                    except KeyError:
                        pass

    def _as_modifier(self, key):
        """Return the base modifier for key code *key*, or ``None``."""
        return self._normal_modifiers().get(key, None)

    def _control_codes(self):
        """Return the mapping from control characters to keys.

        The table is taken from the enclosing package when there is one.
        When this module is not part of a package the relative import cannot
        succeed, so an equivalent table is built from ``self._Key``.
        """
        try:
            from . import _CONTROL_CODES
            return _CONTROL_CODES
        except (ImportError, ValueError, SystemError):
            keys = self._Key
            return {
                '\n': keys.enter,
                '\r': keys.enter,
                '\t': keys.tab}

    def _normal_modifiers(self):
        """Return the mapping from modifier key codes to base modifiers.

        Uses the enclosing package's table when available; otherwise a table
        derived from ``self._Key`` is built once and cached on the instance.
        """
        try:
            from . import _NORMAL_MODIFIERS
            return _NORMAL_MODIFIERS
        except (ImportError, ValueError, SystemError):
            pass

        table = getattr(self, '_fallback_modifiers', None)
        if table is None:
            keys = self._Key
            # The first item of every pair is the base modifier reported
            # for all the keys in the second item.
            groups = (
                (keys.alt_gr, (keys.alt_gr,)),
                (keys.alt, (keys.alt, keys.alt_l, keys.alt_r)),
                (keys.cmd, (keys.cmd, keys.cmd_l, keys.cmd_r)),
                (keys.ctrl, (keys.ctrl, keys.ctrl_l, keys.ctrl_r)),
                (keys.shift, (keys.shift, keys.shift_l, keys.shift_r)))
            table = {
                member.value: base
                for base, members in groups
                for member in members}
            self._fallback_modifiers = table
        return table

    def _handle(self, key, is_press):
        """Deliver a key event; implemented by subclasses."""
        raise NotImplementedError()
# pylint: disable=W0223; This is also an abstract class
class BListener(AbstractListener):
    """Base class for keyboard listeners.

    :param on_press: callback invoked with the key when a key is pressed.
    :param on_release: callback invoked with the key when a key is released.
    :param suppress: forwarded to :class:`AbstractListener`.
    :param kwargs: options; only those whose name starts with the prefix
        computed by :func:`prefix` are kept, with the prefix removed, in
        ``self._options``.
    """

    def __init__(self, on_press=None, on_release=None, suppress=False,
                 **kwargs):
        # This class must name itself here.  Referring to the ``Listener``
        # subclass made ``super()`` dispatch back to this very method
        # (infinite recursion) and made ``prefix`` return None.
        option_prefix = prefix(BListener, self.__class__)
        if option_prefix is None:
            # No subclass level to take a prefix from: no options apply.
            self._options = {}
        else:
            self._options = {
                key[len(option_prefix):]: value
                for key, value in kwargs.items()
                if key.startswith(option_prefix)}
        super(BListener, self).__init__(
            on_press=on_press, on_release=on_release, suppress=suppress)

    def canonical(self, key):
        """Return a normalised form of *key*.

        Characters are lower-cased, modifiers are mapped to their base
        modifier, and other special keys are replaced by a key code built
        from their virtual key code.
        """
        # NOTE(review): these names come from the external ``pynput``
        # package, while this file defines its own ``Key`` and ``KeyCode``
        # classes.  Keys created by this file are not instances of the
        # pynput classes, so for them only the final ``else`` branch can be
        # reached -- confirm which classes are intended.
        from pynput.keyboard import Key, KeyCode, _NORMAL_MODIFIERS
        if isinstance(key, KeyCode) and key.char is not None:
            return KeyCode.from_char(key.char.lower())
        elif isinstance(key, Key) and key.value in _NORMAL_MODIFIERS:
            return _NORMAL_MODIFIERS[key.value]
        elif isinstance(key, Key) and key.value.vk is not None:
            return KeyCode.from_vk(key.value.vk)
        else:
            return key
##keys
# Virtual key codes.  In this file they are passed as ``vk`` to
# ``KeyCode.from_vk`` / ``KeyCode._from_ext``, and SHIFT, CONTROL and MENU
# are also used as indexes into the keyboard state buffer in KeyTranslator.

# Mouse buttons and CANCEL
LBUTTON = 1
RBUTTON = 2
CANCEL = 3
MBUTTON = 4
XBUTTON1 = 5
XBUTTON2 = 6
# Editing, modifier and lock keys
BACK = 8
TAB = 9
CLEAR = 12
RETURN = 13
SHIFT = 16
CONTROL = 17
MENU = 18
PAUSE = 19
CAPITAL = 20
# Input-method keys; several names share one value
KANA = 21
HANGEUL = 21
HANGUL = 21
JUNJA = 23
FINAL = 24
HANJA = 25
KANJI = 25
ESCAPE = 27
CONVERT = 28
NONCONVERT = 29
ACCEPT = 30
MODECHANGE = 31
# Space and navigation keys
SPACE = 32
PRIOR = 33
NEXT = 34
END = 35
HOME = 36
LEFT = 37
UP = 38
RIGHT = 39
DOWN = 40
SELECT = 41
PRINT = 42
EXECUTE = 43
SNAPSHOT = 44
INSERT = 45
DELETE = 46
HELP = 47
LWIN = 91
RWIN = 92
APPS = 93
SLEEP = 95
# Numeric keypad
NUMPAD0 = 96
NUMPAD1 = 97
NUMPAD2 = 98
NUMPAD3 = 99
NUMPAD4 = 100
NUMPAD5 = 101
NUMPAD6 = 102
NUMPAD7 = 103
NUMPAD8 = 104
NUMPAD9 = 105
MULTIPLY = 106
ADD = 107
SEPARATOR = 108
SUBTRACT = 109
DECIMAL = 110
DIVIDE = 111
# Function keys
F1 = 112
F2 = 113
F3 = 114
F4 = 115
F5 = 116
F6 = 117
F7 = 118
F8 = 119
F9 = 120
F10 = 121
F11 = 122
F12 = 123
F13 = 124
F14 = 125
F15 = 126
F16 = 127
F17 = 128
F18 = 129
F19 = 130
F20 = 131
F21 = 132
F22 = 133
F23 = 134
F24 = 135
NUMLOCK = 144
SCROLL = 145
# OEM_NEC_EQUAL and OEM_FJ_JISHO share one value
OEM_NEC_EQUAL = 146
OEM_FJ_JISHO = 146
OEM_FJ_MASSHOU = 147
OEM_FJ_TOUROKU = 148
OEM_FJ_LOYA = 149
OEM_FJ_ROYA = 150
# Left/right variants of the modifier keys
LSHIFT = 160
RSHIFT = 161
LCONTROL = 162
RCONTROL = 163
LMENU = 164
RMENU = 165
# Browser, volume, media and application launch keys
BROWSER_BACK = 166
BROWSER_FORWARD = 167
BROWSER_REFRESH = 168
BROWSER_STOP = 169
BROWSER_SEARCH = 170
BROWSER_FAVORITES = 171
BROWSER_HOME = 172
VOLUME_MUTE = 173
VOLUME_DOWN = 174
VOLUME_UP = 175
MEDIA_NEXT_TRACK = 176
MEDIA_PREV_TRACK = 177
MEDIA_STOP = 178
MEDIA_PLAY_PAUSE = 179
LAUNCH_MAIL = 180
LAUNCH_MEDIA_SELECT = 181
LAUNCH_APP1 = 182
LAUNCH_APP2 = 183
# OEM and miscellaneous keys
OEM_1 = 186
OEM_PLUS = 187
OEM_COMMA = 188
OEM_MINUS = 189
OEM_PERIOD = 190
OEM_2 = 191
OEM_3 = 192
OEM_4 = 219
OEM_5 = 220
OEM_6 = 221
OEM_7 = 222
OEM_8 = 223
OEM_AX = 225
OEM_102 = 226
ICO_HELP = 227
ICO_00 = 228
PROCESSKEY = 229
ICO_CLEAR = 230
PACKET = 231
OEM_RESET = 233
OEM_JUMP = 234
OEM_PA1 = 235
OEM_PA2 = 236
OEM_PA3 = 237
OEM_WSCTRL = 238
OEM_CUSEL = 239
OEM_ATTN = 240
OEM_FINISH = 241
OEM_COPY = 242
OEM_AUTO = 243
OEM_ENLW = 244
OEM_BACKTAB = 245
ATTN = 246
CRSEL = 247
EXSEL = 248
EREOF = 249
PLAY = 250
ZOOM = 251
NONAME = 252
PA1 = 253
OEM_CLEAR = 254
class MOUSEINPUT(ctypes.Structure):
    """ctypes layout of a mouse input record, plus its flag constants.

    This is the ``mi`` member of :class:`INPUT_union`.
    """

    # Flag constants (bit values)
    MOVE = 0x0001
    LEFTDOWN = 0x0002
    LEFTUP = 0x0004
    RIGHTDOWN = 0x0008
    RIGHTUP = 0x0010
    MIDDLEDOWN = 0x0020
    MIDDLEUP = 0x0040
    XDOWN = 0x0080
    XUP = 0x0100
    WHEEL = 0x0800
    HWHEEL = 0x1000
    ABSOLUTE = 0x8000

    # Extra-button identifiers
    XBUTTON1 = 0x0001
    XBUTTON2 = 0x0002

    _fields_ = [
        ('dx', wintypes.LONG),
        ('dy', wintypes.LONG),
        ('mouseData', wintypes.DWORD),
        ('dwFlags', wintypes.DWORD),
        ('time', wintypes.DWORD),
        ('dwExtraInfo', ctypes.c_void_p),
    ]
class KEYBDINPUT(ctypes.Structure):
    """ctypes layout of a keyboard input record, plus its flag constants.

    This is the ``ki`` member of :class:`INPUT_union`.
    """

    # Flag constants (bit values) combined into ``dwFlags``
    EXTENDEDKEY = 0x0001
    KEYUP = 0x0002
    UNICODE = 0x0004
    SCANCODE = 0x0008

    _fields_ = [
        ('wVk', wintypes.WORD),
        ('wScan', wintypes.WORD),
        ('dwFlags', wintypes.DWORD),
        ('time', wintypes.DWORD),
        ('dwExtraInfo', ctypes.c_void_p),
    ]
class HARDWAREINPUT(ctypes.Structure):
    """ctypes layout of a hardware input record.

    This is the ``hi`` member of :class:`INPUT_union`.
    """

    _fields_ = [
        ('uMsg', wintypes.DWORD),
        ('wParamL', wintypes.WORD),
        ('wParamH', wintypes.WORD),
    ]
class INPUT_union(ctypes.Union):
    """Union of the three input record types carried by :class:`INPUT`."""

    _fields_ = [
        ('mi', MOUSEINPUT),
        ('ki', KEYBDINPUT),
        ('hi', HARDWAREINPUT),
    ]
class INPUT(ctypes.Structure):
    """An input record: a type tag followed by the matching union member."""

    # Values for the ``type`` field
    MOUSE = 0
    KEYBOARD = 1
    HARDWARE = 2

    _fields_ = [
        ('type', wintypes.DWORD),
        ('value', INPUT_union),
    ]
# Pointer type for INPUT records.
LPINPUT = ctypes.POINTER(INPUT)

# user32 / kernel32 entry points used by this module, with the argument
# types ctypes should convert to.
VkKeyScan = windll.user32.VkKeyScanW
VkKeyScan.argtypes = (wintypes.WCHAR,)

MapVirtualKey = windll.user32.MapVirtualKeyW
MapVirtualKey.argtypes = (wintypes.UINT, wintypes.UINT)
# Mapping mode constant, stored on the function object itself.
MapVirtualKey.MAPVK_VK_TO_VSC = 0

SendInput = windll.user32.SendInput
SendInput.argtypes = (
    wintypes.UINT,
    ctypes.c_voidp,  # Really LPINPUT
    ctypes.c_int,
)

GetCurrentThreadId = windll.kernel32.GetCurrentThreadId
GetCurrentThreadId.restype = wintypes.DWORD
class MessageLoop(object):
    """Iterable wrapper around the message queue of the calling thread.

    ``start`` must be called on the thread that will iterate over the loop.
    ``stop`` and ``post`` send messages to that thread.
    """

    #: Message that makes the iteration end.
    WM_STOP = 0x0401

    _LPMSG = ctypes.POINTER(wintypes.MSG)

    _GetMessage = windll.user32.GetMessageW
    _GetMessage.argtypes = (
        ctypes.c_voidp,
        wintypes.HWND,
        wintypes.UINT,
        wintypes.UINT,
    )

    _PeekMessage = windll.user32.PeekMessageW
    _PeekMessage.argtypes = (
        ctypes.c_voidp,
        wintypes.HWND,
        wintypes.UINT,
        wintypes.UINT,
        wintypes.UINT,
    )

    _PostThreadMessage = windll.user32.PostThreadMessageW
    _PostThreadMessage.argtypes = (
        wintypes.DWORD,
        wintypes.UINT,
        wintypes.WPARAM,
        wintypes.LPARAM,
    )

    PM_NOREMOVE = 0

    def __init__(self):
        self._threadid = None
        self._event = threading.Event()
        self.thread = None

    def __iter__(self):
        """Yield messages until GetMessage fails or ``WM_STOP`` arrives."""
        assert self._threadid is not None

        try:
            while True:
                message = wintypes.MSG()
                status = self._GetMessage(ctypes.byref(message), None, 0, 0)
                if status <= 0 or message.message == self.WM_STOP:
                    break
                yield message
        finally:
            # The loop is over: forget the owning thread.
            self._threadid = None
            self.thread = None

    def start(self):
        """Bind the loop to the calling thread and signal that it is ready."""
        self._threadid = GetCurrentThreadId()
        self.thread = threading.current_thread()

        # NOTE(review): presumably this call makes the system create the
        # thread's message queue before other threads post to it -- confirm.
        message = wintypes.MSG()
        self._PeekMessage(
            ctypes.byref(message), None, 0x0400, 0x0400, self.PM_NOREMOVE)

        # Let ``stop`` proceed.
        self._event.set()

    def stop(self):
        """Wait for ``start`` to finish, then post ``WM_STOP`` to the loop."""
        self._event.wait()
        if self._threadid:
            self.post(self.WM_STOP, 0, 0)

    def post(self, msg, wparam, lparam):
        """Post a message to the thread that called ``start``."""
        self._PostThreadMessage(self._threadid, msg, wparam, lparam)
class SystemHook(object):
    """Context manager installing a Windows hook for the current thread.

    While the context is active the instance is registered in ``_HOOKS``
    under the identifier of the thread that entered it, which is how the
    shared callback finds the instance to dispatch to.

    :param hook_id: the hook type passed to ``SetWindowsHookEx``.
    :param on_hook: callable invoked as ``on_hook(code, msg, lpdata)``.
    """

    #: The hook code compared against by ``Listener._convert``.
    HC_ACTION = 0

    _HOOKPROC = ctypes.WINFUNCTYPE(
        wintypes.LPARAM,
        ctypes.c_int32, wintypes.WPARAM, wintypes.LPARAM)

    _SetWindowsHookEx = windll.user32.SetWindowsHookExW
    _SetWindowsHookEx.argtypes = (
        ctypes.c_int,
        _HOOKPROC,
        wintypes.HINSTANCE,
        wintypes.DWORD,
    )

    _UnhookWindowsHookEx = windll.user32.UnhookWindowsHookEx
    _UnhookWindowsHookEx.argtypes = (wintypes.HHOOK,)

    _CallNextHookEx = windll.user32.CallNextHookEx
    _CallNextHookEx.argtypes = (
        wintypes.HHOOK,
        ctypes.c_int,
        wintypes.WPARAM,
        wintypes.LPARAM,
    )

    #: The registered hook procedures
    _HOOKS = {}

    class SuppressException(Exception):
        """Raised from ``on_hook`` to make the handler return 1."""

    def __init__(self, hook_id, on_hook=lambda code, msg, lpdata: None):
        self.hook_id = hook_id
        self.on_hook = on_hook
        self._hook = None

    def __enter__(self):
        thread_id = threading.current_thread().ident
        assert thread_id not in self._HOOKS

        # Register first so that the callback can already find us.
        self._HOOKS[thread_id] = self
        self._hook = self._SetWindowsHookEx(
            self.hook_id, self._handler, None, 0)
        return self

    def __exit__(self, exc_type, value, traceback):
        thread_id = threading.current_thread().ident
        assert thread_id in self._HOOKS

        if self._hook is not None:
            self._UnhookWindowsHookEx(self._hook)
        del self._HOOKS[thread_id]

    @staticmethod
    @_HOOKPROC
    def _handler(code, msg, lpdata):
        """Callback shared by all hooks; dispatches by current thread."""
        hook = SystemHook._HOOKS.get(threading.current_thread().ident, None)
        if hook:
            try:
                hook.on_hook(code, msg, lpdata)
            except hook.SuppressException:
                # Return 1 instead of passing the event to the next hook.
                return 1
            except:  # noqa: E722 - every other error is ignored on purpose
                pass
        return SystemHook._CallNextHookEx(0, code, msg, lpdata)
class ListenerMixin(object):
    """Listener implementation built on ``SystemHook`` and ``MessageLoop``.

    Subclasses set ``_EVENTS`` to the hook type and implement either
    ``_convert`` together with ``_process``, or ``_handle``.
    """

    #: The hook type passed to ``SystemHook``.
    _EVENTS = None

    #: Message posted to the message loop for every converted event.
    _WM_PROCESS = 0x410

    #: Additional messages forwarded to ``_on_notification``.
    _WM_NOTIFICATIONS = []

    def suppress_event(self):
        """Raise the exception that makes the hook swallow the event."""
        raise SystemHook.SuppressException()

    def _run(self):
        loop = MessageLoop()
        self._message_loop = loop
        with self._receive():
            self._mark_ready()
            loop.start()

            try:
                with SystemHook(self._EVENTS, self._handler):
                    # Pump messages until stopped.
                    for message in loop:
                        if not self.running:
                            break
                        kind = message.message
                        if kind == self._WM_PROCESS:
                            self._process(message.wParam, message.lParam)
                        elif kind in self._WM_NOTIFICATIONS:
                            self._on_notification(
                                kind, message.wParam, message.lParam)
            except:  # noqa: E722 - errors here are deliberately silenced
                pass

    def _stop_platform(self):
        try:
            self._message_loop.stop()
        except AttributeError:
            # ``_run`` has not created the message loop yet.
            pass

    @AbstractListener._emitter
    def _handler(self, code, msg, lpdata):
        """Hook callback: queue the converted event or handle it directly."""
        try:
            converted = self._convert(code, msg, lpdata)
            if converted is not None:
                self._message_loop.post(self._WM_PROCESS, *converted)
        except NotImplementedError:
            # No conversion available: handle the raw event instead.
            self._handle(code, msg, lpdata)

        if self.suppress:
            self.suppress_event()

    def _convert(self, code, msg, lpdata):
        """Turn a hook event into ``(wparam, lparam)`` for ``_process``."""
        raise NotImplementedError()

    def _process(self, wparam, lparam):
        """Handle an event previously produced by ``_convert``."""
        raise NotImplementedError()

    def _handle(self, code, msg, lpdata):
        """Handle a raw hook event when ``_convert`` is not implemented."""
        raise NotImplementedError()

    def _on_notification(self, code, wparam, lparam):
        """Handle a message listed in ``_WM_NOTIFICATIONS``."""
        raise NotImplementedError()
class KeyTranslator(object):
    """Translate virtual key codes to characters.

    A table of ``(character, is_dead)`` pairs indexed by scan code is built
    for every combination of shift, ctrl and alt.  Calling an instance looks
    a virtual key code up in the table matching the current modifier state.
    """

    _GetAsyncKeyState = ctypes.windll.user32.GetAsyncKeyState
    _GetAsyncKeyState.argtypes = (
        ctypes.c_int,)
    _GetKeyboardLayout = ctypes.windll.user32.GetKeyboardLayout
    _GetKeyboardLayout.argtypes = (
        wintypes.DWORD,)
    _GetKeyboardState = ctypes.windll.user32.GetKeyboardState
    _GetKeyboardState.argtypes = (
        ctypes.c_voidp,)
    # Previously bound to GetAsyncKeyState, i.e. the very same function
    # object as ``_GetAsyncKeyState``; bind the function the name refers to.
    _GetKeyState = ctypes.windll.user32.GetKeyState
    _GetKeyState.argtypes = (
        ctypes.c_int,)
    _MapVirtualKeyEx = ctypes.windll.user32.MapVirtualKeyExW
    _MapVirtualKeyEx.argtypes = (
        wintypes.UINT,
        wintypes.UINT,
        wintypes.HKL)
    _ToUnicodeEx = ctypes.windll.user32.ToUnicodeEx
    _ToUnicodeEx.argtypes = (
        wintypes.UINT,
        wintypes.UINT,
        ctypes.c_voidp,
        ctypes.c_voidp,
        ctypes.c_int,
        wintypes.UINT,
        wintypes.HKL)

    # Mapping modes for MapVirtualKeyEx
    _MAPVK_VK_TO_VSC = 0
    _MAPVK_VSC_TO_VK = 1
    _MAPVK_VK_TO_CHAR = 2

    #: Number of bytes in the keyboard state buffer.  ToUnicodeEx is
    #: documented to read a 256-byte array, so the former 255-byte buffer
    #: was one byte short.
    _KEY_STATE_SIZE = 256

    def __init__(self):
        self.update_layout()

    def __call__(self, vk, is_press):
        """Translate virtual key code *vk*.

        :param vk: the virtual key code.
        :param is_press: accepted for interface compatibility; not used.
        :return: a dict with the keys ``char``, ``is_dead``, ``vk`` and
            ``_scan``, usable as keyword arguments for ``KeyCode``.
        """
        layout_data = self._layout_data[self._modifier_state()]
        scan = self._to_scan(vk, self._layout)
        character, is_dead = layout_data[scan]

        return {
            'char': character,
            'is_dead': is_dead,
            'vk': vk,
            '_scan': scan}

    def update_layout(self):
        """Regenerate the cached layout and translation tables."""
        self._layout, self._layout_data = self._generate_layout()

    def char_from_scan(self, scan):
        """Return the character for *scan* with no modifier active."""
        return self._layout_data[(False, False, False)][scan][0]

    def _generate_layout(self):
        """Build the translation tables for the current keyboard layout.

        :return: ``(layout, layout_data)`` where ``layout_data`` maps
            ``(shift, ctrl, alt)`` to a list indexed by scan code.
        """
        layout_data = {}

        state = (ctypes.c_ubyte * self._KEY_STATE_SIZE)()
        with self._thread_input() as active_thread:
            layout = self._GetKeyboardLayout(active_thread)
        vks = [
            self._to_vk(scan, layout)
            for scan in range(len(state))]

        for shift, ctrl, alt in itertools.product(
                (False, True), (False, True), (False, True)):
            current = [(None, False)] * len(state)
            layout_data[(shift, ctrl, alt)] = current

            # Describe the modifier combination in the keyboard state.
            state[SHIFT] = 0x80 if shift else 0x00
            state[CONTROL] = 0x80 if ctrl else 0x00
            state[MENU] = 0x80 if alt else 0x00

            out = (ctypes.wintypes.WCHAR * 5)()
            for (scan, vk) in enumerate(vks):
                count = self._ToUnicodeEx(
                    vk, scan, ctypes.byref(state), ctypes.byref(out),
                    len(out), 0, layout)

                # A non-zero count means the key is mapped; a negative
                # count is treated as a dead key.
                if count != 0:
                    character = out[0]
                    is_dead = count < 0
                    current[scan] = (character, is_dead)

                    # NOTE(review): the second identical call presumably
                    # flushes the pending dead key state -- confirm.
                    if is_dead:
                        self._ToUnicodeEx(
                            vk, scan, ctypes.byref(state),
                            ctypes.byref(out), len(out), 0, layout)

        return (layout, layout_data)

    def _to_scan(self, vk, layout):
        """Map a virtual key code to a scan code for *layout*."""
        return self._MapVirtualKeyEx(
            vk, self._MAPVK_VK_TO_VSC, layout)

    def _to_vk(self, scan, layout):
        """Map a scan code to a virtual key code for *layout*."""
        return self._MapVirtualKeyEx(
            scan, self._MAPVK_VSC_TO_VK, layout)

    def _modifier_state(self):
        """Return the current ``(shift, ctrl, alt)`` state as booleans."""
        shift = bool(self._GetAsyncKeyState(SHIFT) & 0x8000)
        ctrl = bool(self._GetAsyncKeyState(CONTROL) & 0x8000)
        alt = bool(self._GetAsyncKeyState(MENU) & 0x8000)
        return (shift, ctrl, alt)

    @contextlib.contextmanager
    def _thread_input(self):
        """Yield the identifier of the thread whose layout is queried."""
        yield GetCurrentThreadId()
class KeyCode(BKeyCode):
    """Key code with the extra ``_flags`` and ``_scan`` attributes."""

    _PLATFORM_EXTENSIONS = (
        '_flags',
        '_scan',
    )

    # Defaults; the constructor overwrites both on every instance.
    _flags = None
    _scan = None

    def _parameters(self, is_press):
        """Return the ``KEYBDINPUT`` keyword arguments for this key.

        :param is_press: ``True`` for a press, ``False`` for a release.
        :return: a dict with the keys ``dwFlags``, ``wVk`` and ``wScan``.
        """
        if self.vk:
            vk = self.vk
            by_virtual_key = True
        else:
            res = VkKeyScan(self.char)
            # A zero high byte of the result selects the virtual-key path.
            by_virtual_key = (res >> 8) & 0xFF == 0
            if by_virtual_key:
                vk = res & 0xFF

        if by_virtual_key:
            scan = self._scan \
                or MapVirtualKey(vk, MapVirtualKey.MAPVK_VK_TO_VSC)
            flags = 0
        else:
            # Otherwise the character itself is sent as a unicode packet.
            vk = 0
            scan = ord(self.char)
            flags = KEYBDINPUT.UNICODE

        state_flags = 0 if is_press else KEYBDINPUT.KEYUP
        return dict(
            dwFlags=(self._flags or 0) | flags | state_flags,
            wVk=vk,
            wScan=scan)

    @classmethod
    def _from_ext(cls, vk, **kwargs):
        """Create a key code for *vk* with the extended-key flag set."""
        return cls.from_vk(vk, _flags=KEYBDINPUT.EXTENDEDKEY, **kwargs)
class Key(enum.Enum):
    """Special keys, each holding the ``KeyCode`` used to send or match it.

    Members created with ``KeyCode._from_ext`` carry the extended-key flag.
    """

    # Modifier keys
    alt = KeyCode.from_vk(MENU)
    alt_l = KeyCode.from_vk(LMENU)
    alt_r = KeyCode._from_ext(RMENU)
    alt_gr = KeyCode.from_vk(RMENU)

    backspace = KeyCode.from_vk(BACK)
    caps_lock = KeyCode.from_vk(CAPITAL)

    cmd = KeyCode.from_vk(LWIN)
    cmd_l = KeyCode.from_vk(LWIN)
    cmd_r = KeyCode.from_vk(RWIN)

    ctrl = KeyCode.from_vk(CONTROL)
    ctrl_l = KeyCode.from_vk(LCONTROL)
    ctrl_r = KeyCode._from_ext(RCONTROL)

    delete = KeyCode._from_ext(DELETE)
    down = KeyCode._from_ext(DOWN)
    end = KeyCode._from_ext(END)
    enter = KeyCode.from_vk(RETURN)
    esc = KeyCode.from_vk(ESCAPE)

    # Function keys
    f1 = KeyCode.from_vk(F1)
    f2 = KeyCode.from_vk(F2)
    f3 = KeyCode.from_vk(F3)
    f4 = KeyCode.from_vk(F4)
    f5 = KeyCode.from_vk(F5)
    f6 = KeyCode.from_vk(F6)
    f7 = KeyCode.from_vk(F7)
    f8 = KeyCode.from_vk(F8)
    f9 = KeyCode.from_vk(F9)
    f10 = KeyCode.from_vk(F10)
    f11 = KeyCode.from_vk(F11)
    f12 = KeyCode.from_vk(F12)
    f13 = KeyCode.from_vk(F13)
    f14 = KeyCode.from_vk(F14)
    f15 = KeyCode.from_vk(F15)
    f16 = KeyCode.from_vk(F16)
    f17 = KeyCode.from_vk(F17)
    f18 = KeyCode.from_vk(F18)
    f19 = KeyCode.from_vk(F19)
    f20 = KeyCode.from_vk(F20)
    f21 = KeyCode.from_vk(F21)
    f22 = KeyCode.from_vk(F22)
    f23 = KeyCode.from_vk(F23)
    f24 = KeyCode.from_vk(F24)

    home = KeyCode._from_ext(HOME)
    left = KeyCode._from_ext(LEFT)
    page_down = KeyCode._from_ext(NEXT)
    page_up = KeyCode._from_ext(PRIOR)
    right = KeyCode._from_ext(RIGHT)

    shift = KeyCode.from_vk(LSHIFT)
    shift_l = KeyCode.from_vk(LSHIFT)
    shift_r = KeyCode.from_vk(RSHIFT)

    space = KeyCode.from_vk(SPACE, char=' ')
    tab = KeyCode.from_vk(TAB)
    up = KeyCode._from_ext(UP)

    # Media keys
    media_play_pause = KeyCode._from_ext(MEDIA_PLAY_PAUSE)
    media_volume_mute = KeyCode._from_ext(VOLUME_MUTE)
    media_volume_down = KeyCode._from_ext(VOLUME_DOWN)
    media_volume_up = KeyCode._from_ext(VOLUME_UP)
    media_previous = KeyCode._from_ext(MEDIA_PREV_TRACK)
    media_next = KeyCode._from_ext(MEDIA_NEXT_TRACK)

    insert = KeyCode._from_ext(INSERT)
    menu = KeyCode.from_vk(APPS)
    num_lock = KeyCode._from_ext(NUMLOCK)
    pause = KeyCode.from_vk(PAUSE)
    print_screen = KeyCode._from_ext(SNAPSHOT)
    scroll_lock = KeyCode.from_vk(SCROLL)
class Controller(BController):
    """Keyboard controller that delivers key events through ``SendInput``."""

    _KeyCode = KeyCode
    _Key = Key

    def __init__(self, *args, **kwargs):
        super(Controller, self).__init__(*args, **kwargs)

    def _handle(self, key, is_press):
        """Send one keyboard event for *key*.

        :param key: the ``KeyCode`` to send.
        :param is_press: ``True`` for a press, ``False`` for a release.
        """
        keyboard_input = KEYBDINPUT(**key._parameters(is_press))
        record = INPUT(
            type=INPUT.KEYBOARD,
            value=INPUT_union(ki=keyboard_input))
        SendInput(1, ctypes.byref(record), ctypes.sizeof(INPUT))
class Listener(ListenerMixin, BListener):
    """Keyboard listener based on a ``WH_KEYBOARD_LL`` hook.

    The ``event_filter`` option, if given, is called as
    ``event_filter(msg, data)`` for every hook event; the event is dropped
    when it returns ``False``.
    """

    #: The hook type passed to ``SystemHook``.
    _EVENTS = 13

    # Window messages handled by this listener
    _WM_INPUTLANGCHANGE = 0x0051
    _WM_KEYDOWN = 0x0100
    _WM_KEYUP = 0x0101
    _WM_SYSKEYDOWN = 0x0104
    _WM_SYSKEYUP = 0x0105

    #: Bit added to the message to mark an event carrying a character.
    _UTF16_FLAG = 0x1000

    #: Virtual key code of events that carry a character instead of a key.
    _VK_PACKET = 0xE7

    _PRESS_MESSAGES = (_WM_KEYDOWN, _WM_SYSKEYDOWN)
    _RELEASE_MESSAGES = (_WM_KEYUP, _WM_SYSKEYUP)

    _WM_NOTIFICATIONS = (
        _WM_INPUTLANGCHANGE,
    )

    #: Maps virtual key codes to the ``Key`` member holding them.
    _SPECIAL_KEYS = {
        key.value.vk: key
        for key in Key}

    _HANDLED_EXCEPTIONS = (
        SystemHook.SuppressException,)

    class _KBDLLHOOKSTRUCT(ctypes.Structure):
        """Data about a keyboard event passed to a ``WH_KEYBOARD_LL`` hook
        procedure, ``LowLevelKeyboardProc``.
        """
        _fields_ = [
            ('vkCode', wintypes.DWORD),
            ('scanCode', wintypes.DWORD),
            ('flags', wintypes.DWORD),
            ('time', wintypes.DWORD),
            ('dwExtraInfo', ctypes.c_void_p)]

    _LPKBDLLHOOKSTRUCT = ctypes.POINTER(_KBDLLHOOKSTRUCT)

    def __init__(self, *args, **kwargs):
        super(Listener, self).__init__(*args, **kwargs)
        self._translator = KeyTranslator()
        self._event_filter = self._options.get(
            'event_filter',
            lambda msg, data: True)

    def _convert(self, code, msg, lpdata):
        """Convert a hook event to the ``(wparam, lparam)`` pair to post.

        :return: ``None`` when the event is to be ignored.
        """
        if code != SystemHook.HC_ACTION:
            return

        data = ctypes.cast(lpdata, self._LPKBDLLHOOKSTRUCT).contents
        is_packet = data.vkCode == self._VK_PACKET

        if self._event_filter(msg, data) is False:
            return None
        if is_packet:
            # Character events: flag the message and pass the scan code.
            return (msg | self._UTF16_FLAG, data.scanCode)
        return (msg, data.vkCode)

    @AbstractListener._emitter
    def _process(self, wparam, lparam):
        """Invoke ``on_press`` or ``on_release`` for a posted event."""
        msg, vk = wparam, lparam

        if msg & self._UTF16_FLAG:
            # The flag marks the payload as a character code.
            msg = msg ^ self._UTF16_FLAG
            key = KeyCode.from_char(six.unichr(vk))
        else:
            # Translation may fail; the callbacks then receive None.
            try:
                key = self._event_to_key(msg, vk)
            except OSError:
                key = None

        if msg in self._PRESS_MESSAGES:
            self.on_press(key)
        elif msg in self._RELEASE_MESSAGES:
            self.on_release(key)

    @contextlib.contextmanager
    def _receive(self):
        """Context entered by ``_run`` around the message loop; a no-op."""
        yield

    def _on_notification(self, code, wparam, lparam):
        """Refresh the translation tables when the input language changes."""
        if code == self._WM_INPUTLANGCHANGE:
            self._translator.update_layout()

    def _event_to_key(self, msg, vk):
        """Return the ``Key`` member or ``KeyCode`` for virtual key *vk*."""
        if vk in self._SPECIAL_KEYS:
            return self._SPECIAL_KEYS[vk]

        is_press = msg in self._PRESS_MESSAGES
        return KeyCode(**self._translate(vk, is_press))

    def _translate(self, vk, is_press):
        """Return the ``KeyCode`` keyword arguments for *vk*."""
        return self._translator(vk, is_press)

    def canonical(self, key):
        """Return a normalised form of *key*.

        Keys carrying a scan code that maps to a character are replaced by
        a key code for that character; other keys are passed to the base
        implementation.
        """
        scan = getattr(key, '_scan', None)
        if scan is not None:
            char = self._translator.char_from_scan(scan)
            if char is not None:
                return KeyCode.from_char(char)

        return super(Listener, self).canonical(key)
# Lightweight value types used by the window classes.
Rect = collections.namedtuple("Rect", ["left", "top", "right", "bottom"])
Point = collections.namedtuple("Point", ["x", "y"])
Size = collections.namedtuple("Size", ["width", "height"])
class POINT(ctypes.Structure):
    """ctypes structure holding an ``x``/``y`` pair of C longs."""

    _fields_ = [
        ("x", ctypes.c_long),
        ("y", ctypes.c_long),
    ]
class RECT(ctypes.Structure):
    """ctypes counterpart of the Win32 RECT structure.

    Microsoft Documentation:
    https://msdn.microsoft.com/en-us/library/windows/desktop/dd162897(v=vs.85).aspx
    """

    _fields_ = [
        ('left', ctypes.c_long),
        ('top', ctypes.c_long),
        ('right', ctypes.c_long),
        ('bottom', ctypes.c_long),
    ]
def _rect_property(name):
    """Create a read/write property forwarding to ``self._rect.<name>``."""

    def getter(self):
        return getattr(self._rect, name)

    def setter(self, value):
        # Read the attribute before writing it, as every hand-written
        # setter did.  NOTE(review): presumably the read is there to run
        # the rect's onRead hook first -- confirm.
        getattr(self._rect, name)
        setattr(self._rect, name, value)

    return property(getter, setter)


class BaseWindow:
    """Base class exposing a window's geometry through ``self._rect``.

    Subclasses implement ``_getWindowRect`` and call
    ``_setupRectProperties`` to create ``self._rect``.  ``__str__`` also
    reads ``self.title`` and the rect callbacks call ``moveTo`` and
    ``resizeTo``, none of which are defined here.
    """

    def __init__(self):
        pass

    def _setupRectProperties(self):
        """Create ``self._rect`` from the window's current geometry."""

        def _onRead(attrName):
            # Refresh the cached geometry.  The underscore attributes are
            # assigned directly to skip the onRead.
            current = self._getWindowRect()
            self._rect._left = current.left
            self._rect._top = current.top
            self._rect._width = current.right - current.left
            self._rect._height = current.bottom - current.top

        def _onChange(oldBox, newBox):
            # Apply the new geometry to the window.
            self.moveTo(newBox.left, newBox.top)
            self.resizeTo(newBox.width, newBox.height)

        initial = self._getWindowRect()
        self._rect = pyrect.Rect(
            initial.left,
            initial.top,
            initial.right - initial.left,
            initial.bottom - initial.top,
            onChange=_onChange,
            onRead=_onRead)

    def _getWindowRect(self):
        """Return an object with ``left``, ``top``, ``right``, ``bottom``."""
        raise NotImplementedError

    def __str__(self):
        bounds = self._getWindowRect()
        values = (
            self.__class__.__qualname__,
            bounds.left,
            bounds.top,
            bounds.right - bounds.left,
            bounds.bottom - bounds.top,
            self.title,
        )
        return '<%s left="%s", top="%s", width="%s", height="%s", title="%s">' % values

    # Edges
    left = _rect_property('left')
    right = _rect_property('right')
    top = _rect_property('top')
    bottom = _rect_property('bottom')

    # Corners
    topleft = _rect_property('topleft')
    topright = _rect_property('topright')
    bottomleft = _rect_property('bottomleft')
    bottomright = _rect_property('bottomright')

    # Edge midpoints
    midleft = _rect_property('midleft')
    midright = _rect_property('midright')
    midtop = _rect_property('midtop')
    midbottom = _rect_property('midbottom')

    # Center
    center = _rect_property('center')
    centerx = _rect_property('centerx')
    centery = _rect_property('centery')

    # Dimensions
    width = _rect_property('width')
    height = _rect_property('height')
    size = _rect_property('size')
    area = _rect_property('area')
    box = _rect_property('box')
# Win32 constants and helpers referenced by Win32Window.  None of these
# names were defined anywhere in this file, so most methods of the class
# failed with NameError.
WM_CLOSE = 0x0010
SW_HIDE = 0
SW_MAXIMIZE = 3
SW_SHOW = 5
SW_MINIMIZE = 6
SW_RESTORE = 9
HWND_TOP = 0


def _raiseWithLastError():
    """Raise an ``OSError`` built from the last Win32 error code."""
    raise ctypes.WinError()


def isWindowVisible(hWnd):
    """Return whether the window with handle *hWnd* is visible."""
    return ctypes.windll.user32.IsWindowVisible(hWnd) != 0


def getActiveWindow():
    """Return the foreground window as a ``Win32Window``, or ``None``."""
    hWnd = ctypes.windll.user32.GetForegroundWindow()
    if hWnd == 0:
        return None
    return Win32Window(hWnd)


class Win32Window(BaseWindow):
    """A window identified by its handle.

    :param hWnd: the window handle.
    """

    def __init__(self, hWnd):
        self._hWnd = hWnd
        self._setupRectProperties()

    def _getWindowRect(self):
        """Return the window rectangle as a ``Rect`` named tuple.

        :raises OSError: if ``GetWindowRect`` fails.
        """
        rect = RECT()
        result = ctypes.windll.user32.GetWindowRect(self._hWnd, ctypes.byref(rect))
        if result != 0:
            return Rect(rect.left, rect.top, rect.right, rect.bottom)
        else:
            _raiseWithLastError()

    def __repr__(self):
        return '%s(hWnd=%s)' % (self.__class__.__name__, self._hWnd)

    def __eq__(self, other):
        return isinstance(other, Win32Window) and self._hWnd == other._hWnd

    def __hash__(self):
        # Defining __eq__ alone makes instances unhashable; hash the same
        # value equality is based on.
        return hash(self._hWnd)

    def close(self):
        """Post a close message to the window."""
        result = ctypes.windll.user32.PostMessageA(self._hWnd, WM_CLOSE, 0, 0)
        if result == 0:
            _raiseWithLastError()

    def minimize(self):
        ctypes.windll.user32.ShowWindow(self._hWnd, SW_MINIMIZE)

    def maximize(self):
        ctypes.windll.user32.ShowWindow(self._hWnd, SW_MAXIMIZE)

    def restore(self):
        ctypes.windll.user32.ShowWindow(self._hWnd, SW_RESTORE)

    def show(self):
        ctypes.windll.user32.ShowWindow(self._hWnd, SW_SHOW)

    def hide(self):
        ctypes.windll.user32.ShowWindow(self._hWnd, SW_HIDE)

    def activate(self):
        """Make this the foreground window."""
        result = ctypes.windll.user32.SetForegroundWindow(self._hWnd)
        if result == 0:
            _raiseWithLastError()

    def resize(self, widthOffset, heightOffset):
        """Grow or shrink the window by the given offsets."""
        result = ctypes.windll.user32.SetWindowPos(
            self._hWnd, HWND_TOP, self.left, self.top,
            self.width + widthOffset, self.height + heightOffset, 0)
        if result == 0:
            _raiseWithLastError()

    resizeRel = resize

    def resizeTo(self, newWidth, newHeight):
        """Set the window size, keeping its position."""
        result = ctypes.windll.user32.SetWindowPos(
            self._hWnd, HWND_TOP, self.left, self.top, newWidth, newHeight, 0)
        if result == 0:
            _raiseWithLastError()

    def move(self, xOffset, yOffset):
        """Move the window by the given offsets."""
        result = ctypes.windll.user32.SetWindowPos(
            self._hWnd, HWND_TOP, self.left + xOffset, self.top + yOffset,
            self.width, self.height, 0)
        if result == 0:
            _raiseWithLastError()

    moveRel = move

    def moveTo(self, newLeft, newTop):
        """Move the window to the given position, keeping its size."""
        result = ctypes.windll.user32.SetWindowPos(
            self._hWnd, HWND_TOP, newLeft, newTop, self.width, self.height, 0)
        if result == 0:
            _raiseWithLastError()

    @property
    def isMinimized(self):
        return ctypes.windll.user32.IsIconic(self._hWnd) != 0

    @property
    def isMaximized(self):
        return ctypes.windll.user32.IsZoomed(self._hWnd) != 0

    @property
    def isActive(self):
        return getActiveWindow() == self

    @property
    def title(self):
        """The window title text."""
        textLenInCharacters = ctypes.windll.user32.GetWindowTextLengthW(self._hWnd)
        # One extra character for the terminating NUL.
        stringBuffer = ctypes.create_unicode_buffer(textLenInCharacters + 1)
        ctypes.windll.user32.GetWindowTextW(self._hWnd, stringBuffer, textLenInCharacters + 1)
        return stringBuffer.value

    @property
    def visible(self):
        return isWindowVisible(self._hWnd)
def on_press(key):
    """Example press callback: print the key that was pressed.

    Keys with a ``char`` attribute are reported as alphanumeric, all other
    values as special keys.
    """
    try:
        message = 'alphanumeric key {0} pressed'.format(key.char)
    except AttributeError:
        message = 'special key {0} pressed'.format(key)
    print(message)
def on_release(key):
    """Example release callback: print the key and stop on escape.

    :param key: the released key as passed by the listener.
    :return: ``False`` when *key* is the escape key, which makes the
        listener stop; ``None`` otherwise.
    """
    print('{0} released'.format(
        key))
    # ``keyboard`` is not defined in this file; the ``Key`` enum defined
    # above is the one whose members ``Listener`` passes to callbacks.
    if key == Key.esc:
        # Stop listener
        return False
## LISTENER ABSTRACTION METHODS
# listen = Listener(
# on_press=on_press,
# on_release=on_release)
# listen.start()
## WINDOW ABSTRACTION METHODS
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment