Last active
December 28, 2023 14:38
-
-
Save xiaoxi-ij478/a36a7a2d99ccb77b3b8c034ad5167947 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import getopt | |
import os | |
import pathlib | |
import sys | |
import zlib | |
from abc import ABCMeta, abstractmethod | |
from typing import ByteString | |
class DataDecryptor(metaclass=ABCMeta):
    """Abstract interface for a reversible byte transformation.

    Subclasses must implement both directions; the class itself cannot be
    instantiated because both methods are abstract.
    """

    @abstractmethod
    def encrypt(self, bytestream: ByteString) -> bytes:
        """Return the encrypted form of *bytestream*."""
        raise NotImplementedError()

    @abstractmethod
    def decrypt(self, bytestream: ByteString) -> bytes:
        """Return the decrypted form of *bytestream*."""
        raise NotImplementedError()
class DT01DataDecrypter(DataDecryptor):
    """Algorithm 1: zlib compression combined with a sparse XOR mask.

    Encryption compresses the input with zlib and then XORs a handful of
    bytes of the compressed stream; decryption applies the same XOR (which
    undoes it) and then decompresses.
    """

    # Number of bytes that get masked when the stream is at least this long.
    _DEFAULT_BLOCK_LEN: int = 9

    def _xor_blocks(self, buffer: bytearray) -> None:
        """Apply the XOR mask to *buffer* in place.

        With ``stride = len(buffer) // _DEFAULT_BLOCK_LEN``, the bytes at
        offsets ``stride * i - 1`` for ``i = 1 .. _DEFAULT_BLOCK_LEN`` are
        XORed with ``(stride * (_DEFAULT_BLOCK_LEN - i + 1)) & 0xFF``.  When
        the buffer is shorter than ``_DEFAULT_BLOCK_LEN`` the stride is 1 and
        every byte is masked.  XOR is its own inverse, so the same routine
        serves both directions.
        """
        length = len(buffer)
        masked_count = self._DEFAULT_BLOCK_LEN
        stride = length // masked_count
        if not stride:
            # Buffer shorter than one full block: mask every byte.
            stride = 1
            masked_count = length

        for index in range(1, masked_count + 1):
            remaining = masked_count - index + 1
            buffer[stride * index - 1] ^= (stride * remaining) & 0xFF

    def encrypt(self, bytestream: ByteString) -> bytes:
        """Compress *bytestream* with zlib and mask the compressed bytes."""
        buffer = bytearray(zlib.compress(bytestream))
        self._xor_blocks(buffer)
        return bytes(buffer)

    def decrypt(self, bytestream: ByteString) -> bytes:
        """Unmask *bytestream* and return the zlib-decompressed payload.

        Raises:
            zlib.error: if the unmasked data is not a valid zlib stream.
        """
        buffer = bytearray(bytestream)
        self._xor_blocks(buffer)
        return zlib.decompress(buffer)
class DT03DataDecrypter(DataDecryptor):
    """Algorithm 3: XOR the first three bytes of every 77-byte stride.

    The transformation is a pure XOR with position-derived keys, so applying
    it twice restores the original data.
    """

    # Distance in bytes between the starts of two masked 3-byte groups.
    _LEN: int = 77

    def _internal(self, bytestream: ByteString, encrypt_length: int) -> bytes:
        """Return a copy of *bytestream* with the XOR mask applied.

        A group starting at offset ``p`` (a multiple of ``_LEN``) is masked
        while ``limit - p >= 2``; its three bytes are XORed with
        ``p + 2``, ``p + 1`` and ``p`` (each reduced modulo 256).

        ``limit`` is *encrypt_length* clamped to ``len(bytestream) - 1``.
        Each group writes three bytes, so without the clamp a call with
        ``encrypt_length == len(bytestream)`` would index one byte past the
        end whenever exactly two bytes remain at a group start.  The clamp
        also makes ``encrypt`` and ``decrypt`` touch exactly the same bytes.
        """
        buffer = bytearray(bytestream)
        limit = min(encrypt_length, len(buffer) - 1)

        # ``limit - start >= 2`` is the same as ``start < limit - 1``.
        for start in range(0, limit - 1, self._LEN):
            buffer[start] ^= (start + 2) & 0xFF
            buffer[start + 1] ^= (start + 1) & 0xFF
            buffer[start + 2] ^= start & 0xFF

        return bytes(buffer)

    def decrypt(self, bytestream: ByteString) -> bytes:
        """Return *bytestream* with the algorithm-3 mask removed."""
        return self._internal(bytestream, len(bytestream) - 1)

    def encrypt(self, bytestream: ByteString) -> bytes:
        """Return *bytestream* with the algorithm-3 mask applied."""
        return self._internal(bytestream, len(bytestream))
class UnmatchMagicNumberError(ValueError):
    """Raised when input does not begin with the expected magic number."""
