Skip to content

Instantly share code, notes, and snippets.

@zhongfu
Created June 13, 2025 14:41
Show Gist options
  • Save zhongfu/d2e38e0e43e3beab44a4d41c770c0425 to your computer and use it in GitHub Desktop.
Save zhongfu/d2e38e0e43e3beab44a4d41c770c0425 to your computer and use it in GitHub Desktop.
hikvision R0-series cn->en patching bits
#!/usr/bin/env python
"""
Unbrick a Hikvision device. See README.md for usage information.
"""
from __future__ import division
__author__ = 'Scott Lamb'
__license__ = 'MIT'
__email__ = '[email protected]'
import argparse
import errno
import os
import select
import socket
import struct
import sys
import time
HANDSHAKE_BYTES = struct.pack('20s', b'SWKH')
_HANDSHAKE_SERVER_PORT = 9978
_TFTP_SERVER_PORT = 69
_TIME_FMT = '%c'
_DEFAULT_BLOCK_SIZE = 512
class Error(Exception): pass
class Server(object):
# See https://tools.ietf.org/html/rfc1350
_TFTP_OPCODE_RRQ = 1
_TFTP_OPCODE_DATA = 3
_TFTP_OPCODE_ACK = 4
_TFTP_OPCODE_OACK = 6
_TFTP_ACK_PREFIX = struct.pack('>h', _TFTP_OPCODE_ACK)
def __init__(self, handshake_addr, tftp_addr, filename, file_contents):
self._file_contents = file_contents
self._filename = filename
self._tftp_rrq_prefix = (struct.pack('>h', self._TFTP_OPCODE_RRQ) +
filename.encode('ascii') + b'\x00')
self._tftp_blksize_option = b'blksize\x00'
self._handshake_sock = self._bind(handshake_addr)
self._tftp_sock = self._bind(tftp_addr)
self._set_block_size(_DEFAULT_BLOCK_SIZE)
def _bind(self, addr):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
sock.bind(addr)
except socket.error as e:
if e.errno == errno.EADDRNOTAVAIL:
raise Error(
('Address %s:%d not available.\n\n'
'Try running:\n'
'linux$ sudo ifconfig eth0:0 %s\n'
'osx$ sudo ifconfig en0 alias %s '
'255.255.255.0\n\n'
'(adjust eth0 or en0 to taste. see "ifconfig -a" output)')
% (addr[0], addr[1], addr[0], addr[0]))
if e.errno == errno.EADDRINUSE:
raise Error(
('Address %s:%d in use.\n'
'Make sure no other TFTP server is running.') % addr)
if e.errno == errno.EACCES:
raise Error(('No permission to bind to %s:%d.\n'
'Try running with sudo.') % addr)
raise
return sock
def _set_block_size(self, block_size):
# TODO: Don't mutate overall server for a single transfer. Use some kind of per-transfer state
print('Setting block size to %d' % block_size)
self._block_size = block_size
self._total_blocks = ((len(self._file_contents) + self._block_size)
// self._block_size)
print('Serving %d-byte %s (block size %d, %d blocks)' % (
len(self._file_contents), self._filename, self._block_size, self._total_blocks))
def _check_total_block_limit(self):
if self._total_blocks > 65535:
raise Error('File is too big to serve with %d-byte blocks.'
% self._block_size)
def _parse_options(self, pkt):
pkt_options = pkt.split(self._tftp_rrq_prefix)[1]
options_list = pkt_options.split(b'\x00')[1:]
options = {}
for i in range(0, len(options_list) - 1, 2):
options[options_list[i]] = options_list[i + 1]
print('read request options: %s' % options)
return options
def close(self):
self._handshake_sock.close()
self._tftp_sock.close()
def run_forever(self):
while True:
self._iterate()
def _iterate(self):
r, _, _ = select.select(
[self._handshake_sock, self._tftp_sock], [], [])
if self._handshake_sock in r:
self._handshake_read()
if self._tftp_sock in r:
self._tftp_read()
def _handshake_read(self):
pkt, addr = self._handshake_sock.recvfrom(len(HANDSHAKE_BYTES))
now = time.strftime(_TIME_FMT)
if pkt == HANDSHAKE_BYTES:
self._handshake_sock.sendto(pkt, addr)
print('%s: Replied to magic handshake request.' % now)
else:
print('%s: received unexpected handshake bytes %r from %s:%d' % (
now, pkt.encode('hex'), addr[0], addr[1]))
def _tftp_read(self):
pkt, addr = self._tftp_sock.recvfrom(65536)
now = time.strftime(_TIME_FMT)
if pkt.startswith(self._tftp_rrq_prefix):
options = self._parse_options(pkt)
if 'blksize' in options:
self._set_block_size(int(options['blksize']))
print('%s: sending options ack' % now)
self._tftp_options_ack(addr)
return
self._check_total_block_limit()
print('%s: starting transfer' % now)
self._tftp_maybe_send(0, addr)
elif pkt.startswith(self._TFTP_ACK_PREFIX):
(block,) = struct.unpack(
'>H', pkt[len(self._TFTP_ACK_PREFIX):])
self._tftp_maybe_send(block, addr)
else:
print('%s: received unexpected tftp bytes %r from %s:%d' % (
now, pkt.encode('hex'), addr[0], addr[1]))
def _tftp_options_ack(self, addr):
self._check_total_block_limit()
pkt = (struct.pack('>H', self._TFTP_OPCODE_OACK) + b'blksize\x00' + str(self._block_size) + b'\x00')
self._tftp_sock.sendto(pkt, addr)
def _tftp_maybe_send(self, prev_block, addr):
block = prev_block + 1
start_byte = prev_block * self._block_size
if start_byte > len(self._file_contents):
print('%s: done!' % time.strftime(_TIME_FMT))
if self._block_size != _DEFAULT_BLOCK_SIZE:
self._set_block_size(_DEFAULT_BLOCK_SIZE)
return
block_data = self._file_contents[start_byte:start_byte+self._block_size]
pkt = (struct.pack('>hH', self._TFTP_OPCODE_DATA, block) + block_data)
self._tftp_sock.sendto(pkt, addr)
_progress_width = 53
print('%s: %5d / %5d [%-*s]' % (
time.strftime(_TIME_FMT), block, self._total_blocks,
_progress_width,
'#' * (_progress_width * block // self._total_blocks)))
if __name__ == '__main__':
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('--filename', default='digicap.dav',
help='file to serve; used both to read from the local '
'disk and for the filename to expect from client')
parser.add_argument('--server-ip', default='192.0.0.128',
help='IP address to serve from.')
args = parser.parse_args()
try:
file_contents = open(args.filename, mode='rb').read()
except IOError as e:
print('Error: can\'t read %s' % args.filename)
if e.errno == errno.ENOENT:
print('Please download/move it to the current working directory.')
sys.exit(1)
raise
try:
server = Server((args.server_ip, _HANDSHAKE_SERVER_PORT),
(args.server_ip, _TFTP_SERVER_PORT),
args.filename, file_contents)
except Error as e:
print('Error: %s' % str(e))
sys.exit(1)
server.run_forever()
#!/usr/bin/env python3
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
import sys
DEVICE_TYPES = {
"DS-2CD2032F-I": 38950,
"DS-2CD2032F-IW": 38950,
"DS-2CD2032-I": 38917,
"DS-2CD2132-I": 38918,
"DS-2CD2132F-IS": 38942,
"DS-2CD2132F-IWS": 38942,
"DS-2CD2232-I5": 38919,
"DS-2CD2332-I": 38920,
"DS-2CD2432F-IW": 38930,
"DS-2CD2532F-IS": 38932,
"DS-2CD2632F-IS": 38928,
"DS-2CD2732F-IS": 38926,
"DS-2CD3132-I": 38918,
"DS-2DE2103-DE3/W": 8988,
"DS-2CD2T32-I8": 38945,
}
def fix(mtd6: bytes):
mtd6 = bytearray(mtd6)
chunk = mtd6[0x9:0x9+0xf4]
assert len(chunk) == 0xf4, "mtd6 contents may be malformed - filesize less than (0x9 + 0xf4) = 253 bytes"
model = chunk[119:149].strip(b'\x00').decode('ascii')
current_lang = chunk[0x7]
current_devtype_bytes = chunk[0x5b:0x5d]
current_devtype = int.from_bytes(current_devtype_bytes, 'little')
logger.info(f"Model {model}, current language {current_lang}, current devtype {current_devtype} (0x{current_devtype_bytes.hex()})")
assert model in DEVICE_TYPES, f"Unknown model '{model}', we don't have a devType value for it"
device_type = DEVICE_TYPES[model]
device_type_bytes = device_type.to_bytes(2, 'little')
logger.info(f"Changing language to 0x1")
chunk[0x7:0x8] = b'\x01'
logger.info(f"Changing devtype value to {device_type} (0x{device_type_bytes.hex()})")
chunk[0x5b:0x5d] = device_type_bytes
# "checksum-16"
checksum = sum(chunk) & 0xffff
checksum_bytes = checksum.to_bytes(2, 'little')
logger.info(f"Calculated checksum: {checksum_bytes.hex()}")
# replace chunk
mtd6[0x9:0x9+0xf4] = chunk
# replace checksum
mtd6[0x4:0x6] = checksum_bytes
return bytes(mtd6)
def fix_file(fn: str):
with open(fn, 'rb') as f:
mtd6 = f.read()
mtd6_mod = fix(mtd6)
out_fn = f"{fn[:-5]}_mod" if fn[-5:] == "_orig" else f"{fn}_mod"
with open(out_fn, 'wb') as f:
f.write(mtd6_mod)
if __name__ == "__main__":
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} <mtd6_file>", file=sys.stderr)
print("Will patch and write output to <mtd6_file>_mod if <mtd6_file> doesn't end with '_orig', else replaces '_orig' with '_mod'", file=sys.stderr)
sys.exit(1)
fix_file(sys.argv[1])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment