|
#!/usr/bin/env python3 |
|
# -*- coding: utf-8 -*- |
|
|
|
""" |
|
Kindle Key Finder - Python Edition Wrapper |
|
Replicates the exact DeDRM plugin logic for key extraction and JSON generation |
|
No external dependencies - uses the same methods as the plugin |
|
""" |
|
|
|
import os |
|
import sys |
|
import json |
|
import subprocess |
|
import shutil |
|
import time |
|
import re |
|
from datetime import datetime |
|
|
|
# File names used for the key extractor executable and the key files.
KEY_EXTRACTOR = "KFXKeyExtractor28.exe"
KEY_K4I = "kindlekey.k4i"
KEY_TXT = "kindlekey.txt"

# Configuration flag: set to True to hide sensitive info in console output.
HIDE_SENSITIVE_INFO = False

# Colour codes are only emitted when stdout reports that it is a terminal.
SUPPORTS_COLOR = sys.stdout.isatty() if hasattr(sys.stdout, "isatty") else False
|
|
|
|
|
def print_colored(message, color):
    """
    Print ``message``, wrapped in the ANSI escape codes for ``color`` when
    SUPPORTS_COLOR is true. Unknown colour names get no leading code.
    """
    if not SUPPORTS_COLOR:
        # No colour support: emit the bare message.
        print(f"{message}")
        return

    palette = {
        "cyan": "\033[96m",
        "green": "\033[92m",
        "yellow": "\033[93m",
        "red": "\033[91m",
        "magenta": "\033[95m",
        "end": "\033[0m",
    }
    start = palette.get(color, "")
    reset = palette["end"]
    print(f"{start}{message}{reset}")
|
|
|
|
|
def print_sep(color="end"):
    """Print a 50-dash separator line in the given colour."""
    separator = "-" * 50
    print_colored(separator, color)
|
|
|
|
|
def print_step(message):
    """Print a cyan ``[*]`` step line."""
    text = f"[*] {message}"
    print_colored(text, "cyan")
|
|
|
|
|
def print_title(message):
    """Print a title line; it is rendered exactly like a step line."""
    return print_step(message)
|
|
|
def print_phase(desc):
    """
    Print a numbered phase header followed by a cyan separator.

    The phase number is kept on the function object itself
    (``print_phase.current_phase``), starts at 1 and grows by one per call.
    """
    phase_number = getattr(print_phase, "current_phase", 1)

    print()
    print_step(f"[PHASE {phase_number}] {desc}")
    print_sep("cyan")
    print()

    print_phase.current_phase = phase_number + 1
|
|
|
|
|
def print_ok(message):
    """Print a green ``[OK]`` line."""
    text = f"[OK] {message}"
    print_colored(text, "green")
|
|
|
|
|
def print_warn(message):
    """Print a yellow ``[!]`` warning line."""
    text = f"[!] {message}"
    print_colored(text, "yellow")
|
|
|
|
|
def print_error(message):
    """
    Print a red ``[ERROR]`` line and return 1, so callers can write
    ``return print_error(...)`` to report and hand back a failure code.
    """
    failure_code = 1
    print_colored(f"[ERROR] {message}", "red")
    return failure_code
|
|
|
|
|
def print_done(message):
    """Print a magenta ``[DONE]`` line."""
    text = f"[DONE] {message}"
    print_colored(text, "magenta")
|
|
|
|
|
def obfuscate_sensitive(text):
    """
    Obfuscate a sensitive string.

    Returns a run of ``*`` characters with the same length as ``text``.
    """
    return "*" * len(text)


# Harmless extractor console noise that is dropped regardless of the hide flag.
_NOISE_MARKERS = (
    "QObject::startTimer: Timers can only be used with threads started with QThread",
    "Fontconfig error: Cannot load default config file",
)

_SECRET_KEY_MARKER = "$secret_key:"

# amzn1.drm-key.v1.<key id>$secret_key:<secret>; the character classes accept
# upper-case and base64 characters so an unexpected encoding is still masked.
_DRM_KEY_RE = re.compile(
    r"(amzn1\.drm-key\.v1\.)([A-Za-z0-9\-]+)(\$secret_key:)([A-Za-z0-9+/=]+)"
)
# "Opened book with secret: X" and "Opened book with reused secret: X".
_BOOK_SECRET_RE = re.compile(
    r"(Opened book with (?:reused )?secret:\s*)([A-Za-z0-9+/=]+)"
)
# 'Working secret: "X"' (the opening quote is optional).
_WORKING_SECRET_RE = re.compile(r'(Working secret:\s*"?)([A-Za-z0-9+/=]+)')
# "device_serial_number":"X" inside JSON output.
_SERIAL_RE = re.compile(r'"device_serial_number"\s*:\s*"([^"]+)"')


def _mask_second_group(match):
    """Keep group 1 of ``match`` verbatim and mask group 2."""
    return match.group(1) + obfuscate_sensitive(match.group(2))


def _mask_drm_key(match):
    """Mask both the key id and the secret of a matched DRM key."""
    label, key_id, marker, secret = match.groups()
    return f"{label}{obfuscate_sensitive(key_id)}{marker}{obfuscate_sensitive(secret)}"


def _obfuscate_line(line):
    """Return ``line`` with every recognised sensitive value masked."""
    # "DSN <value>" and "Tokens <a,b,...>" lines are masked wholesale.
    if line.startswith("DSN "):
        value = line[len("DSN "):].strip()
        return f"DSN {obfuscate_sensitive(value)}" if value else line

    if line.startswith("Tokens "):
        value = line[len("Tokens "):].strip()
        if not value:
            return line
        masked = ",".join(obfuscate_sensitive(part.strip()) for part in value.split(","))
        return f"Tokens {masked}"

    if _SECRET_KEY_MARKER in line:
        line, replaced = _DRM_KEY_RE.subn(_mask_drm_key, line)
        if not replaced:
            # No well-formed DRM key was recognised: mask everything after the
            # first marker rather than letting the secret through.
            head, marker, tail = line.partition(_SECRET_KEY_MARKER)
            secret = tail.strip()
            if secret:
                line = f"{head}{marker}{obfuscate_sensitive(secret)}"

    # The remaining rules are cumulative, so a line that carries more than
    # one kind of secret has all of them masked.
    line = _BOOK_SECRET_RE.sub(_mask_second_group, line)
    line = _WORKING_SECRET_RE.sub(_mask_second_group, line)
    for serial in _SERIAL_RE.findall(line):
        # Replace every quoted occurrence of the serial, not only the field.
        line = line.replace(f'"{serial}"', f'"{obfuscate_sensitive(serial)}"')
    return line


def filter_sensitive_output(text, hide_sensitive=None):
    """
    Filter extractor output before it is shown on the console.

    Lines containing the known Qt / Fontconfig noise messages are always
    dropped. When hiding is enabled, DSN and Tokens lines, DRM keys,
    ``$secret_key:`` values, "Opened book with (reused) secret", "Working
    secret" values and ``device_serial_number`` JSON values are replaced by
    asterisks of the same length.

    Args:
        text: Raw multi-line output.
        hide_sensitive: True/False to force masking on or off; None (the
            default) uses the module-level HIDE_SENSITIVE_INFO flag.

    Returns:
        The filtered text, lines joined with "\\n".
    """
    if hide_sensitive is None:
        hide_sensitive = HIDE_SENSITIVE_INFO

    kept_lines = []
    for line in text.split("\n"):
        if any(marker in line for marker in _NOISE_MARKERS):
            continue
        kept_lines.append(_obfuscate_line(line) if hide_sensitive else line)

    return "\n".join(kept_lines)
|
|
|
|
|
def get_kindle_content_path(content_path):
    """
    Return a Kindle content directory path that exists.

    If ``content_path`` does not exist, the user is prompted repeatedly until
    an existing path is entered. Surrounding quotation marks are removed and
    environment variables (e.g. %USERPROFILE%) are expanded.

    Args:
        content_path: Default path to try first.

    Returns:
        The first path that exists (the default, or the normalised user input).
    """
    print_step("Kindle-4-PC Book's Path Configuration")

    # The long hint is shown on the first prompt only.
    prompt = "(Paths with spaces are supported, quotation marks will be removed)\n> "
    while not os.path.exists(content_path):
        print_error(f"Path does not exist: {content_path}")
        print()
        user_input = input(prompt).strip()
        prompt = "> "

        cleaned = user_input.strip('"').strip("'")
        if not cleaned:
            # normpath("") is ".", which always exists; an empty answer must
            # not silently select the current directory, so ask again.
            content_path = cleaned
            continue

        content_path = os.path.normpath(os.path.expandvars(cleaned))

    print_ok(f"Using path: {content_path}")
    return content_path
|
|
|
|
|
def _parse_dsn_and_tokens(stdout):
    """Return (dsn, first_token) parsed from the extractor's stdout, or None for each if absent."""
    dsn = None
    tokens = None
    for line in (stdout or "").split("\n"):
        if line.startswith("DSN "):
            dsn = line[len("DSN "):].strip()
        elif line.startswith("Tokens "):
            # Only the first token (before any comma) is kept.
            tokens = line[len("Tokens "):].strip().split(",")[0].strip()
    return dsn, tokens


def extract_keys_using_extractor(extractor_path, content_dir, output_key, output_k4i):
    """
    Extract keys using the KEY_EXTRACTOR and parse its output for DSN/tokens.

    The extractor is copied into the current user's Kindle application
    folder, run from there, and the copy is removed afterwards whether or
    not the run succeeded.

    Args:
        extractor_path: Path of the extractor executable to copy.
        content_dir: Kindle content directory passed to the extractor.
        output_key: Path the extractor should write the key text file to.
        output_k4i: Path the extractor should write the k4i file to.

    Returns:
        ``(success, dsn, tokens)``. ``success`` is True when both output
        files exist afterwards; ``dsn`` / ``tokens`` are taken from the
        extractor's stdout (None when not printed). Any failure is reported
        and yields ``(False, None, None)``.
    """
    # Use current user instead of hardcoded "Admin"
    user_home = os.path.expanduser("~")
    kindle_dir = os.path.join(
        user_home, "AppData", "Local", "Amazon", "Kindle", "application"
    )
    extractor_in_kindle = os.path.join(kindle_dir, KEY_EXTRACTOR)

    try:
        # Copy to the full target file path: copying to the directory path
        # would create a stray file named "application" if that directory
        # does not exist.
        shutil.copy2(extractor_path, extractor_in_kindle)
        print_ok("Extractor copied to Kindle folder.")

        print_step("Running key extractor...")
        result = subprocess.run(
            [extractor_in_kindle, content_dir, output_key, output_k4i],
            capture_output=True,
            text=True,
            # Undecodable bytes in the tool's output must not abort the run.
            errors="replace",
        )

        # Show the output to the user (with optional filtering).
        for stream in (result.stdout, result.stderr):
            if stream:
                print(filter_sensitive_output(stream))

        dsn, tokens = _parse_dsn_and_tokens(result.stdout)
        success = os.path.exists(output_key) and os.path.exists(output_k4i)
        return success, dsn, tokens

    except Exception as e:
        print_error(f"Extractor method failed: {e}")
        return False, None, None

    finally:
        # Single cleanup point for both the success and the failure path; a
        # failed removal is reported instead of masking the real result.
        try:
            if os.path.exists(extractor_in_kindle):
                os.remove(extractor_in_kindle)
                print_ok("Extractor cleaned up.")
        except OSError as cleanup_error:
            print_warn(f"Could not remove extractor copy: {cleanup_error}")
|
|
|
|
|
def create_kindle_key_from_k4i(k4i_path, dsn=None, tokens=None):
    """
    Build a Kindle key entry from a k4i JSON file.

    Fields that are missing, null or empty in the file fall back to the
    extracted ``dsn`` / ``tokens`` values, or to an empty default.

    Args:
        k4i_path: Path of the k4i JSON file.
        dsn: DSN to use when the file does not provide one.
        tokens: Tokens value to use when the file does not provide one.

    Returns:
        A dict with the keys "DSN", "kindle.account.clear_old_secrets",
        "kindle.account.new_secrets", "kindle.account.secrets" and
        "kindle.account.tokens"; or None (after printing an error) when the
        file cannot be read or is not a JSON object.
    """
    try:
        with open(k4i_path, "r", encoding="utf-8") as f:
            k4i_data = json.load(f)
        if not isinstance(k4i_data, dict):
            raise ValueError("top-level JSON value is not an object")
    except (OSError, ValueError, TypeError) as e:
        print_error(f"Failed to process k4i file: {e}")
        return None

    # ``or`` (rather than a .get default) so that empty/null fields also fall
    # back to the extracted values.
    return {
        "DSN": k4i_data.get("DSN") or dsn or "",
        "kindle.account.clear_old_secrets": (
            k4i_data.get("kindle.account.clear_old_secrets") or []
        ),
        "kindle.account.new_secrets": (
            k4i_data.get("kindle.account.new_secrets") or []
        ),
        "kindle.account.secrets": k4i_data.get("kindle.account.secrets") or [],
        "kindle.account.tokens": (
            k4i_data.get("kindle.account.tokens") or tokens or ""
        ),
    }
|
|
|
|
|
def create_dedrm_config(kindle_key, kindlekey_txt_path, reference_json_path=None):
    """
    Build the dedrm.json configuration dict.

    Args:
        kindle_key: Key entry stored under ``kindlekeys["kindlekey"]``.
        kindlekey_txt_path: Path stored as ``kindleextrakeyfile``.
        reference_json_path: Optional JSON file used as the template when it
            exists; otherwise a default structure is created.

    Returns:
        The configuration dict, ready to be serialised with ``json.dump``.
    """
    if reference_json_path and os.path.exists(reference_json_path):
        # Use reference file as template
        print_step("Using reference file as template...")
        with open(reference_json_path, "r", encoding="utf-8") as f:
            dedrm_config = json.load(f)
    else:
        # Create new structure with the default fields
        print_step("Creating new configuration structure...")
        dedrm_config = {
            "adeptkeys": {},
            "adobe_pdf_passphrases": [],
            "adobewineprefix": "",
            "androidkeys": {},
            "bandnkeys": {},
            "configured": True,
            "deobfuscate_fonts": True,
            "ereaderkeys": {},
            "kindleextrakeyfile": "",
            "kindlekeys": {},
            "kindlewineprefix": "",
            "lcp_passphrases": [],
            "pids": [],
            "remove_watermarks": False,
            "serials": [],
        }

    # Plain path string; any JSON escaping is done when the dict is dumped.
    dedrm_config["kindleextrakeyfile"] = kindlekey_txt_path

    # A reference template may lack "kindlekeys" (or hold a non-dict there);
    # make sure there is a dict to store the single "kindlekey" entry in.
    kindlekeys = dedrm_config.get("kindlekeys")
    if not isinstance(kindlekeys, dict):
        kindlekeys = {}
        dedrm_config["kindlekeys"] = kindlekeys
    kindlekeys["kindlekey"] = kindle_key

    return dedrm_config
|
|
|
|
|
def remove_old_file(file_path):
    """Delete ``file_path`` if it exists and report which case applied."""
    file_name = os.path.basename(file_path)

    if not os.path.exists(file_path):
        print_warn(f"No existing {file_name} found.")
        return

    os.remove(file_path)
    print_ok(f"Previous {file_name} deleted.")
|
|
|
|
|
def main():
    """
    Run the whole workflow: extract the Kindle keys, write them into the
    DeDRM plugin's dedrm.json (backing up an existing file first) and verify
    the result.

    Returns:
        0 on success, 1 (via print_error) when a step reports failure.

    Raises:
        Re-raises any unexpected exception after printing it and restoring
        the dedrm.json backup made during this run, if there is one.
    """
    # Define paths - use script directory and current user instead of hardcoded paths
    script_dir = os.path.dirname(os.path.abspath(__file__))
    fixed_dir = script_dir
    user_home = os.path.expanduser("~")
    extractor = os.path.join(fixed_dir, KEY_EXTRACTOR)
    output_key = os.path.join(fixed_dir, KEY_TXT)
    output_k4i = os.path.join(fixed_dir, KEY_K4I)
    dedrm_json = os.path.join(
        user_home, "AppData", "Roaming", "calibre", "plugins", "dedrm.json"
    )
    reference_json = os.path.join(fixed_dir, "dedrm filled.json")

    # Create backup filename with timestamp in backups folder
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backups_dir = os.path.join(fixed_dir, "backups")
    os.makedirs(backups_dir, exist_ok=True)
    backup_json = os.path.join(backups_dir, f"dedrm_backup_{timestamp}.json")
    backup_created = False

    os.system("cls" if os.name == "nt" else "clear")  # Clear screen
    print_title("Kindle Key Finder - Python Edition Wrapper")
    print_sep()
    print("Phase 1: Key Extraction (Plugin-Compatible)")
    print("Phase 2: DeDRM Plugin Auto-Configuration")
    print("Phase 3: Final Verification")
    print_sep()
    print()

    try:
        # prompts user if default not found
        content_dir = get_kindle_content_path(
            os.path.join(user_home, "Documents", "My Kindle Content")
        )

        print_phase("Preparing Kindle Key extraction...")

        # Check before deleting anything: without the extractor the old key
        # files could not be regenerated.
        if not os.path.exists(extractor):
            return print_error(f"Key extractor not found: {extractor}")

        print_step("Remove Previous Keys...")
        remove_old_file(output_key)
        remove_old_file(output_k4i)

        print_step("Extracting Kindle keys...")
        success, dsn, tokens = extract_keys_using_extractor(
            extractor, content_dir, output_key, output_k4i
        )
        if not success:
            return print_error(
                f"Key extraction failed! Both {KEY_TXT} and {KEY_K4I} are required."
            )
        print_ok("Keys successfully extracted:")
        print(f"   - {output_key}")
        print(f"   - {output_k4i}")

        print_phase("Updating DeDRM plugin configuration...")
        if os.path.exists(dedrm_json):
            print_step("Creating backup of existing dedrm.json...")
            shutil.copy2(dedrm_json, backup_json)
            backup_created = True
            print_ok(f"Backup created: {backup_json}")
        else:
            print_warn("No existing dedrm.json found - will create new one.")

        print_step("Processing kindlekey.k4i...")
        kindle_key = create_kindle_key_from_k4i(output_k4i, dsn, tokens)
        if not kindle_key:
            return print_error("Failed to process k4i file!")
        print_ok("Kindle key data processed successfully.")

        print_step("Creating DeDRM configuration...")
        dedrm_config = create_dedrm_config(kindle_key, output_key, reference_json)

        print_step("Writing dedrm.json with exact plugin formatting...")
        os.makedirs(os.path.dirname(dedrm_json), exist_ok=True)
        with open(dedrm_json, "w") as f:
            json.dump(dedrm_config, f, indent=2)
        print_ok("DeDRM configuration updated successfully!")
        if HIDE_SENSITIVE_INFO:
            # The key dict holds the DSN, tokens and secrets; never echo it
            # when the user asked for sensitive data to be hidden.
            print_ok("Updated key: kindlekey (details hidden)")
        else:
            print_ok(f"Updated key: {kindle_key}")
        print_ok(f"Set extra key file: {output_key}")

        print_phase("Verifying configuration...")
        with open(dedrm_json, "r") as f:
            dedrm_verify = json.load(f)
        key_count = len(dedrm_verify.get("kindlekeys", {}))
        print_ok(f"Total Kindle keys in config: {key_count}")
        print_ok(
            f"Extra key file path: {dedrm_verify.get('kindleextrakeyfile', 'Not set')}"
        )

        print_phase("Key Details")
        if "kindlekeys" in dedrm_verify and "kindlekey" in dedrm_verify["kindlekeys"]:
            key_data = dedrm_verify["kindlekeys"]["kindlekey"]
            dsn_value = key_data.get("DSN", "Not found")
            if HIDE_SENSITIVE_INFO and dsn_value != "Not found":
                dsn_value = obfuscate_sensitive(dsn_value)
            print_ok(f"DSN: {dsn_value}")
            print_ok(
                f"Account secrets count: {len(key_data.get('kindle.account.secrets', []))}"
            )
            print_ok(
                f"New secrets count: {len(key_data.get('kindle.account.new_secrets', []))}"
            )

        print_phase("SUCCESS! Complete automation finished!")
        print_ok("What was accomplished:")
        print("  + Extracted Kindle keys using plugin-compatible method")
        print("  + Generated kindlekey.txt (voucher keys)")
        print("  + Generated kindlekey.k4i (account data)")
        print("  + Updated DeDRM plugin configuration automatically")
        print("  + Used exact same JSON generation as the plugin")
        print("  + Set extra key file path in plugin")
        print("  + Added account keys to plugin database")
        if backup_created:
            # Only claim a backup when one was actually written.
            print("  + Created configuration backup for safety")
        print()
        print_ok("You can now start Calibre and import your books!")
        print_ok("The plugin should accept this configuration without issues.")
        print()
        return 0

    except Exception as e:
        import traceback

        print_error(f"Script failed: {str(e)}")
        print_error(f"Traceback: {traceback.format_exc()}")
        if os.path.exists(backup_json) and os.path.exists(dedrm_json):
            print_warn("Restoring backup...")
            shutil.copy2(backup_json, dedrm_json)
            print_ok("Backup restored.")
        raise
|
|
|
|
|
# Run only when executed as a script; main()'s return value is passed to
# sys.exit() as the process exit status.
if __name__ == "__main__":
    sys.exit(main())