class EncryptManager:
    """Frame payloads with a 4-byte header and dispatch to an algorithm.

    The header is the three magic bytes ``3F 3F 00`` followed by one byte
    holding the algorithm number used as the key into ``encryptors``.
    """

    # Algorithm number -> cipher class (instantiated on each call).
    encryptors: dict[int, type[DataDecryptor]] = {
        1: DT01DataDecrypter,
        3: DT03DataDecrypter,
    }

    _MAGIC: bytes = b"\x3F\x3F\x00"
    _HEADER_LEN: int = 4

    def encrypt(self, enctype: int, bytestream: ByteString) -> bytes:
        """Encrypt *bytestream* with algorithm *enctype* and prepend the header.

        Raises:
            KeyError: if *enctype* is not a key of ``encryptors``.
        """
        encryptor = self.encryptors[enctype]
        payload = encryptor().encrypt(bytestream)
        return self._MAGIC + bytes([enctype]) + payload

    def decrypt(self, bytestream: ByteString) -> bytes:
        """Validate the header of *bytestream* and decrypt the payload.

        The selected algorithm number is reported on standard error.

        Raises:
            UnmatchMagicNumberError: if the input is shorter than the 4-byte
                header or does not start with the magic bytes.
            KeyError: if the algorithm byte is not a key of ``encryptors``.
        """
        # The length check keeps a bare 3-byte magic from failing with an
        # IndexError when the algorithm byte is read below.
        if (
            len(bytestream) < self._HEADER_LEN
            or bytestream[0:3] != self._MAGIC
        ):
            raise UnmatchMagicNumberError("Unmatch magic number")

        algorithm = bytestream[3]
        print("Using encryption algorithm", algorithm, file=sys.stderr)
        decryptor = self.encryptors[algorithm]
        return decryptor().decrypt(bytestream[self._HEADER_LEN:])
def print_help(prog_name):
    """Write the usage summary for *prog_name* to standard error."""
    help_lines = (
        f"Usage: {prog_name} [OPTIONS]... [FILENAME]",
        "Encrypt/Decrypt",
        "",
        "File may be '-' to read from stdin / write to stdout",
        "Default is decrypt stdin to stdout",
        "",
        " -e, --encrypt <ALGORITHM> Encrypt file (1 for algo 1, 3 for algo 3)",
        " -d, --decrypt Decrypt file (default)",
        " -o, --output <FILENAME> Write output to <FILENAME> (default stdout)",
        " (throw an error if <FILENAME> exists",
        " and `-r' is not specified)",
        " -f, --dir <DIRECTORY> Put the output file at <DIRECTORY>",
        " (cannot use with `-r')",
        " -r, --replace Without `-o', replace the input file",
        " With `-o', replace the specified output file",
        " -h, --help Display this help",
    )
    # One print per entry; an empty entry yields the same blank line that a
    # bare print() would.
    for line in help_lines:
        print(line, file=sys.stderr)
def main(argc, argv):
    """Command-line entry point: encrypt or decrypt one file or stdin.

    Args:
        argc: Number of arguments; accepted for the caller's convenience and
            not otherwise used.
        argv: Full argument vector, with the program name at index 0.

    Returns:
        0 on success (or after printing help), 1 when the input does not
        carry the expected magic number, 2 when the options cannot be parsed.
    """
    try:
        options, positional = getopt.gnu_getopt(
            argv[1:], "e:do:rf:h",
            ["encrypt=", "decrypt", "output=", "replace", "dir=", "help"]
        )
    except getopt.GetoptError as error:
        # Report bad options as a usage error instead of a traceback.
        print(f"{argv[0]}: {error}", file=sys.stderr)
        print_help(argv[0])
        return 2

    manager = EncryptManager()
    exec_function = manager.decrypt
    output_file = '-'
    replace = False
    output_dir = pathlib.Path()

    for option, argument in options:
        if option in ("-e", "--encrypt"):
            # Bind the argument now so a later option cannot change it.
            exec_function = (
                lambda b, algorithm=argument: manager.encrypt(int(algorithm), b)
            )
        elif option in ("-d", "--decrypt"):
            exec_function = manager.decrypt
        elif option in ("-o", "--output"):
            output_file = argument
        elif option in ("-r", "--replace"):
            replace = True
        elif option in ("-f", "--dir"):
            output_dir = pathlib.Path(argument)
        elif option in ("-h", "--help"):
            print_help(argv[0])
            return 0

    # The first positional argument names the input; '-' or none means stdin.
    # Compare the argument itself, not the whole list, against '-'.
    input_name = positional[0] if positional else '-'
    if input_name == '-':
        data = sys.stdin.buffer.read()
    else:
        # Read everything and close the handle before any output is opened,
        # so replacing the input file in place is safe.
        with open(input_name, "rb") as input_file_obj:
            data = input_file_obj.read()
        if output_file == '-' and replace:
            output_file = input_name

    try:
        processed_data = exec_function(data)
    except UnmatchMagicNumberError:
        print("File is not encrypted", file=sys.stderr)
        return 1

    if output_file == '-':
        sys.stdout.buffer.write(processed_data)
    else:
        # "xb" refuses to overwrite an existing file unless -r was given.
        mode = "wb" if replace else "xb"
        with open(output_dir / output_file, mode) as output_file_obj:
            output_file_obj.write(processed_data)
    return 0
if __name__ == "__main__":
    # Run the CLI and use main()'s return value as the process exit status.
    raise SystemExit(main(len(sys.argv), sys.argv))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment