|
print('__file__={0:<35} | __name__={1:<25} | __package__={2:<25}'.format(__file__,__name__,str(__package__))) |
|
import ctypes |
|
import win32con |
|
import win32api |
|
import win32gui |
|
import atexit |
|
from collections import namedtuple |
|
import threading |
|
import queue |
|
import time |
|
|
|
class InputListener: |
|
"""https://gist.github.com/Amgarak/5df8477bad67dabbc491322e74ce1c2c |
|
|
|
This class is designed for listening to keyboard and mouse events in a Windows environment. |
|
It utilizes low-level hooks to capture events such as key presses, |
|
key releases, mouse movements, and mouse button clicks. |
|
The class provides functionalities to register event handlers and filters, |
|
allowing custom processing of events. |
|
It also supports the control of threads for concurrent event handling. |
|
|
|
""" |
|
def __init__(self): |
|
self.flagRun=False |
|
self.signal = queue.Queue() |
|
|
|
self._key_down = set() |
|
|
|
self.if_handlers = set() |
|
self.handlers = set() |
|
|
|
self.listener_queues = {} |
|
self.listeners =[] |
|
|
|
self.variable_key_down = {} |
|
|
|
self.event_types = { |
|
win32con.WM_KEYDOWN: 'key_down', # 0x100: 'key down', - WM_KeyDown for normal keys - 256 |
|
win32con.WM_KEYUP: 'key_up', # 0x101: 'key up', - WM_KeyUp for normal keys - 257 |
|
0x104: 'key_down', # WM_SYSKEYDOWN, used for Alt key - 260 |
|
0x105: 'key_up'} # WM_SYSKEYUP, used for Alt key - 261 |
|
|
|
self.mouse_types={ |
|
0x200: 'move', # WM_MOUSEMOVE - 512 |
|
0x20A: 'wheel', # WM_MOUSEWHEEL - 522 - scan_code_top: 7864320; scan_code_bot: 4287102976 |
|
0x20E: 'H_wheel', # WM_MOUSEHWHEEL - 526 |
|
0x204: 'key_down', # WM_RBUTTONDOWN - 516 |
|
0x205: 'key_up', # WM_RBUTTONUP - 517 |
|
0x201: 'key_down', # WM_LBUTTONDOWN - 513 |
|
0x202: 'key_up', # WM_LBUTTONUP - 514 |
|
0x207: 'key_down', # WM_MBUTTONDOWN - 519 |
|
0x208: 'key_up', # WM_MBUTTONUP - 520 |
|
0x20B: 'key_down', # WM_XBUTTONDOWN - 523 - scan_code: 131072; scan_code: 65536 |
|
0x20C: 'key_up'} # WM_XBUTTONUP - 524 - scan_code: 131072; scan_code: 65536 |
|
|
|
self.mouse_key={ |
|
0x200: 'move', |
|
0x20A: 'wheel', |
|
7864320: 'wheel_top', |
|
4287102976: 'wheel_bot', |
|
0x20E: 'H_wheel', |
|
0x204: 'right', |
|
0x205: 'right', |
|
0x201: 'left', |
|
0x202: 'left', |
|
0x207: 'middle', |
|
0x208: 'middle', |
|
131072: 'X_Button_1', |
|
65536: 'X_Button_2'} |
|
|
|
self.DeviceEvent = namedtuple('DeviceEvent', ['inputDevice', 'event_type', 'key_code', 'x', 'y', 'scan_code', 'key_down']) |
|
self.DeviceEvent.__new__.__defaults__ = (None, None, None, None, None, None, None) |
|
|
|
self.default_key = "Value not defined" |
|
|
|
def key_event(self, event): |
|
while not self.signal.empty(): |
|
self.signal_handler(self.signal.get()) |
|
|
|
for listener_queue in self.listener_queues.values(): |
|
listener_queue.put(event) |
|
|
|
def key_down(self, event_type, key_code): |
|
key = str(key_code) |
|
|
|
if key not in self._key_down and event_type == 'key_down': |
|
self._key_down.add(key) |
|
return True |
|
|
|
elif key in self._key_down and event_type == 'key_up': |
|
self._key_down.remove(key) |
|
return True |
|
|
|
def if_block(self, event): |
|
for if_handler in self.if_handlers: |
|
result = if_handler(event) |
|
if result is not None and not result: |
|
return False |
|
return True |
|
|
|
def keyboard_low_level_handler(self, nCode, wParam, lParam): |
|
lParam = ctypes.cast(lParam, ctypes.POINTER(ctypes.c_ulong)).contents.value |
|
|
|
rezult = self.key_down(self.event_types[wParam], lParam) |
|
event = self.DeviceEvent('Keyboard', self.event_types[wParam], lParam, key_down=self._key_down) |
|
|
|
if rezult: # !!! - Отсекаем повторяющийся ивент key_down |
|
self.key_event(event) |
|
|
|
if self.if_block(event): |
|
return ctypes.windll.user32.CallNextHookEx(self.keyboard_hook_id, nCode, wParam, lParam) |
|
else: |
|
return -1 |
|
|
|
def mouse_low_level_handler(self, nCode, wParam, lParam): |
|
point = ctypes.cast(lParam, ctypes.POINTER(ctypes.c_long * 2)).contents |
|
|
|
x = point[0] |
|
y = point[1] |
|
|
|
castomParam = self.mouse_key.get(lParam[1], self.default_key) if lParam[1] is not None else self.mouse_key[wParam] |
|
self.key_down(self.mouse_types[wParam], castomParam) |
|
|
|
event = self.DeviceEvent('Mouse', self.mouse_types[wParam], castomParam, x, y, lParam[1], self._key_down) |
|
self.key_event(event) |
|
|
|
if self.if_block(event): |
|
return ctypes.windll.user32.CallNextHookEx(self.mouse_hook_id, nCode, wParam, lParam) |
|
else: |
|
return -1 |
|
|
|
def listen(self): |
|
CMPFUNC = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_int, ctypes.c_int, ctypes.POINTER(ctypes.c_void_p)) |
|
ctypes.windll.user32.SetWindowsHookExW.argtypes = ( |
|
ctypes.c_int, |
|
CMPFUNC, |
|
ctypes.c_void_p, |
|
ctypes.c_uint |
|
) |
|
|
|
keyboard_pointer = CMPFUNC(self.keyboard_low_level_handler) |
|
self.keyboard_hook_id = ctypes.windll.user32.SetWindowsHookExW(win32con.WH_KEYBOARD_LL, keyboard_pointer, |
|
win32api.GetModuleHandle(None), 0) |
|
|
|
mouse_pointer = CMPFUNC(self.mouse_low_level_handler) |
|
self.mouse_hook_id = ctypes.windll.user32.SetWindowsHookExW(win32con.WH_MOUSE_LL, mouse_pointer, |
|
win32api.GetModuleHandle(None), 0) |
|
|
|
atexit.register(ctypes.windll.user32.UnhookWindowsHookEx, self.keyboard_hook_id) |
|
atexit.register(ctypes.windll.user32.UnhookWindowsHookEx, self.mouse_hook_id) |
|
|
|
self.flagRun = True |
|
win32gui.GetMessage(None, 0, 0) |
|
print(f"Hooks завершил свою работу..") |
|
|
|
def create_listener_queues(self): |
|
self.listener_queues = {func.__name__: queue.Queue() for func in self.handlers} |
|
|
|
def create_listener(self): |
|
self.listeners = [ |
|
threading.Thread(target=self.wrapper, args=(self.listener_queues[func.__name__], func), name=func.__name__) |
|
for func in self.handlers |
|
] |
|
|
|
def start_listener(self): |
|
for listener_thread in self.listeners: |
|
listener_thread.daemon = True |
|
listener_thread.start() |
|
|
|
def wrapper(self, listener_queue, func): |
|
self.variable_key_down[func.__name__] = set() |
|
while True: |
|
event = listener_queue.get() |
|
if event is None: |
|
break |
|
|
|
if func.__name__ in self.variable_key_down: |
|
self.variable_key_down[func.__name__].clear() |
|
self.variable_key_down[func.__name__].update(event.key_down) |
|
event = event._replace(key_down=self.variable_key_down.get(func.__name__, None)) |
|
|
|
func(event) |
|
|
|
print(f"Демон {func.__name__} завершил свою работу..") |
|
|
|
def start(self, use_daemon = False): |
|
self.create_listener_queues() |
|
self.create_listener() |
|
self.start_listener() |
|
|
|
# Создаем новый поток и передаем в него функцию self.listen() - без скобок! |
|
# Мы передаем саму функцию в качестве объекта. В этом случае, функция не вызывается немедленно. |
|
thread = threading.Thread(target=self.listen, name="Hooks") |
|
thread.daemon = use_daemon # Включить при использовании GUI |
|
thread.start() # Запускаем Hooks в отдельном потоке |
|
self.thread_id = thread.native_id # Получаем дескриптор потока, используя атрибут native_id объекта threading.Thread |
|
|
|
def stop(self): # Останавливаем слушатели\хуки |
|
start_time = time.time() |
|
while (time.time() - start_time) < 20: |
|
if self.flagRun: |
|
break |
|
time.sleep(1) |
|
else: |
|
return False |
|
|
|
user32 = ctypes.windll.user32 |
|
WM_QUIT = 0x0012 # Используя дискриптор потока, отправляем сообщение WM_QUIT |
|
user32.PostThreadMessageW(self.thread_id, WM_QUIT, 0, 0) # Для остановки win32gui.GetMessage(None, 0, 0) |
|
|
|
ctypes.windll.user32.UnhookWindowsHookEx(self.keyboard_hook_id) |
|
ctypes.windll.user32.UnhookWindowsHookEx(self.mouse_hook_id) |
|
|
|
for listener_queue in self.listener_queues.values(): |
|
listener_queue.put(None) |
|
|
|
for listener_thread in self.listeners: |
|
listener_thread.join() |
|
|
|
self.listener_queues.clear() |
|
self.listeners.clear() |
|
self._key_down.clear() |
|
self.variable_key_down.clear() |
|
|
|
self.flagRun = False |
|
|
|
def signal_handler(self, handler_dict): |
|
match handler_dict: |
|
case {"hot_removal_handler": func}: |
|
|
|
self.handlers.discard(func) |
|
stop_signal_queue = self.listener_queues.get(func.__name__) |
|
stop_signal_queue.put(None) |
|
self.variable_key_down.pop(func.__name__, None) |
|
self.listener_queues.pop(func.__name__, None) |
|
print(f"Handling hot removal for function {func.__name__}") |
|
|
|
case {"hot_plugging_handler": func}: |
|
|
|
self.handlers.add(func) |
|
self.listener_queues.update({func.__name__: queue.Queue()}) |
|
listener_thread=threading.Thread(target=self.wrapper, args=(self.listener_queues[func.__name__], func), name=func.__name__) |
|
self.listeners.append(listener_thread) |
|
listener_thread.daemon = True |
|
listener_thread.start() |
|
print(f"Handling hot plugging for function {func.__name__}") |
|
|
|
case {"hot_removal_if_handler": func}: |
|
|
|
self.if_handlers.discard(func) |
|
print(f"Handling conditional hot removal for function {func.__name__}") |
|
|
|
case {"hot_plugging_if_handler": func}: |
|
|
|
self.if_handlers.add(func) |
|
print(f"Handling conditional hot plugging for function {func.__name__}") |
|
|
|
case _: |
|
print("Unknown case") |
|
|
|
def get_if_handlers(self, objekt=None): |
|
if objekt == "__name__": |
|
return set(if_handler.__name__ for if_handler in self.if_handlers) |
|
else: |
|
return self.if_handlers |
|
|
|
def get_handlers(self, objekt=None): |
|
if objekt == "__name__": |
|
return set(handler.__name__ for handler in self.handlers) |
|
else: |
|
return self.handlers |
|
|
|
def get_status_hooks(self): |
|
return self.flagRun |
|
|
|
def hot_removal_handler(self, func): # Горячее удаление колбэк-обработчиков |
|
signal = {"hot_removal_handler": func} |
|
self.signal.put(signal) |
|
|
|
def hot_plugging_handler(self, func):# Горячая регестрирация колбэк-обработчиков |
|
signal = {"hot_plugging_handler": func} |
|
self.signal.put(signal) |
|
|
|
def hot_removal_if_handler(self, func): # Горячее удаление колбэк-фильтров |
|
signal = {"hot_removal_if_handler": func} |
|
self.signal.put(signal) |
|
|
|
def hot_plugging_if_handler(self, func): # Горячая регестрирация колбэк-фильтров |
|
signal = {"hot_plugging_if_handler": func} |
|
self.signal.put(signal) |
|
|
|
def add_if_handler(self, func): # Регестрируем колбэк-фильтров |
|
self.if_handlers.add(func) |
|
|
|
def add_handler(self, func): # Регестрируем колбэк-обработчиков |
|
self.handlers.add(func) |
|
|
|
def remove_if_handler(self, func): # Удаление колбэк-фильтров |
|
self.if_handlers.discard(func) |
|
|
|
def remove_handler(self, func): # Удаление колбэк-обработчиков |
|
self.handlers.discard(func) |
|
self.variable_key_down.pop(func.__name__, None) |
|
|
|
def __call__(self): |
|
def decorator(func_handler): |
|
def wrapper(event): # Обертка для обработки параметра функции |
|
if event == True: |
|
signal = {"hot_plugging_handler": func_handler} # Горячая регестрирация колбэк-обработчиков |
|
self.signal.put(signal) |
|
return True |
|
|
|
elif event == False: |
|
signal = {"hot_removal_handler": func_handler} # Горячее удаление колбэк-обработчиков |
|
self.signal.put(signal) |
|
return func_handler |
|
|
|
else: |
|
return False |
|
|
|
self.handlers.add(func_handler) # Добавляем обработчик в список |
|
return wrapper # Возвращаем обернутую функцию-обработчик |
|
return decorator |
|
|
|
def add_filter(self): |
|
def decorator(func_if_handler): |
|
def wrapper(event): # Обертка для обработки параметра функции |
|
if event == True: |
|
signal = {"hot_plugging_if_handler": func_if_handler} # Горячая регестрирация колбэк-фильтров |
|
self.signal.put(signal) |
|
return True |
|
|
|
elif event == False: |
|
signal = {"hot_removal_if_handler": func_if_handler} # Горячее удаление колбэк-фильтров |
|
self.signal.put(signal) |
|
return func_if_handler |
|
|
|
else: |
|
return False |
|
|
|
self.if_handlers.add(func_if_handler) # Добавляем обработчик в список |
|
return wrapper # Возвращаем обернутую функцию-обработчик |
|
return decorator |
|
|
|
if __name__ == '__main__': |
|
input_listener = InputListener() |
|
|
|
log = True |
|
|
|
@input_listener() |
|
def print_event(event): |
|
|
|
if log: |
|
print(f"!-----------: \n" |
|
f"{time.strftime('%H:%M:%S')} -> {event.inputDevice} -> {event.event_type} -> {event.key_code}\n" |
|
f"X: {event.x}, Y: {event.y}, Scan_code: {event.scan_code}\n" |
|
f"____________! \n") |
|
|
|
@input_listener() |
|
def print_event2(event): |
|
|
|
if log: |
|
print(f"!-----------: \n" |
|
f"Нажатые кнопки: {event.key_down}\n" |
|
f"____________! \n") |
|
|
|
@input_listener() |
|
def print_event3(event): |
|
if log: |
|
threads = threading.enumerate() |
|
thread_names = [thread.name for thread in threads] |
|
threads_str = ', '.join(thread_names) |
|
print(f"!-----------: \n" |
|
f"Активные потоки: {threads_str}\n" |
|
f"____________! \n") |
|
|
|
@input_listener.add_filter() |
|
def _if_block(event): |
|
if event.key_code == 49: |
|
print(f"Блокируем кнопку клавиатуры [1]") |
|
return False |
|
|
|
@input_listener.add_filter() |
|
def _if2_block(event): |
|
if event.key_code == "left": |
|
print(f"Блокируем кнопку мыши [left]") |
|
return False |
|
|
|
@input_listener.add_filter() |
|
def _if3_block(event): |
|
# Проверка условия ограничения координат мыши |
|
if event.inputDevice == "Mouse" and 0 <= event.y <= 250: |
|
# Блокировка движения мыши |
|
win32api.SetCursorPos((event.x, 250)) |
|
return False # Блокировать обработку события мыши |
|
|
|
if event.inputDevice == "Mouse" and 1000 <= event.y <= 1080: |
|
win32api.SetCursorPos((event.x, 1000)) |
|
return False |
|
|
|
|
|
input_listener.start() |
|
print(input_listener.get_status_hooks()) |
|
time.sleep(5) |
|
_if_block(False) |
|
_if2_block(False) |
|
_if3_block(False) |
|
print(input_listener.get_handlers("__name__")) |
|
print(input_listener.get_handlers()) |
|
|
|
print(input_listener.get_if_handlers("__name__")) |
|
print(input_listener.get_if_handlers()) |
|
print_event(False) |
|
print_event2(False) |
|
print_event3(False) |
|
time.sleep(5) |
|
_if_block(True) |
|
_if2_block(True) |
|
_if3_block(True) |
|
|
|
print_event(True) |
|
print_event2(True) |
|
print_event3(True) |
|
input_listener.stop() |
|
print(input_listener.get_status_hooks()) |
|
input_listener.start() |
|
|
|
|
|
"""if __name__ == '__main__': |
|
|
|
log = True |
|
def print_event(event): |
|
|
|
if log: |
|
print(f"!-----------: \n" |
|
f"{time.strftime('%H:%M:%S')} -> {event.inputDevice} -> {event.event_type} -> {event.key_code}\n" |
|
f"X: {event.x}, Y: {event.y}, Scan_code: {event.scan_code}\n" |
|
f"____________! \n") |
|
|
|
def print_event2(event): |
|
|
|
if log: |
|
print(f"!-----------: \n" |
|
f"Нажатые кнопки: {event.key_down}\n" |
|
f"____________! \n") |
|
|
|
def print_event3(event): |
|
if log: |
|
threads = threading.enumerate() |
|
thread_names = [thread.name for thread in threads] |
|
threads_str = ', '.join(thread_names) |
|
print(f"!-----------: \n" |
|
f"Активные потоки: {threads_str}\n" |
|
f"____________! \n") |
|
|
|
def _if_block(event): |
|
if event.key_code == 49: |
|
print(f"Блокируем кнопку клавиатуры [1]") |
|
return False |
|
|
|
def _if2_block(event): |
|
if event.key_code == "left": |
|
print(f"Блокируем кнопку мыши [left]") |
|
return False |
|
|
|
def _if3_block(event): |
|
# Проверка условия ограничения координат мыши |
|
if event.inputDevice == "Mouse" and 0 <= event.y <= 250: |
|
# Блокировка движения мыши |
|
win32api.SetCursorPos((event.x, 250)) |
|
return False # Блокировать обработку события мыши |
|
|
|
if event.inputDevice == "Mouse" and 1000 <= event.y <= 1080: |
|
win32api.SetCursorPos((event.x, 1000)) |
|
return False |
|
|
|
input_listener = InputListener() |
|
print(input_listener.get_status_hooks()) |
|
|
|
input_listener.add_handler(print_event) |
|
input_listener.add_handler(print_event2) |
|
input_listener.add_handler(print_event3) |
|
|
|
input_listener.add_if_handler(_if_block) |
|
input_listener.add_if_handler(_if2_block) |
|
input_listener.add_if_handler(_if3_block) |
|
|
|
input_listener.start() |
|
print(input_listener.get_status_hooks()) |
|
time.sleep(5) |
|
|
|
input_listener.stop() |
|
print(input_listener.get_status_hooks()) |
|
time.sleep(5) |
|
|
|
input_listener.remove_handler(print_event2) # Удаление колбэк-обработчика на холодную |
|
input_listener.remove_if_handler(_if3_block) |
|
|
|
input_listener.start() |
|
print(input_listener.get_status_hooks()) |
|
time.sleep(5) |
|
|
|
input_listener.hot_removal_if_handler(_if_block) |
|
input_listener.hot_removal_if_handler(_if2_block) |
|
|
|
input_listener.hot_plugging_if_handler(_if3_block) |
|
|
|
input_listener.hot_removal_handler(print_event) |
|
|
|
print(input_listener.get_handlers("__name__")) |
|
print(input_listener.get_handlers()) |
|
|
|
print(input_listener.get_if_handlers("__name__")) |
|
print(input_listener.get_if_handlers()) |
|
|
|
input_listener.hot_plugging_handler(print_event2) |
|
input_listener.hot_removal_handler(print_event2) |
|
|
|
time.sleep(5) |
|
|
|
input_listener.hot_plugging_handler(print_event) |
|
input_listener.hot_plugging_handler(print_event2) |
|
input_listener.hot_removal_if_handler(_if3_block)""" |