Last active
December 21, 2022 22:49
-
-
Save gary23w/2c188f753a9f6a0a6a2a869232aa8348 to your computer and use it in GitHub Desktop.
keypresses and window focus
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## an attempt to listen to keypresses and monitor window focus | |
import time | |
import random | |
import re | |
import sys | |
import enum | |
import six | |
from six.moves import queue | |
import unicodedata | |
import pyrect | |
import ctypes | |
import collections | |
import contextlib | |
import functools | |
import importlib | |
import itertools | |
import threading | |
from ctypes import ( | |
windll, | |
wintypes) | |
class AbstractListener(threading.Thread): | |
class StopException(Exception): | |
pass | |
_HANDLED_EXCEPTIONS = tuple() | |
def __init__(self, suppress=False, **kwargs): | |
super(AbstractListener, self).__init__() | |
def wrapper(f): | |
def inner(*args): | |
if f(*args) is False: | |
raise self.StopException() | |
return inner | |
self._suppress = suppress | |
self._running = False | |
self._thread = threading.current_thread() | |
self._condition = threading.Condition() | |
self._ready = False | |
self._queue = queue.Queue(10) | |
self.daemon = True | |
for name, callback in kwargs.items(): | |
setattr(self, name, wrapper(callback or (lambda *a: None))) | |
@property | |
def suppress(self): | |
return self._suppress | |
@property | |
def running(self): | |
return self._running | |
def stop(self): | |
if self._running: | |
self._running = False | |
self._queue.put(None) | |
self._stop_platform() | |
def __enter__(self): | |
self.start() | |
self.wait() | |
return self | |
def __exit__(self, exc_type, value, traceback): | |
self.stop() | |
def wait(self): | |
self._condition.acquire() | |
while not self._ready: | |
self._condition.wait() | |
self._condition.release() | |
def run(self): | |
self._running = True | |
self._thread = threading.current_thread() | |
self._run() | |
self._queue.put(None) | |
@classmethod | |
def _emitter(cls, f): | |
@functools.wraps(f) | |
def inner(self, *args, **kwargs): | |
try: | |
return f(self, *args, **kwargs) | |
except Exception as e: | |
if not isinstance(e, self._HANDLED_EXCEPTIONS): | |
if not isinstance(e, AbstractListener.StopException): | |
self._log.exception( | |
'Unhandled exception in listener callback') | |
self._queue.put( | |
None if isinstance(e, cls.StopException) | |
else sys.exc_info()) | |
self.stop() | |
raise | |
# pylint: enable=W0702 | |
return inner | |
def _mark_ready(self): | |
self._condition.acquire() | |
self._ready = True | |
self._condition.notify() | |
self._condition.release() | |
def _run(self): | |
raise NotImplementedError() | |
def _stop_platform(self): | |
raise NotImplementedError() | |
def join(self, *args): | |
super(AbstractListener, self).join(*args) | |
try: | |
exc_type, exc_value, exc_traceback = self._queue.get() | |
except TypeError: | |
return | |
six.reraise(exc_type, exc_value, exc_traceback) | |
def prefix(base, cls): | |
for super_cls in filter( | |
lambda cls: issubclass(cls, base), | |
cls.__mro__[1:]): | |
if super_cls is base: | |
return cls.__module__.rsplit('.', 1)[-1][1:] + '_' | |
else: | |
result = prefix(base, super_cls) | |
if result is not None: | |
return result | |
class BKeyCode(object): | |
_PLATFORM_EXTENSIONS = [] | |
def __init__(self, vk=None, char=None, is_dead=False, **kwargs): | |
self.vk = vk | |
self.char = six.text_type(char) if char is not None else None | |
self.is_dead = is_dead | |
if self.is_dead: | |
try: | |
self.combining = unicodedata.lookup( | |
'COMBINING ' + unicodedata.name(self.char)) | |
except KeyError: | |
self.is_dead = False | |
self.combining = None | |
if self.is_dead and not self.combining: | |
raise KeyError(char) | |
else: | |
self.combining = None | |
for key in self._PLATFORM_EXTENSIONS: | |
setattr(self, key, kwargs.pop(key, None)) | |
if kwargs: | |
raise ValueError(kwargs) | |
def __repr__(self): | |
if self.is_dead: | |
return '[%s]' % repr(self.char) | |
if self.char is not None: | |
return repr(self.char) | |
else: | |
return '<%d>' % self.vk | |
def __str__(self): | |
return repr(self) | |
def __eq__(self, other): | |
if not isinstance(other, self.__class__): | |
return False | |
if self.char is not None and other.char is not None: | |
return self.char == other.char and self.is_dead == other.is_dead | |
else: | |
return self.vk == other.vk and all( | |
getattr(self, f) == getattr(other, f) | |
for f in self._PLATFORM_EXTENSIONS) | |
def __hash__(self): | |
return hash(repr(self)) | |
def join(self, key): | |
if not self.is_dead: | |
raise ValueError(self) | |
if key.char == ' ' or self == key: | |
return self.from_char(self.char) | |
if key.char is not None: | |
combined = unicodedata.normalize( | |
'NFC', | |
key.char + self.combining) | |
if combined: | |
return self.from_char(combined) | |
raise ValueError(key) | |
@classmethod | |
def from_vk(cls, vk, **kwargs): | |
return cls(vk=vk, **kwargs) | |
@classmethod | |
def from_char(cls, char, **kwargs): | |
return cls(char=char, **kwargs) | |
@classmethod | |
def from_dead(cls, char, **kwargs): | |
return cls(char=char, is_dead=True, **kwargs) | |
class BKey(enum.Enum): | |
alt = 0 | |
alt_l = 0 | |
alt_r = 0 | |
alt_gr = 0 | |
backspace = 0 | |
caps_lock = 0 | |
cmd = 0 | |
cmd_l = 0 | |
cmd_r = 0 | |
ctrl = 0 | |
ctrl_l = 0 | |
ctrl_r = 0 | |
delete = 0 | |
down = 0 | |
end = 0 | |
enter = 0 | |
esc = 0 | |
f1 = 0 | |
f2 = 0 | |
f3 = 0 | |
f4 = 0 | |
f5 = 0 | |
f6 = 0 | |
f7 = 0 | |
f8 = 0 | |
f9 = 0 | |
f10 = 0 | |
f11 = 0 | |
f12 = 0 | |
f13 = 0 | |
f14 = 0 | |
f15 = 0 | |
f16 = 0 | |
f17 = 0 | |
f18 = 0 | |
f19 = 0 | |
f20 = 0 | |
home = 0 | |
left = 0 | |
page_down = 0 | |
page_up = 0 | |
right = 0 | |
shift = 0 | |
shift_l = 0 | |
shift_r = 0 | |
space = 0 | |
tab = 0 | |
up = 0 | |
media_play_pause = 0 | |
media_volume_mute = 0 | |
media_volume_down = 0 | |
media_volume_up = 0 | |
media_previous = 0 | |
media_next = 0 | |
insert = 0 | |
menu = 0 | |
num_lock = 0 | |
pause = 0 | |
print_screen = 0 | |
scroll_lock = 0 | |
class BController(object): | |
_KeyCode = BKeyCode | |
_Key = BKey | |
class InvalidKeyException(Exception): | |
"""An exception raised when an invalid key is used.""" | |
pass | |
class InvalidCharacterException(Exception): | |
"""An exception raised when an invalid character is used.""" | |
pass | |
def __init__(self): | |
self._modifiers_lock = threading.RLock() | |
self._modifiers = set() | |
self._caps_lock = False | |
self._dead_key = None | |
def press(self, key): | |
resolved = self._resolve(key) | |
if resolved is None: | |
raise self.InvalidKeyException(key) | |
self._update_modifiers(resolved, True) | |
if resolved == self._Key.caps_lock.value: | |
self._caps_lock = not self._caps_lock | |
original = resolved | |
if self._dead_key: | |
try: | |
resolved = self._dead_key.join(resolved) | |
except ValueError: | |
self._handle(self._dead_key, True) | |
self._handle(self._dead_key, False) | |
if resolved.is_dead: | |
self._dead_key = resolved | |
return | |
try: | |
self._handle(resolved, True) | |
except self.InvalidKeyException: | |
if resolved != original: | |
self._handle(self._dead_key, True) | |
self._handle(self._dead_key, False) | |
self._handle(original, True) | |
self._dead_key = None | |
def release(self, key): | |
resolved = self._resolve(key) | |
if resolved is None: | |
raise self.InvalidKeyException(key) | |
self._update_modifiers(resolved, False) | |
# Ignore released dead keys | |
if resolved.is_dead: | |
return | |
self._handle(resolved, False) | |
def tap(self, key): | |
self.press(key) | |
self.release(key) | |
def touch(self, key, is_press): | |
if is_press: | |
self.press(key) | |
else: | |
self.release(key) | |
@contextlib.contextmanager | |
def pressed(self, *args): | |
for key in args: | |
self.press(key) | |
try: | |
yield | |
finally: | |
for key in reversed(args): | |
self.release(key) | |
def type(self, string): | |
from . import _CONTROL_CODES | |
for i, character in enumerate(string): | |
key = _CONTROL_CODES.get(character, character) | |
try: | |
self.press(key) | |
self.release(key) | |
except (ValueError, self.InvalidKeyException): | |
raise self.InvalidCharacterException(i, character) | |
@property | |
@contextlib.contextmanager | |
def modifiers(self): | |
with self._modifiers_lock: | |
yield set( | |
self._as_modifier(modifier) | |
for modifier in self._modifiers) | |
@property | |
def alt_pressed(self): | |
with self.modifiers as modifiers: | |
return self._Key.alt in modifiers | |
@property | |
def alt_gr_pressed(self): | |
with self.modifiers as modifiers: | |
return self._Key.alt_gr in modifiers | |
@property | |
def ctrl_pressed(self): | |
with self.modifiers as modifiers: | |
return self._Key.ctrl in modifiers | |
@property | |
def shift_pressed(self): | |
if self._caps_lock: | |
return True | |
with self.modifiers as modifiers: | |
return self._Key.shift in modifiers | |
def _resolve(self, key): | |
if key in (k for k in self._Key): | |
return key.value | |
if isinstance(key, six.string_types): | |
if len(key) != 1: | |
raise ValueError(key) | |
return self._KeyCode.from_char(key) | |
if isinstance(key, self._KeyCode): | |
if key.char is not None and self.shift_pressed: | |
return self._KeyCode(vk=key.vk, char=key.char.upper()) | |
else: | |
return key | |
def _update_modifiers(self, key, is_press): | |
if self._as_modifier(key): | |
with self._modifiers_lock: | |
if is_press: | |
self._modifiers.add(key) | |
else: | |
try: | |
self._modifiers.remove(key) | |
except KeyError: | |
pass | |
def _as_modifier(self, key): | |
from . import _NORMAL_MODIFIERS | |
return _NORMAL_MODIFIERS.get(key, None) | |
def _handle(self, key, is_press): | |
raise NotImplementedError() | |
# pylint: disable=W0223; This is also an abstract class | |
class BListener(AbstractListener): | |
def __init__(self, on_press=None, on_release=None, suppress=False, | |
**kwargs): | |
option_prefix = prefix(Listener, self.__class__) | |
self._options = { | |
key[len(option_prefix):]: value | |
for key, value in kwargs.items() | |
if key.startswith(option_prefix)} | |
super(Listener, self).__init__( | |
on_press=on_press, on_release=on_release, suppress=suppress) | |
def canonical(self, key): | |
from pynput.keyboard import Key, KeyCode, _NORMAL_MODIFIERS | |
if isinstance(key, KeyCode) and key.char is not None: | |
return KeyCode.from_char(key.char.lower()) | |
elif isinstance(key, Key) and key.value in _NORMAL_MODIFIERS: | |
return _NORMAL_MODIFIERS[key.value] | |
elif isinstance(key, Key) and key.value.vk is not None: | |
return KeyCode.from_vk(key.value.vk) | |
else: | |
return key | |
##keys | |
LBUTTON = 1 | |
RBUTTON = 2 | |
CANCEL = 3 | |
MBUTTON = 4 | |
XBUTTON1 = 5 | |
XBUTTON2 = 6 | |
BACK = 8 | |
TAB = 9 | |
CLEAR = 12 | |
RETURN = 13 | |
SHIFT = 16 | |
CONTROL = 17 | |
MENU = 18 | |
PAUSE = 19 | |
CAPITAL = 20 | |
KANA = 21 | |
HANGEUL = 21 | |
HANGUL = 21 | |
JUNJA = 23 | |
FINAL = 24 | |
HANJA = 25 | |
KANJI = 25 | |
ESCAPE = 27 | |
CONVERT = 28 | |
NONCONVERT = 29 | |
ACCEPT = 30 | |
MODECHANGE = 31 | |
SPACE = 32 | |
PRIOR = 33 | |
NEXT = 34 | |
END = 35 | |
HOME = 36 | |
LEFT = 37 | |
UP = 38 | |
RIGHT = 39 | |
DOWN = 40 | |
SELECT = 41 | |
PRINT = 42 | |
EXECUTE = 43 | |
SNAPSHOT = 44 | |
INSERT = 45 | |
DELETE = 46 | |
HELP = 47 | |
LWIN = 91 | |
RWIN = 92 | |
APPS = 93 | |
SLEEP = 95 | |
NUMPAD0 = 96 | |
NUMPAD1 = 97 | |
NUMPAD2 = 98 | |
NUMPAD3 = 99 | |
NUMPAD4 = 100 | |
NUMPAD5 = 101 | |
NUMPAD6 = 102 | |
NUMPAD7 = 103 | |
NUMPAD8 = 104 | |
NUMPAD9 = 105 | |
MULTIPLY = 106 | |
ADD = 107 | |
SEPARATOR = 108 | |
SUBTRACT = 109 | |
DECIMAL = 110 | |
DIVIDE = 111 | |
F1 = 112 | |
F2 = 113 | |
F3 = 114 | |
F4 = 115 | |
F5 = 116 | |
F6 = 117 | |
F7 = 118 | |
F8 = 119 | |
F9 = 120 | |
F10 = 121 | |
F11 = 122 | |
F12 = 123 | |
F13 = 124 | |
F14 = 125 | |
F15 = 126 | |
F16 = 127 | |
F17 = 128 | |
F18 = 129 | |
F19 = 130 | |
F20 = 131 | |
F21 = 132 | |
F22 = 133 | |
F23 = 134 | |
F24 = 135 | |
NUMLOCK = 144 | |
SCROLL = 145 | |
OEM_NEC_EQUAL = 146 | |
OEM_FJ_JISHO = 146 | |
OEM_FJ_MASSHOU = 147 | |
OEM_FJ_TOUROKU = 148 | |
OEM_FJ_LOYA = 149 | |
OEM_FJ_ROYA = 150 | |
LSHIFT = 160 | |
RSHIFT = 161 | |
LCONTROL = 162 | |
RCONTROL = 163 | |
LMENU = 164 | |
RMENU = 165 | |
BROWSER_BACK = 166 | |
BROWSER_FORWARD = 167 | |
BROWSER_REFRESH = 168 | |
BROWSER_STOP = 169 | |
BROWSER_SEARCH = 170 | |
BROWSER_FAVORITES = 171 | |
BROWSER_HOME = 172 | |
VOLUME_MUTE = 173 | |
VOLUME_DOWN = 174 | |
VOLUME_UP = 175 | |
MEDIA_NEXT_TRACK = 176 | |
MEDIA_PREV_TRACK = 177 | |
MEDIA_STOP = 178 | |
MEDIA_PLAY_PAUSE = 179 | |
LAUNCH_MAIL = 180 | |
LAUNCH_MEDIA_SELECT = 181 | |
LAUNCH_APP1 = 182 | |
LAUNCH_APP2 = 183 | |
OEM_1 = 186 | |
OEM_PLUS = 187 | |
OEM_COMMA = 188 | |
OEM_MINUS = 189 | |
OEM_PERIOD = 190 | |
OEM_2 = 191 | |
OEM_3 = 192 | |
OEM_4 = 219 | |
OEM_5 = 220 | |
OEM_6 = 221 | |
OEM_7 = 222 | |
OEM_8 = 223 | |
OEM_AX = 225 | |
OEM_102 = 226 | |
ICO_HELP = 227 | |
ICO_00 = 228 | |
PROCESSKEY = 229 | |
ICO_CLEAR = 230 | |
PACKET = 231 | |
OEM_RESET = 233 | |
OEM_JUMP = 234 | |
OEM_PA1 = 235 | |
OEM_PA2 = 236 | |
OEM_PA3 = 237 | |
OEM_WSCTRL = 238 | |
OEM_CUSEL = 239 | |
OEM_ATTN = 240 | |
OEM_FINISH = 241 | |
OEM_COPY = 242 | |
OEM_AUTO = 243 | |
OEM_ENLW = 244 | |
OEM_BACKTAB = 245 | |
ATTN = 246 | |
CRSEL = 247 | |
EXSEL = 248 | |
EREOF = 249 | |
PLAY = 250 | |
ZOOM = 251 | |
NONAME = 252 | |
PA1 = 253 | |
OEM_CLEAR = 254 | |
class MOUSEINPUT(ctypes.Structure): | |
MOVE = 0x0001 | |
LEFTDOWN = 0x0002 | |
LEFTUP = 0x0004 | |
RIGHTDOWN = 0x0008 | |
RIGHTUP = 0x0010 | |
MIDDLEDOWN = 0x0020 | |
MIDDLEUP = 0x0040 | |
XDOWN = 0x0080 | |
XUP = 0x0100 | |
WHEEL = 0x0800 | |
HWHEEL = 0x1000 | |
ABSOLUTE = 0x8000 | |
XBUTTON1 = 0x0001 | |
XBUTTON2 = 0x0002 | |
_fields_ = [ | |
('dx', wintypes.LONG), | |
('dy', wintypes.LONG), | |
('mouseData', wintypes.DWORD), | |
('dwFlags', wintypes.DWORD), | |
('time', wintypes.DWORD), | |
('dwExtraInfo', ctypes.c_void_p)] | |
class KEYBDINPUT(ctypes.Structure): | |
EXTENDEDKEY = 0x0001 | |
KEYUP = 0x0002 | |
SCANCODE = 0x0008 | |
UNICODE = 0x0004 | |
_fields_ = [ | |
('wVk', wintypes.WORD), | |
('wScan', wintypes.WORD), | |
('dwFlags', wintypes.DWORD), | |
('time', wintypes.DWORD), | |
('dwExtraInfo', ctypes.c_void_p)] | |
class HARDWAREINPUT(ctypes.Structure): | |
_fields_ = [ | |
('uMsg', wintypes.DWORD), | |
('wParamL', wintypes.WORD), | |
('wParamH', wintypes.WORD)] | |
class INPUT_union(ctypes.Union): | |
_fields_ = [ | |
('mi', MOUSEINPUT), | |
('ki', KEYBDINPUT), | |
('hi', HARDWAREINPUT)] | |
class INPUT(ctypes.Structure): | |
MOUSE = 0 | |
KEYBOARD = 1 | |
HARDWARE = 2 | |
_fields_ = [ | |
('type', wintypes.DWORD), | |
('value', INPUT_union)] | |
LPINPUT = ctypes.POINTER(INPUT) | |
VkKeyScan = windll.user32.VkKeyScanW | |
VkKeyScan.argtypes = ( | |
wintypes.WCHAR,) | |
MapVirtualKey = windll.user32.MapVirtualKeyW | |
MapVirtualKey.argtypes = ( | |
wintypes.UINT, | |
wintypes.UINT) | |
MapVirtualKey.MAPVK_VK_TO_VSC = 0 | |
SendInput = windll.user32.SendInput | |
SendInput.argtypes = ( | |
wintypes.UINT, | |
ctypes.c_voidp, # Really LPINPUT | |
ctypes.c_int) | |
GetCurrentThreadId = windll.kernel32.GetCurrentThreadId | |
GetCurrentThreadId.restype = wintypes.DWORD | |
class MessageLoop(object): | |
WM_STOP = 0x0401 | |
_LPMSG = ctypes.POINTER(wintypes.MSG) | |
_GetMessage = windll.user32.GetMessageW | |
_GetMessage.argtypes = ( | |
ctypes.c_voidp, | |
wintypes.HWND, | |
wintypes.UINT, | |
wintypes.UINT) | |
_PeekMessage = windll.user32.PeekMessageW | |
_PeekMessage.argtypes = ( | |
ctypes.c_voidp, | |
wintypes.HWND, | |
wintypes.UINT, | |
wintypes.UINT, | |
wintypes.UINT) | |
_PostThreadMessage = windll.user32.PostThreadMessageW | |
_PostThreadMessage.argtypes = ( | |
wintypes.DWORD, | |
wintypes.UINT, | |
wintypes.WPARAM, | |
wintypes.LPARAM) | |
PM_NOREMOVE = 0 | |
def __init__(self): | |
self._threadid = None | |
self._event = threading.Event() | |
self.thread = None | |
def __iter__(self): | |
assert self._threadid is not None | |
try: | |
while True: | |
msg = wintypes.MSG() | |
lpmsg = ctypes.byref(msg) | |
r = self._GetMessage(lpmsg, None, 0, 0) | |
if r <= 0 or msg.message == self.WM_STOP: | |
break | |
else: | |
yield msg | |
finally: | |
self._threadid = None | |
self.thread = None | |
def start(self): | |
self._threadid = GetCurrentThreadId() | |
self.thread = threading.current_thread() | |
msg = wintypes.MSG() | |
lpmsg = ctypes.byref(msg) | |
self._PeekMessage(lpmsg, None, 0x0400, 0x0400, self.PM_NOREMOVE) | |
self._event.set() | |
def stop(self): | |
self._event.wait() | |
if self._threadid: | |
self.post(self.WM_STOP, 0, 0) | |
def post(self, msg, wparam, lparam): | |
self._PostThreadMessage(self._threadid, msg, wparam, lparam) | |
class SystemHook(object): | |
HC_ACTION = 0 | |
_HOOKPROC = ctypes.WINFUNCTYPE( | |
wintypes.LPARAM, | |
ctypes.c_int32, wintypes.WPARAM, wintypes.LPARAM) | |
_SetWindowsHookEx = windll.user32.SetWindowsHookExW | |
_SetWindowsHookEx.argtypes = ( | |
ctypes.c_int, | |
_HOOKPROC, | |
wintypes.HINSTANCE, | |
wintypes.DWORD) | |
_UnhookWindowsHookEx = windll.user32.UnhookWindowsHookEx | |
_UnhookWindowsHookEx.argtypes = ( | |
wintypes.HHOOK,) | |
_CallNextHookEx = windll.user32.CallNextHookEx | |
_CallNextHookEx.argtypes = ( | |
wintypes.HHOOK, | |
ctypes.c_int, | |
wintypes.WPARAM, | |
wintypes.LPARAM) | |
#: The registered hook procedures | |
_HOOKS = {} | |
class SuppressException(Exception): | |
pass | |
def __init__(self, hook_id, on_hook=lambda code, msg, lpdata: None): | |
self.hook_id = hook_id | |
self.on_hook = on_hook | |
self._hook = None | |
def __enter__(self): | |
key = threading.current_thread().ident | |
assert key not in self._HOOKS | |
self._HOOKS[key] = self | |
self._hook = self._SetWindowsHookEx( | |
self.hook_id, | |
self._handler, | |
None, | |
0) | |
return self | |
def __exit__(self, exc_type, value, traceback): | |
key = threading.current_thread().ident | |
assert key in self._HOOKS | |
if self._hook is not None: | |
self._UnhookWindowsHookEx(self._hook) | |
del self._HOOKS[key] | |
@staticmethod | |
@_HOOKPROC | |
def _handler(code, msg, lpdata): | |
key = threading.current_thread().ident | |
self = SystemHook._HOOKS.get(key, None) | |
if self: | |
try: | |
self.on_hook(code, msg, lpdata) | |
except self.SuppressException: | |
return 1 | |
except: | |
pass | |
return SystemHook._CallNextHookEx(0, code, msg, lpdata) | |
class ListenerMixin(object): | |
_EVENTS = None | |
_WM_PROCESS = 0x410 | |
_WM_NOTIFICATIONS = [] | |
def suppress_event(self): | |
raise SystemHook.SuppressException() | |
def _run(self): | |
self._message_loop = MessageLoop() | |
with self._receive(): | |
self._mark_ready() | |
self._message_loop.start() | |
try: | |
with SystemHook(self._EVENTS, self._handler): | |
for msg in self._message_loop: | |
if not self.running: | |
break | |
if msg.message == self._WM_PROCESS: | |
self._process(msg.wParam, msg.lParam) | |
elif msg.message in self._WM_NOTIFICATIONS: | |
self._on_notification( | |
msg.message, msg.wParam, msg.lParam) | |
except: | |
pass | |
def _stop_platform(self): | |
try: | |
self._message_loop.stop() | |
except AttributeError: | |
pass | |
@AbstractListener._emitter | |
def _handler(self, code, msg, lpdata): | |
try: | |
converted = self._convert(code, msg, lpdata) | |
if converted is not None: | |
self._message_loop.post(self._WM_PROCESS, *converted) | |
except NotImplementedError: | |
self._handle(code, msg, lpdata) | |
if self.suppress: | |
self.suppress_event() | |
def _convert(self, code, msg, lpdata): | |
raise NotImplementedError() | |
def _process(self, wparam, lparam): | |
raise NotImplementedError() | |
def _handle(self, code, msg, lpdata): | |
raise NotImplementedError() | |
def _on_notification(self, code, wparam, lparam): | |
raise NotImplementedError() | |
class KeyTranslator(object): | |
_GetAsyncKeyState = ctypes.windll.user32.GetAsyncKeyState | |
_GetAsyncKeyState.argtypes = ( | |
ctypes.c_int,) | |
_GetKeyboardLayout = ctypes.windll.user32.GetKeyboardLayout | |
_GetKeyboardLayout.argtypes = ( | |
wintypes.DWORD,) | |
_GetKeyboardState = ctypes.windll.user32.GetKeyboardState | |
_GetKeyboardState.argtypes = ( | |
ctypes.c_voidp,) | |
_GetKeyState = ctypes.windll.user32.GetAsyncKeyState | |
_GetKeyState.argtypes = ( | |
ctypes.c_int,) | |
_MapVirtualKeyEx = ctypes.windll.user32.MapVirtualKeyExW | |
_MapVirtualKeyEx.argtypes = ( | |
wintypes.UINT, | |
wintypes.UINT, | |
wintypes.HKL) | |
_ToUnicodeEx = ctypes.windll.user32.ToUnicodeEx | |
_ToUnicodeEx.argtypes = ( | |
wintypes.UINT, | |
wintypes.UINT, | |
ctypes.c_voidp, | |
ctypes.c_voidp, | |
ctypes.c_int, | |
wintypes.UINT, | |
wintypes.HKL) | |
_MAPVK_VK_TO_VSC = 0 | |
_MAPVK_VSC_TO_VK = 1 | |
_MAPVK_VK_TO_CHAR = 2 | |
def __init__(self): | |
self.update_layout() | |
def __call__(self, vk, is_press): | |
layout_data = self._layout_data[self._modifier_state()] | |
scan = self._to_scan(vk, self._layout) | |
character, is_dead = layout_data[scan] | |
return { | |
'char': character, | |
'is_dead': is_dead, | |
'vk': vk, | |
'_scan': scan} | |
def update_layout(self): | |
self._layout, self._layout_data = self._generate_layout() | |
def char_from_scan(self, scan): | |
return self._layout_data[(False, False, False)][scan][0] | |
def _generate_layout(self): | |
layout_data = {} | |
state = (ctypes.c_ubyte * 255)() | |
with self._thread_input() as active_thread: | |
layout = self._GetKeyboardLayout(active_thread) | |
vks = [ | |
self._to_vk(scan, layout) | |
for scan in range(len(state))] | |
for shift, ctrl, alt in itertools.product( | |
(False, True), (False, True), (False, True)): | |
current = [(None, False)] * len(state) | |
layout_data[(shift, ctrl, alt)] = current | |
state[SHIFT] = 0x80 if shift else 0x00 | |
state[CONTROL] = 0x80 if ctrl else 0x00 | |
state[MENU] = 0x80 if alt else 0x00 | |
out = (ctypes.wintypes.WCHAR * 5)() | |
for (scan, vk) in enumerate(vks): | |
count = self._ToUnicodeEx( | |
vk, scan, ctypes.byref(state), ctypes.byref(out), | |
len(out), 0, layout) | |
if count != 0: | |
character = out[0] | |
is_dead = count < 0 | |
current[scan] = (character, is_dead) | |
if is_dead: | |
self._ToUnicodeEx( | |
vk, scan, ctypes.byref(state), | |
ctypes.byref(out), len(out), 0, layout) | |
return (layout, layout_data) | |
def _to_scan(self, vk, layout): | |
return self._MapVirtualKeyEx( | |
vk, self._MAPVK_VK_TO_VSC, layout) | |
def _to_vk(self, scan, layout): | |
return self._MapVirtualKeyEx( | |
scan, self._MAPVK_VSC_TO_VK, layout) | |
def _modifier_state(self): | |
shift = bool(self._GetAsyncKeyState(SHIFT) & 0x8000) | |
ctrl = bool(self._GetAsyncKeyState(CONTROL) & 0x8000) | |
alt = bool(self._GetAsyncKeyState(MENU) & 0x8000) | |
return (shift, ctrl, alt) | |
@contextlib.contextmanager | |
def _thread_input(self): | |
yield GetCurrentThreadId() | |
class KeyCode(BKeyCode): | |
_PLATFORM_EXTENSIONS = ( | |
'_flags', | |
'_scan', | |
) | |
_flags = None | |
_scan = None | |
def _parameters(self, is_press): | |
if self.vk: | |
vk = self.vk | |
scan = self._scan \ | |
or MapVirtualKey(vk, MapVirtualKey.MAPVK_VK_TO_VSC) | |
flags = 0 | |
else: | |
res = VkKeyScan(self.char) | |
if (res >> 8) & 0xFF == 0: | |
vk = res & 0xFF | |
scan = self._scan \ | |
or MapVirtualKey(vk, MapVirtualKey.MAPVK_VK_TO_VSC) | |
flags = 0 | |
else: | |
vk = 0 | |
scan = ord(self.char) | |
flags = KEYBDINPUT.UNICODE | |
state_flags = (KEYBDINPUT.KEYUP if not is_press else 0) | |
return dict( | |
dwFlags=(self._flags or 0) | flags | state_flags, | |
wVk=vk, | |
wScan=scan) | |
@classmethod | |
def _from_ext(cls, vk, **kwargs): | |
return cls.from_vk(vk, _flags=KEYBDINPUT.EXTENDEDKEY, **kwargs) | |
class Key(enum.Enum): | |
alt = KeyCode.from_vk(MENU) | |
alt_l = KeyCode.from_vk(LMENU) | |
alt_r = KeyCode._from_ext(RMENU) | |
alt_gr = KeyCode.from_vk(RMENU) | |
backspace = KeyCode.from_vk(BACK) | |
caps_lock = KeyCode.from_vk(CAPITAL) | |
cmd = KeyCode.from_vk(LWIN) | |
cmd_l = KeyCode.from_vk(LWIN) | |
cmd_r = KeyCode.from_vk(RWIN) | |
ctrl = KeyCode.from_vk(CONTROL) | |
ctrl_l = KeyCode.from_vk(LCONTROL) | |
ctrl_r = KeyCode._from_ext(RCONTROL) | |
delete = KeyCode._from_ext(DELETE) | |
down = KeyCode._from_ext(DOWN) | |
end = KeyCode._from_ext(END) | |
enter = KeyCode.from_vk(RETURN) | |
esc = KeyCode.from_vk(ESCAPE) | |
f1 = KeyCode.from_vk(F1) | |
f2 = KeyCode.from_vk(F2) | |
f3 = KeyCode.from_vk(F3) | |
f4 = KeyCode.from_vk(F4) | |
f5 = KeyCode.from_vk(F5) | |
f6 = KeyCode.from_vk(F6) | |
f7 = KeyCode.from_vk(F7) | |
f8 = KeyCode.from_vk(F8) | |
f9 = KeyCode.from_vk(F9) | |
f10 = KeyCode.from_vk(F10) | |
f11 = KeyCode.from_vk(F11) | |
f12 = KeyCode.from_vk(F12) | |
f13 = KeyCode.from_vk(F13) | |
f14 = KeyCode.from_vk(F14) | |
f15 = KeyCode.from_vk(F15) | |
f16 = KeyCode.from_vk(F16) | |
f17 = KeyCode.from_vk(F17) | |
f18 = KeyCode.from_vk(F18) | |
f19 = KeyCode.from_vk(F19) | |
f20 = KeyCode.from_vk(F20) | |
f21 = KeyCode.from_vk(F21) | |
f22 = KeyCode.from_vk(F22) | |
f23 = KeyCode.from_vk(F23) | |
f24 = KeyCode.from_vk(F24) | |
home = KeyCode._from_ext(HOME) | |
left = KeyCode._from_ext(LEFT) | |
page_down = KeyCode._from_ext(NEXT) | |
page_up = KeyCode._from_ext(PRIOR) | |
right = KeyCode._from_ext(RIGHT) | |
shift = KeyCode.from_vk(LSHIFT) | |
shift_l = KeyCode.from_vk(LSHIFT) | |
shift_r = KeyCode.from_vk(RSHIFT) | |
space = KeyCode.from_vk(SPACE, char=' ') | |
tab = KeyCode.from_vk(TAB) | |
up = KeyCode._from_ext(UP) | |
media_play_pause = KeyCode._from_ext(MEDIA_PLAY_PAUSE) | |
media_volume_mute = KeyCode._from_ext(VOLUME_MUTE) | |
media_volume_down = KeyCode._from_ext(VOLUME_DOWN) | |
media_volume_up = KeyCode._from_ext(VOLUME_UP) | |
media_previous = KeyCode._from_ext(MEDIA_PREV_TRACK) | |
media_next = KeyCode._from_ext(MEDIA_NEXT_TRACK) | |
insert = KeyCode._from_ext(INSERT) | |
menu = KeyCode.from_vk(APPS) | |
num_lock = KeyCode._from_ext(NUMLOCK) | |
pause = KeyCode.from_vk(PAUSE) | |
print_screen = KeyCode._from_ext(SNAPSHOT) | |
scroll_lock = KeyCode.from_vk(SCROLL) | |
class Controller(BController): | |
_KeyCode = KeyCode | |
_Key = Key | |
def __init__(self, *args, **kwargs): | |
super(Controller, self).__init__(*args, **kwargs) | |
def _handle(self, key, is_press): | |
SendInput( | |
1, | |
ctypes.byref(INPUT( | |
type=INPUT.KEYBOARD, | |
value=INPUT_union( | |
ki=KEYBDINPUT(**key._parameters(is_press))))), | |
ctypes.sizeof(INPUT)) | |
class Listener(ListenerMixin, BListener): | |
_EVENTS = 13 | |
_WM_INPUTLANGCHANGE = 0x0051 | |
_WM_KEYDOWN = 0x0100 | |
_WM_KEYUP = 0x0101 | |
_WM_SYSKEYDOWN = 0x0104 | |
_WM_SYSKEYUP = 0x0105 | |
_UTF16_FLAG = 0x1000 | |
_VK_PACKET = 0xE7 | |
_PRESS_MESSAGES = (_WM_KEYDOWN, _WM_SYSKEYDOWN) | |
_RELEASE_MESSAGES = (_WM_KEYUP, _WM_SYSKEYUP) | |
_WM_NOTIFICATIONS = ( | |
_WM_INPUTLANGCHANGE, | |
) | |
_SPECIAL_KEYS = { | |
key.value.vk: key | |
for key in Key} | |
_HANDLED_EXCEPTIONS = ( | |
SystemHook.SuppressException,) | |
class _KBDLLHOOKSTRUCT(ctypes.Structure): | |
"""Contains information about a mouse event passed to a | |
``WH_KEYBOARD_LL`` hook procedure, ``LowLevelKeyboardProc``. | |
""" | |
_fields_ = [ | |
('vkCode', wintypes.DWORD), | |
('scanCode', wintypes.DWORD), | |
('flags', wintypes.DWORD), | |
('time', wintypes.DWORD), | |
('dwExtraInfo', ctypes.c_void_p)] | |
_LPKBDLLHOOKSTRUCT = ctypes.POINTER(_KBDLLHOOKSTRUCT) | |
def __init__(self, *args, **kwargs): | |
super(Listener, self).__init__(*args, **kwargs) | |
self._translator = KeyTranslator() | |
self._event_filter = self._options.get( | |
'event_filter', | |
lambda msg, data: True) | |
def _convert(self, code, msg, lpdata): | |
if code != SystemHook.HC_ACTION: | |
return | |
data = ctypes.cast(lpdata, self._LPKBDLLHOOKSTRUCT).contents | |
is_packet = data.vkCode == self._VK_PACKET | |
if self._event_filter(msg, data) is False: | |
return None | |
elif is_packet: | |
return (msg | self._UTF16_FLAG, data.scanCode) | |
else: | |
return (msg, data.vkCode) | |
@AbstractListener._emitter | |
def _process(self, wparam, lparam): | |
msg = wparam | |
vk = lparam | |
is_utf16 = msg & self._UTF16_FLAG | |
if is_utf16: | |
msg = msg ^ self._UTF16_FLAG | |
scan = vk | |
key = KeyCode.from_char(six.unichr(scan)) | |
else: | |
try: | |
key = self._event_to_key(msg, vk) | |
except OSError: | |
key = None | |
if msg in self._PRESS_MESSAGES: | |
self.on_press(key) | |
elif msg in self._RELEASE_MESSAGES: | |
self.on_release(key) | |
@contextlib.contextmanager | |
def _receive(self): | |
yield | |
def _on_notification(self, code, wparam, lparam): | |
if code == self._WM_INPUTLANGCHANGE: | |
self._translator.update_layout() | |
def _event_to_key(self, msg, vk): | |
if vk in self._SPECIAL_KEYS: | |
return self._SPECIAL_KEYS[vk] | |
else: | |
return KeyCode(**self._translate( | |
vk, | |
msg in self._PRESS_MESSAGES)) | |
def _translate(self, vk, is_press): | |
return self._translator(vk, is_press) | |
def canonical(self, key): | |
scan = getattr(key, '_scan', None) | |
if scan is not None: | |
char = self._translator.char_from_scan(scan) | |
if char is not None: | |
return KeyCode.from_char(char) | |
return super(Listener, self).canonical(key) | |
Rect = collections.namedtuple("Rect", "left top right bottom") | |
Point = collections.namedtuple("Point", "x y") | |
Size = collections.namedtuple("Size", "width height") | |
class POINT(ctypes.Structure): | |
_fields_ = [("x", ctypes.c_long), | |
("y", ctypes.c_long)] | |
class RECT(ctypes.Structure): | |
"""A nice wrapper of the RECT structure. | |
Microsoft Documentation: | |
https://msdn.microsoft.com/en-us/library/windows/desktop/dd162897(v=vs.85).aspx | |
""" | |
_fields_ = [('left', ctypes.c_long), | |
('top', ctypes.c_long), | |
('right', ctypes.c_long), | |
('bottom', ctypes.c_long)] | |
class BaseWindow: | |
def __init__(self): | |
pass | |
def _setupRectProperties(self): | |
def _onRead(attrName): | |
r = self._getWindowRect() | |
self._rect._left = r.left # Setting _left directly to skip the onRead. | |
self._rect._top = r.top # Setting _top directly to skip the onRead. | |
self._rect._width = r.right - r.left # Setting _width directly to skip the onRead. | |
self._rect._height = r.bottom - r.top # Setting _height directly to skip the onRead. | |
def _onChange(oldBox, newBox): | |
self.moveTo(newBox.left, newBox.top) | |
self.resizeTo(newBox.width, newBox.height) | |
r = self._getWindowRect() | |
self._rect = pyrect.Rect(r.left, r.top, r.right - r.left, r.bottom - r.top, onChange=_onChange, onRead=_onRead) | |
def _getWindowRect(self): | |
raise NotImplementedError | |
def __str__(self): | |
r = self._getWindowRect() | |
width = r.right - r.left | |
height = r.bottom - r.top | |
return '<%s left="%s", top="%s", width="%s", height="%s", title="%s">' % ( | |
self.__class__.__qualname__, | |
r.left, | |
r.top, | |
width, | |
height, | |
self.title, | |
) | |
@property | |
def left(self): | |
return self._rect.left | |
@left.setter | |
def left(self, value): | |
self._rect.left | |
self._rect.left = value | |
@property | |
def right(self): | |
return self._rect.right | |
@right.setter | |
def right(self, value): | |
self._rect.right | |
self._rect.right = value | |
@property | |
def top(self): | |
return self._rect.top | |
@top.setter | |
def top(self, value): | |
self._rect.top | |
self._rect.top = value | |
@property | |
def bottom(self): | |
return self._rect.bottom | |
@bottom.setter | |
def bottom(self, value): | |
self._rect.bottom | |
self._rect.bottom = value | |
@property | |
def topleft(self): | |
return self._rect.topleft | |
@topleft.setter | |
def topleft(self, value): | |
self._rect.topleft | |
self._rect.topleft = value | |
@property | |
def topright(self): | |
return self._rect.topright | |
@topright.setter | |
def topright(self, value): | |
self._rect.topright | |
self._rect.topright = value | |
@property | |
def bottomleft(self): | |
return self._rect.bottomleft | |
@bottomleft.setter | |
def bottomleft(self, value): | |
self._rect.bottomleft | |
self._rect.bottomleft = value | |
@property | |
def bottomright(self): | |
return self._rect.bottomright | |
@bottomright.setter | |
def bottomright(self, value): | |
self._rect.bottomright | |
self._rect.bottomright = value | |
@property | |
def midleft(self): | |
return self._rect.midleft | |
@midleft.setter | |
def midleft(self, value): | |
self._rect.midleft | |
self._rect.midleft = value | |
@property | |
def midright(self): | |
return self._rect.midright | |
@midright.setter | |
def midright(self, value): | |
self._rect.midright | |
self._rect.midright = value | |
@property | |
def midtop(self): | |
return self._rect.midtop | |
@midtop.setter | |
def midtop(self, value): | |
self._rect.midtop | |
self._rect.midtop = value | |
@property | |
def midbottom(self): | |
return self._rect.midbottom | |
@midbottom.setter | |
def midbottom(self, value): | |
self._rect.midbottom | |
self._rect.midbottom = value | |
@property | |
def center(self): | |
return self._rect.center | |
@center.setter | |
def center(self, value): | |
self._rect.center | |
self._rect.center = value | |
@property | |
def centerx(self): | |
return self._rect.centerx | |
@centerx.setter | |
def centerx(self, value): | |
self._rect.centerx | |
self._rect.centerx = value | |
@property | |
def centery(self): | |
return self._rect.centery | |
@centery.setter | |
def centery(self, value): | |
self._rect.centery | |
self._rect.centery = value | |
@property | |
def width(self): | |
return self._rect.width | |
@width.setter | |
def width(self, value): | |
self._rect.width | |
self._rect.width = value | |
@property | |
def height(self): | |
return self._rect.height | |
@height.setter | |
def height(self, value): | |
self._rect.height | |
self._rect.height = value | |
@property | |
def size(self): | |
return self._rect.size | |
@size.setter | |
def size(self, value): | |
self._rect.size | |
self._rect.size = value | |
@property | |
def area(self): | |
return self._rect.area | |
@area.setter | |
def area(self, value): | |
self._rect.area | |
self._rect.area = value | |
@property | |
def box(self): | |
return self._rect.box | |
@box.setter | |
def box(self, value): | |
self._rect.box | |
self._rect.box = value | |
class Win32Window(BaseWindow): | |
def __init__(self, hWnd): | |
self._hWnd = hWnd | |
self._setupRectProperties() | |
def _getWindowRect(self): | |
rect = RECT() | |
result = ctypes.windll.user32.GetWindowRect(self._hWnd, ctypes.byref(rect)) | |
if result != 0: | |
return Rect(rect.left, rect.top, rect.right, rect.bottom) | |
else: | |
_raiseWithLastError() | |
def __repr__(self): | |
return '%s(hWnd=%s)' % (self.__class__.__name__, self._hWnd) | |
def __eq__(self, other): | |
return isinstance(other, Win32Window) and self._hWnd == other._hWnd | |
def close(self): | |
result = ctypes.windll.user32.PostMessageA(self._hWnd, WM_CLOSE, 0, 0) | |
if result == 0: | |
_raiseWithLastError() | |
def minimize(self): | |
ctypes.windll.user32.ShowWindow(self._hWnd, SW_MINIMIZE) | |
def maximize(self): | |
ctypes.windll.user32.ShowWindow(self._hWnd, SW_MAXIMIZE) | |
def restore(self): | |
ctypes.windll.user32.ShowWindow(self._hWnd, SW_RESTORE) | |
def show(self): | |
ctypes.windll.user32.ShowWindow(self._hWnd,SW_SHOW) | |
def hide(self): | |
ctypes.windll.user32.ShowWindow(self._hWnd,SW_HIDE) | |
def activate(self): | |
result = ctypes.windll.user32.SetForegroundWindow(self._hWnd) | |
if result == 0: | |
_raiseWithLastError() | |
def resize(self, widthOffset, heightOffset): | |
result = ctypes.windll.user32.SetWindowPos(self._hWnd, HWND_TOP, self.left, self.top, self.width + widthOffset, self.height + heightOffset, 0) | |
if result == 0: | |
_raiseWithLastError() | |
resizeRel = resize | |
def resizeTo(self, newWidth, newHeight): | |
result = ctypes.windll.user32.SetWindowPos(self._hWnd, HWND_TOP, self.left, self.top, newWidth, newHeight, 0) | |
if result == 0: | |
_raiseWithLastError() | |
def move(self, xOffset, yOffset): | |
result = ctypes.windll.user32.SetWindowPos(self._hWnd, HWND_TOP, self.left + xOffset, self.top + yOffset, self.width, self.height, 0) | |
if result == 0: | |
_raiseWithLastError() | |
moveRel = move | |
def moveTo(self, newLeft, newTop): | |
result = ctypes.windll.user32.SetWindowPos(self._hWnd, HWND_TOP, newLeft, newTop, self.width, self.height, 0) | |
if result == 0: | |
_raiseWithLastError() | |
@property | |
def isMinimized(self): | |
return ctypes.windll.user32.IsIconic(self._hWnd) != 0 | |
@property | |
def isMaximized(self): | |
return ctypes.windll.user32.IsZoomed(self._hWnd) != 0 | |
@property | |
def isActive(self): | |
return getActiveWindow() == self | |
@property | |
def title(self): | |
textLenInCharacters = ctypes.windll.user32.GetWindowTextLengthW(self._hWnd) | |
stringBuffer = ctypes.create_unicode_buffer(textLenInCharacters + 1) | |
ctypes.windll.user32.GetWindowTextW(self._hWnd, stringBuffer, textLenInCharacters + 1) | |
return stringBuffer.value | |
@property | |
def visible(self): | |
return isWindowVisible(self._hWnd) | |
def on_press(key): | |
try: | |
print('alphanumeric key {0} pressed'.format( | |
key.char)) | |
except AttributeError: | |
print('special key {0} pressed'.format( | |
key)) | |
def on_release(key): | |
print('{0} released'.format( | |
key)) | |
if key == keyboard.Key.esc: | |
# Stop listener | |
return False | |
## LISTENER ABSTRACTION METHODS | |
# listen = Listener( | |
# on_press=on_press, | |
# on_release=on_release) | |
# listen.start() | |
## WINDOW ABSTRACTION METHODS | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment