Created
June 13, 2019 17:54
-
-
Save aashutoshrathi/f2bad34618e8c3f10bebc10526da4060 to your computer and use it in GitHub Desktop.
WD Passport Tools for Linux
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import sys | |
import os | |
import struct | |
import getpass | |
from hashlib import sha256 | |
from random import randint | |
import argparse | |
import subprocess | |
try: | |
import py_sg | |
except ImportError, e: | |
print "You need to install the \"py_sg\" module." | |
sys.exit(1) | |
## Number of bytes requested from the device on every py_sg.read() call.
BLOCK_SIZE = 512
## NOTE(review): not referenced anywhere in the visible source; presumably the
## Handy Store page that holds the security block -- confirm before relying on it.
HANDSTORESECURITYBLOCK = 1
## File object of the target device; assigned in main() and read by the SCSI helpers.
dev = None
## Format a failure message with a red leading marker
def fail(str):
    """Return `str` prefixed with a red ``[!]`` marker.

    The text is only formatted (ANSI colour escapes); printing is left to
    the caller.
    """
    red, reset = "\033[91m", "\033[0m"
    return "".join((red, "[!]", reset, " ", str))
## Format a success message with a green leading marker
def success(str):
    """Return `str` prefixed with a green ``[*]`` marker.

    The text is only formatted (ANSI colour escapes); printing is left to
    the caller.
    """
    green, reset = "\033[92m", "\033[0m"
    return "".join((green, "[*]", reset, " ", str))
## Format a prompt message with a blue leading marker
def question(str):
    """Return `str` prefixed with a blue ``[+]`` marker.

    The text is only formatted (ANSI colour escapes); printing is left to
    the caller.
    """
    blue, reset = "\033[94m", "\033[0m"
    return "".join((blue, "[+]", reset, " ", str))
## Format a title line in yellow
def title(str):
    """Return `str` wrapped in the ANSI escapes for yellow text."""
    yellow, reset = "\033[93m", "\033[0m"
    return "".join((yellow, str, reset))
### Tell whether the current user is root
def is_root_user():
    """Return True when the effective user id of this process is 0."""
    return os.geteuid() == 0
## Convert a security status code to its human-readable name
def sec_status_to_str(security_status):
    """Return the label for `security_status`, or "unknown" for other codes."""
    known_states = (
        (0x00, "No lock"),
        (0x01, "Locked"),
        (0x02, "Unlocked"),
        (0x06, "Locked, unlock blocked"),
        (0x07, "No keys"),
    )
    for code, label in known_states:
        if security_status == code:
            return label
    return "unknown"
## Convert a cipher id to its human-readable algorithm name
def cipher_id_to_str(cipher_id):
    """Return the algorithm name for `cipher_id`, or "unknown" for other ids."""
    known_ciphers = (
        (0x10, "AES_128_ECB"),
        (0x12, "AES_128_CBC"),
        (0x18, "AES_128_XTS"),
        (0x20, "AES_256_ECB"),
        (0x22, "AES_256_CBC"),
        (0x28, "AES_256_XTS"),
        (0x30, "Full Disk Encryption"),
    )
    for code, label in known_ciphers:
        if cipher_id == code:
            return label
    return "unknown"
## Turn a list of byte values into a packed byte string (char[])
def _scsi_pack_cdb(cdb):
    """Pack every element of `cdb` as one unsigned byte, in order."""
    layout = "%dB" % len(cdb)
    return struct.pack(layout, *cdb)
## Pack a 32-bit unsigned int in network (big-endian) byte order
def htonl(num):
    """Return `num` as a 4-byte big-endian packed string.

    Unlike socket.htonl the result is the packed bytes, not an integer.
    """
    return struct.pack(">I", num)
## Pack a 16-bit unsigned int in network (big-endian) byte order
def htons(num):
    """Return `num` as a 2-byte big-endian packed string.

    Unlike socket.htons the result is the packed bytes, not an integer.
    """
    return struct.pack(">H", num)
## Ask the device for the selected block of the Handy Store
def read_handy_store(page):
    """Read Handy Store page `page` and return the raw data from py_sg.read.

    BLOCK_SIZE bytes are requested from the module-level `dev`.
    """
    cdb = [0xD8, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x01, 0x00]
    ## CDB bytes 2..5 carry the page number in big-endian order.
    cdb[2:6] = bytearray(htonl(page))
    return py_sg.read(dev, _scsi_pack_cdb(cdb), BLOCK_SIZE)
## Compute the checksum of a Handy Store block
def hsb_checksum(data):
    """Return the checksum of `data` as a hex() string.

    The first 510 bytes are summed, byte 0 is added once more, and the low
    eight bits of the negated sum are returned.
    """
    total = sum(ord(data[pos]) for pos in range(510))
    ## Some WD utilities count data[0] twice, others do not.
    total += ord(data[0])
    return hex(-total & 0xFF)
## Query the device for its encryption status.
def get_encryption_status():
    """Return (SecurityStatus, CurrentCipherID, KeyResetEnabler).

    SecurityStatus:
        0x00 => No lock
        0x01 => Locked
        0x02 => Unlocked
        0x06 => Locked, unlock blocked
        0x07 => No keys
    CurrentCipherID:
        0x10 => AES_128_ECB
        0x12 => AES_128_CBC
        0x18 => AES_128_XTS
        0x20 => AES_256_ECB
        0x22 => AES_256_CBC
        0x28 => AES_256_XTS
        0x30 => Full Disk Encryption
    KeyResetEnabler:
        4 bytes that change every time.

    Exits the process with status 1 when the first byte of the reply is
    not 0x45.
    """
    request = [0xC0, 0x45, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x30, 0x00]
    reply = py_sg.read(dev, _scsi_pack_cdb(request), BLOCK_SIZE)
    signature = ord(reply[0])
    if signature != 0x45:
        print(fail("Wrong encryption status signature %s" % hex(signature)))
        sys.exit(1)
    security_status = ord(reply[3])
    current_cipher_id = ord(reply[4])
    key_reset_enabler = reply[8:12]
    return (security_status, current_cipher_id, key_reset_enabler)
## Read and validate the first block of the Handy Store.
def read_handy_store_block1():
    """Return (iteration, salt, hint) taken from Handy Store block 1.

    iteration -- number of hashing rounds used in password generation
    salt      -- salt used in password generation (bytes 12..19 plus two
                 trailing NUL bytes)
    hint      -- password hint if used (bytes 24..225 plus two trailing NUL
                 bytes); currently not interpreted by any caller. TODO.

    Exits the process with status 1 when the checksum or the signature of
    the block is wrong.
    """
    expected_signature = [0x00, 0x01, 0x44, 0x57]
    block = read_handy_store(1)

    ## The last byte of the block holds the stored checksum.
    computed = hsb_checksum(block)
    stored = hex(ord(block[511]))
    if computed != stored:
        print(fail("Wrong HSB1 checksum"))
        sys.exit(1)

    ## The block has to start with the expected four signature bytes.
    if [ord(ch) for ch in block[0:4]] != expected_signature:
        print(fail("Wrong HSB1 signature."))
        sys.exit(1)

    (iteration,) = struct.unpack_from("<I", block, 8)
    terminator = chr(0x00) * 2
    salt = block[12:20] + terminator
    hint = block[24:226] + terminator
    return (iteration, salt, hint)
## Perform password hashing with requirements obtained from the device
def mk_password_block(passwd, iteration, salt):
    """Return the hashed password block for `passwd`.

    passwd    -- password as typed by the user
    iteration -- number of SHA-256 rounds to apply
    salt      -- salt as stored on the device: pairs of bytes, terminated by
                 a pair of NUL bytes; only the first byte of each pair is used

    The salt is prepended to the password, the result is encoded as
    UTF-16 little-endian and hashed `iteration` times with SHA-256. With
    `iteration` equal to 0 the encoded text itself is returned.
    """
    clean_salt = ""
    ## Walk the salt two bytes at a time and stop at the first NUL pair.
    for pos in range(0, len(salt) - 1, 2):
        first, second = salt[pos], salt[pos + 1]
        if ord(first) == 0x00 and ord(second) == 0x00:
            break
        clean_salt += first

    ## Encode explicitly as little-endian: plain "utf-16" follows the host
    ## byte order, so stripping its BOM gives a different (wrong) byte
    ## sequence on big-endian machines.
    password = (clean_salt + passwd).encode("utf-16-le")
    for _ in range(iteration):
        password = sha256(password).digest()
    return password
## Unlock the device
def unlock():
    """Ask for the password and send the unlock command to the device.

    Returns without doing anything when the device reports "No lock" (0x00)
    or "Unlocked" (0x02). Exits the process with status 1 when the device is
    in any state other than "Locked" (0x01) or uses an unsupported cipher.
    A failing write is reported as "Wrong password.".
    """
    cdb = [0xC1, 0xE1, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x28, 0x00]
    sec_status, cipher_id, _key_reset = get_encryption_status()

    ## Device should be in the correct state
    if sec_status in (0x00, 0x02):
        print(fail("Your device is already unlocked!"))
        return
    if sec_status != 0x01:
        print(fail("Wrong device status!"))
        sys.exit(1)

    ## Password block length depends on the cipher in use.
    if cipher_id in (0x10, 0x12, 0x18):
        pwblen = 16
    elif cipher_id in (0x20, 0x22, 0x28, 0x30):
        pwblen = 32
    else:
        print(fail("Unsupported cipher %s" % cipher_id))
        sys.exit(1)

    ## Get password from user
    print(question("Insert password to Unlock the device"))
    passwd = getpass.getpass()

    iteration, salt, _hint = read_handy_store_block1()
    ## NOTE(review): the hash is a 32-byte SHA-256 digest even when pwblen is
    ## 16 -- confirm the device accepts the longer payload for AES-128.
    pwd_hashed = mk_password_block(passwd, iteration, salt)

    ## Header: signature byte, five zero bytes, then the big-endian length.
    pw_block = [0x45, 0x00, 0x00, 0x00, 0x00, 0x00]
    pw_block.extend(bytearray(htons(pwblen)))

    ## Transfer length = 8 header bytes + password block.
    cdb[8] = pwblen + 8

    try:
        ## If there aren't exceptions the unlock operation is OK.
        py_sg.write(dev, _scsi_pack_cdb(cdb), _scsi_pack_cdb(pw_block) + pwd_hashed)
    except Exception:
        ## Wrong password or something bad happened. Only Exception is
        ## caught so that Ctrl-C and sys.exit() are not reported as a wrong
        ## password.
        print(fail("Wrong password."))
    else:
        print(success("Device unlocked."))
## Change device password
## If the new password is empty the device state changes to "0x00 - No lock", meaning encryption is no longer used.
## If the device is unencrypted a user can choose a password and make the whole device encrypted.
##
## DEVICE HAS TO BE UNLOCKED TO PERFORM THIS OPERATION
##
def change_password():
    """Ask for the old and new password and send the change command.

    Exits the process with status 1 when the device is neither unlocked
    (0x02) nor without lock (0x00), when the cipher is unsupported, when the
    confirmation does not match, or when both passwords are empty.
    A failing write is reported as "Error changing password".
    """
    cdb = [0xC1, 0xE2, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x48, 0x00]
    sec_status, cipher_id, _key_reset = get_encryption_status()
    if sec_status not in (0x02, 0x00):
        print(fail("Device has to be unlocked or without encryption to perform this operation"))
        sys.exit(1)

    ## Password block length depends on the cipher in use.
    if cipher_id in (0x10, 0x12, 0x18):
        pwblen = 16
    elif cipher_id in (0x20, 0x22, 0x28, 0x30):
        pwblen = 32
    else:
        print(fail("Unsupported cipher %s" % cipher_id))
        sys.exit(1)

    print(question("Insert the OLD password"))
    old_passwd = getpass.getpass()
    print(question("Insert the NEW password"))
    new_passwd = getpass.getpass()
    print(question("Confirm the NEW password"))
    new_passwd2 = getpass.getpass()
    if new_passwd != new_passwd2:
        print(fail("Password confirmation doesn't match the given password"))
        sys.exit(1)

    ## Both passwords shouldn't be empty
    if not old_passwd and not new_passwd:
        print(fail("Both passwords shouldn't be empty"))
        sys.exit(1)

    iteration, salt, _hint = read_handy_store_block1()

    ## Header: signature byte, five zero bytes, then the big-endian length.
    pw_block = [0x45, 0x00, 0x00, 0x00, 0x00, 0x00]
    pw_block.extend(bytearray(htons(pwblen)))

    ## An empty password is sent as 32 zero bytes.
    empty_hash = b"\x00" * 32

    if old_passwd:
        old_passwd_hashed = mk_password_block(old_passwd, iteration, salt)
        pw_block[3] |= 0x10
    else:
        old_passwd_hashed = empty_hash

    if new_passwd:
        new_passwd_hashed = mk_password_block(new_passwd, iteration, salt)
        pw_block[3] |= 0x01
    else:
        new_passwd_hashed = empty_hash

    ## When both an old and a new password are supplied both flags are cleared.
    if pw_block[3] & 0x11 == 0x11:
        pw_block[3] &= 0xEE

    ## Transfer length = 8 header bytes + old block + new block.
    cdb[8] = 8 + 2 * pwblen

    try:
        ## If no exception is raised the change went ok.
        py_sg.write(dev, _scsi_pack_cdb(cdb), _scsi_pack_cdb(pw_block) + old_passwd_hashed + new_passwd_hashed)
    except Exception:
        ## Wrong password or something bad happened. Only Exception is
        ## caught so that Ctrl-C and sys.exit() are not swallowed.
        print(fail("Error changing password"))
    else:
        print(success("Password changed."))
## Change the internal key used for encryption; all data on the device becomes permanently inaccessible.
## The device forgets even the partition table so you have to make a new one.
def secure_erase(cipher_id = 0):
    """Send the key-reset command with freshly generated random key material.

    cipher_id -- cipher to configure; 0 (the default) keeps the cipher the
                 device currently reports.

    Exits the process with status 1 when the cipher is unsupported.
    A failing write is reported as "Something wrong.".
    """
    cdb = [0xC1, 0xE3, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x00]
    if cipher_id == 0:
        cipher_id = get_encryption_status()[1]

    pw_block = [0x45, 0x00, 0x00, 0x00, 0x30, 0x00, 0x00, 0x00]
    if cipher_id in (0x10, 0x12, 0x18):
        pwblen = 16
        pw_block[3] = 0x01
    elif cipher_id in (0x20, 0x22, 0x28):
        pwblen = 32
        pw_block[3] = 0x01
    elif cipher_id == 0x30:
        ## Full disk encryption leaves byte 3 of the header at 0x00.
        pwblen = 32
    else:
        print(fail("Unsupported cipher %s" % cipher_id))
        sys.exit(1)

    ## Set the actual length of pw_block (8 bytes + pwblen pseudorandom data)
    cdb[8] = pwblen + 8

    ## Fill pw_block with random data
    pw_block.extend(bytearray(os.urandom(pwblen)))

    ## key_reset needs to be retrieved immediately before the reset request;
    ## it is copied into the CDB starting at byte 2.
    key_reset = bytearray(get_encryption_status()[2])
    cdb[2:2 + len(key_reset)] = key_reset

    try:
        py_sg.write(dev, _scsi_pack_cdb(cdb), _scsi_pack_cdb(pw_block))
    except Exception:
        ## Something bad happened. Only Exception is caught so that Ctrl-C
        ## and sys.exit() are not swallowed.
        print(fail("Something wrong."))
    else:
        print(success("Device erased. You need to create a new partition on the device (Hint: fdisk and mkfs)"))
## Run a shell pipeline and return its standard output without trailing whitespace
def _shell_output(command):
    """Run `command` through the shell, wait for it, return stripped stdout."""
    proc = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
    ## communicate() reads to EOF and reaps the child process.
    output = proc.communicate()[0]
    return output.rstrip()


## Get device info through "lsscsi" command
def get_device_info(device = None):
    """Return (complete_path, relative_path, host_number) for a device.

    device -- text used to select the lsscsi line (e.g. "/dev/sdb"); when
              None, lines containing "Passport" are selected.

    Ex. from the following lsscsi line
        "[23:0:0:0] disk WD My Passport 0820 1012 /dev/sdb"
    the result is ("/dev/sdb", "sdb", "23"). Values are empty when nothing
    matches.
    """
    ## Local import keeps this working on Python 2 (pipes) and Python 3 (shlex).
    try:
        from shlex import quote
    except ImportError:
        from pipes import quote

    ## The selector may come from the command line, so it is shell-quoted and
    ## passed with -e to stop grep from reading it as an option.
    grep_filter = "grep -e " + quote("Passport" if device is None else device)
    path_filter = r'grep -oP "\/([a-zA-Z]+)\/([a-zA-Z0-9]+)"'
    listing = "lsscsi | " + grep_filter + " | " + path_filter

    ## /dev/sdb
    complete_path = _shell_output(listing)
    ## sdb
    relative_path = _shell_output(listing + " | cut -d '/' -f 3")
    ## 23
    host_number = _shell_output("lsscsi -d|" + grep_filter + "|cut -d ':' -f 1|cut -d '[' -f 2")
    return (complete_path, relative_path, host_number)
## Enable mount operations
## Tells the system to scan the "new" (unlocked) device
def enable_mount(device):
    """Remove the SCSI device and rescan its host so it can be mounted.

    device -- text used to look the device up through get_device_info().

    Nothing is rescanned unless the device reports "No lock" (0x00) or
    "Unlocked" (0x02).
    """
    sec_status, _cipher_id, _key_reset = get_encryption_status()

    ## Device should be in the correct state
    if sec_status not in (0x00, 0x02):
        print(fail("Device needs to be unlocked in order to mount it."))
        return

    rp, hn = get_device_info(device)[1:]
    ## subprocess.call waits for each command, so the rescan can never start
    ## before the delete has finished.
    subprocess.call("echo 1 > /sys/block/" + rp + "/device/delete", shell=True)
    subprocess.call("echo \"- - -\" > /sys/class/scsi_host/host" + hn + "/scan", shell=True)
    print(success("Now depending on your system you can mount your device or it will be automagically mounted."))
## Main function, get parameters and manage operations
def main(argv):
    """Parse `argv` and run the requested operations on the device.

    argv -- command line arguments without the program name.

    With no arguments the device status is shown. Exits the process with
    status 1 when not run as root, when several "Passport" devices are
    found and none was selected with -d, or when the device cannot be
    opened.
    """
    global dev
    print(title("WD Passport Ultra linux utility v0.1 by duke"))

    parser = argparse.ArgumentParser()
    parser.add_argument("-s", "--status", required=False, action="store_true", help="Check device status and encryption type")
    parser.add_argument("-u", "--unlock", required=False, action="store_true", help="Unlock")
    parser.add_argument("-m", "--mount", required=False, action="store_true", help="Enable mount point for an unlocked device")
    parser.add_argument("-c", "--change_passwd", required=False, action="store_true", help="Change (or disable) password")
    parser.add_argument("-e", "--erase", required=False, action="store_true", help="Secure erase device")
    parser.add_argument("-d", "--device", dest="device", required=False, help="Force device path (ex. /dev/sdb). Usually you don't need this option.")
    ## Parse the arguments that were actually passed in, not sys.argv.
    args = parser.parse_args(argv)

    if not is_root_user():
        print(fail("You need to have root privileges to run this script."))
        sys.exit(1)

    ## No arguments at all: default to showing the status.
    if not argv:
        args.status = True

    if args.device:
        DEVICE = args.device
    else:
        ## Get occurrences of "Passport" devices
        p = subprocess.Popen("lsscsi | grep Passport | wc -l", shell=True, stdout=subprocess.PIPE)
        if int(p.communicate()[0].rstrip()) > 1:
            print(fail("Multiple occurences of \"My Passport\" detected. You should specify a device manually (with -d option)."))
            sys.exit(1)
        DEVICE = get_device_info()[0]

    try:
        dev = open(DEVICE, "r+b")
    except (IOError, OSError):
        ## Only I/O failures are reported here; Ctrl-C still interrupts.
        print(fail("Something wrong opening device \"%s\"" % (DEVICE)))
        sys.exit(1)

    try:
        if args.status:
            status, cipher_id, _key_reset = get_encryption_status()
            print(success("Device state"))
            print("\tSecurity status: %s" % sec_status_to_str(status))
            print("\tEncryption type: %s" % cipher_id_to_str(cipher_id))
        if args.unlock:
            unlock()
        if args.change_passwd:
            change_password()
        if args.erase:
            print(question("Any data on the device will be lost. Are you sure you want to continue? [y/N]"))
            r = sys.stdin.read(1)
            if r.lower() == 'y':
                secure_erase(0)
            else:
                print(success("Ok. Bye."))
        if args.mount:
            enable_mount(DEVICE)
    finally:
        ## Release the device handle even when an operation calls sys.exit().
        dev.close()


if __name__ == "__main__":
    main(sys.argv[1:])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment