Created
October 9, 2020 03:41
-
-
Save Dliv3/fd6912d3ac6765b87f288200d5151074 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
####################
#
# Copyright (c) 2019 Dirk-jan Mollema / Fox-IT (@_dirkjan)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Checks for CVE-2019-1040 vulnerability over SMB.
# The script will establish a connection to the target host(s) and send
# an invalid NTLM authentication. If this is accepted, the host is vulnerable to
# CVE-2019-1040 and you can execute the MIC Remove attack with ntlmrelayx.
#
# See https://dirkjanm.io/exploiting-CVE-2019-1040-relay-vulnerabilities-for-rce-and-domain-admin/
# for more info.
#
# Authors:
#  Dirk-jan Mollema (@_dirkjan)
#  dlive (@D1iv3)
#
####################
import sys | |
import logging | |
import argparse | |
import codecs | |
import calendar | |
import struct | |
import time | |
import random | |
import string | |
from impacket import version | |
from impacket.examples.logger import ImpacketFormatter | |
from impacket.smbconnection import SMBConnection, SessionError | |
from impacket.smb3structs import * | |
from impacket import ntlm | |
from impacket.ntlm import AV_PAIRS, NTLMSSP_AV_TIME, NTLMSSP_AV_FLAGS, NTOWFv2, NTLMSSP_AV_TARGET_NAME, NTLMSSP_AV_HOSTNAME,USE_NTLMv2, hmac_md5 | |
def parse_creds(target):
    """Split a ``[[domain/]username[:password]@]host`` target specification.

    The host is everything after the LAST ``@`` so passwords may contain
    ``@``; the password is everything after the FIRST ``:`` of the
    credential part so passwords may contain ``:``.

    Args:
        target: the raw target string from the command line.

    Returns:
        A ``(domain, username, password, remote_name)`` tuple. Parts that are
        absent from ``target`` are returned as empty strings. Domain and
        username are stripped of surrounding whitespace only when a ``/``
        separator is present.
    """
    # rpartition (unlike rsplit + tuple unpacking) does not raise when the
    # credential part is omitted: a bare "host" yields empty credentials.
    creds, _, remote_name = target.rpartition('@')
    # No ':' leaves the password empty.
    creds, _, password = creds.partition(':')
    if '/' in creds:
        dom, user = (part.strip() for part in creds.split('/', 1))
    else:
        dom, user = '', creds
    return dom, user, password, remote_name
def _ensure_text(value, encoding):
    """Return ``value`` decoded with ``encoding`` if it cannot be UTF-16LE encoded as-is."""
    try:
        value.encode('utf-16le')
    except (UnicodeError, AttributeError):
        # Python 2 byte strings with non-ASCII content raise UnicodeError;
        # Python 3 bytes objects have no .encode() at all (AttributeError).
        value = value.decode(encoding)
    return value


def mod_getNTLMSSPType3(type1, type2, user, password, domain, lmhash = '', nthash = '', use_ntlmv2 = USE_NTLMv2):
    """Build an NTLMSSP type 3 (authenticate) message with sign/seal flags cleared.

    Modified variant of impacket's ``getNTLMSSPType3``: in addition to the
    usual flag negotiation, the SEAL, SIGN and ALWAYS_SIGN flags are removed
    from the response, and no MIC is computed or attached here.

    Args:
        type1: the type 1 (negotiate) message object previously sent; its
            ``flags`` and workstation name are reused.
        type2: the raw type 2 (challenge) message received from the server.
        user: account name.
        password: account password; ``None`` is treated as ``''``.
        domain: account domain.
        lmhash: LM hash, forwarded to impacket's response computation.
        nthash: NT hash, forwarded to impacket's response computation.
        use_ntlmv2: forwarded to impacket's response computation.

    Returns:
        A ``(ntlmChallengeResponse, exportedSessionKey)`` tuple.
    """
    # Safety check in case somebody sent password = None.. That's not allowed. Setting it to '' and hope for the best.
    if password is None:
        password = ''

    # Let's do some encoding checks before moving on. Kind of dirty, but found effective when dealing with
    # international characters.
    encoding = sys.getfilesystemencoding()
    if encoding is not None:
        user = _ensure_text(user, encoding)
        password = _ensure_text(password, encoding)
        # The original decoded `user` here, overwriting the domain with the user name.
        domain = _ensure_text(domain, encoding)

    ntlmChallenge = ntlm.NTLMAuthChallenge(type2)
    challengeFlags = ntlmChallenge['flags']

    # Let's start with the original flags sent in the type1 message
    responseFlags = type1['flags']

    # Token received and parsed. Depending on the authentication
    # method we will create a valid ChallengeResponse
    ntlmChallengeResponse = ntlm.NTLMAuthChallengeResponse(user, password, ntlmChallenge['challenge'])

    clientChallenge = ntlm.b("".join([random.choice(string.digits+string.ascii_letters) for _ in range(8)]))

    serverName = ntlmChallenge['TargetInfoFields']

    ntResponse, lmResponse, sessionBaseKey = ntlm.computeResponse(challengeFlags, ntlmChallenge['challenge'],
                                                                  clientChallenge, serverName, domain, user, password,
                                                                  lmhash, nthash, use_ntlmv2)

    # Let's check the return flags: anything the server did not offer is taken out of the response.
    for flag in (ntlm.NTLMSSP_NEGOTIATE_EXTENDED_SESSIONSECURITY,
                 ntlm.NTLMSSP_NEGOTIATE_128,
                 ntlm.NTLMSSP_NEGOTIATE_KEY_EXCH):
        if (challengeFlags & flag) == 0:
            responseFlags &= 0xffffffff ^ flag

    # Dropping the MIC requires these flags to be unset. Mask them out with
    # AND rather than toggling with XOR: XOR would SET a flag that the type 1
    # message never carried.
    for flag in (ntlm.NTLMSSP_NEGOTIATE_SEAL,
                 ntlm.NTLMSSP_NEGOTIATE_SIGN,
                 ntlm.NTLMSSP_NEGOTIATE_ALWAYS_SIGN):
        if (challengeFlags & flag) != 0:
            responseFlags &= 0xffffffff ^ flag

    keyExchangeKey = ntlm.KXKEY(challengeFlags, sessionBaseKey, lmResponse, ntlmChallenge['challenge'], password,
                                lmhash, nthash, use_ntlmv2)

    # Special case for anonymous login
    if user == '' and password == '' and lmhash == '' and nthash == '':
        keyExchangeKey = b'\x00'*16

    if challengeFlags & ntlm.NTLMSSP_NEGOTIATE_KEY_EXCH:
        exportedSessionKey = ntlm.b("".join([random.choice(string.digits+string.ascii_letters) for _ in range(16)]))
        encryptedRandomSessionKey = ntlm.generateEncryptedSessionKey(keyExchangeKey, exportedSessionKey)
    else:
        encryptedRandomSessionKey = None
        exportedSessionKey = keyExchangeKey

    ntlmChallengeResponse['flags'] = responseFlags
    ntlmChallengeResponse['domain_name'] = domain.encode('utf-16le')
    ntlmChallengeResponse['host_name'] = type1.getWorkstation().encode('utf-16le')
    if lmResponse == '':
        ntlmChallengeResponse['lanman'] = b'\x00'
    else:
        ntlmChallengeResponse['lanman'] = lmResponse
    ntlmChallengeResponse['ntlm'] = ntResponse
    if encryptedRandomSessionKey is not None:
        ntlmChallengeResponse['session_key'] = encryptedRandomSessionKey

    return ntlmChallengeResponse, exportedSessionKey
# Slightly modified version of impacket's computeResponseNTLMv2
def mod_computeResponseNTLMv2(flags, serverChallenge, clientChallenge, serverName, domain, user, password, lmhash='',
                              nthash='', use_ntlmv2=USE_NTLMv2, check=False):
    """Compute NTLMv2 NT/LM responses over a modified AV pair list.

    Compared with a plain NTLMv2 response, the server's AV pairs are edited
    before being hashed: a ``cifs/<hostname>`` target name is added and the
    AV flags pair is set to 0x00000002 (per MS-NLMP, the bit announcing that
    the message carries a MIC).

    Args:
        flags, lmhash, use_ntlmv2, check: accepted for signature
            compatibility; not used by this function.
        serverChallenge: challenge bytes taken from the type 2 message.
        clientChallenge: client nonce bytes.
        serverName: raw AV pairs (target info) from the type 2 message.
        domain, user, password, nthash: inputs to ``NTOWFv2``.

    Returns:
        A ``(ntChallengeResponse, lmChallengeResponse, sessionBaseKey)`` tuple.
    """
    nt_key = NTOWFv2(user, password, domain, nthash)

    pairs = AV_PAIRS(serverName)
    spn_prefix = 'cifs/'.encode('utf-16le')
    pairs[NTLMSSP_AV_TARGET_NAME] = spn_prefix + pairs[NTLMSSP_AV_HOSTNAME][1]

    server_time = pairs[NTLMSSP_AV_TIME]
    if server_time is None:
        # No timestamp from the server: use "now" as a Windows FILETIME
        # (100 ns ticks since 1601-01-01) and add it to the AV pairs.
        unix_now = calendar.timegm(time.gmtime())
        timestamp = struct.pack('<q', (116444736000000000 + unix_now * 10000000))
        pairs[NTLMSSP_AV_TIME] = timestamp
    else:
        timestamp = server_time[1]

    pairs[NTLMSSP_AV_FLAGS] = b'\x02' + b'\x00' * 3
    target_info = pairs.getData()

    # Response blob: version bytes, reserved, timestamp, client nonce,
    # reserved, AV pairs, trailing reserved.
    blob = b''.join((
        b'\x01',
        b'\x01',
        b'\x00' * 6,
        timestamp,
        clientChallenge,
        b'\x00' * 4,
        target_info,
        b'\x00' * 4,
    ))

    nt_proof = hmac_md5(nt_key, serverChallenge + blob)
    lm_proof = hmac_md5(nt_key, serverChallenge + clientChallenge)
    session_base_key = hmac_md5(nt_key, nt_proof)

    return nt_proof + blob, lm_proof + clientChallenge, session_base_key
# Reference to impacket's stock type 1 builder, captured at import time
orig_type1 = ntlm.getNTLMSSPType1


# Wrapper to remove signing flags
def mod_getNTLMSSPType1(workstation='', domain='', signingRequired = False, use_ntlmv2 = USE_NTLMv2):
    """Build a type 1 message via the stock builder with signing never required.

    The ``signingRequired`` argument is accepted for signature compatibility
    but ignored: ``False`` is always passed on.
    """
    signing_required = False
    return orig_type1(workstation, domain, signing_required, use_ntlmv2)
class checker(object):
    """Checks SMB hosts for CVE-2019-1040 by logging in with a modified NTLM exchange.

    Credentials are first validated with an unmodified login so that a later
    rejection can be attributed to the modified NTLM messages rather than to
    a wrong password.
    """

    def __init__(self, username='', password='', domain='', port=None,
                 hashes=None):
        """Store credentials and connection settings.

        Args:
            username: account name.
            password: account password.
            domain: account domain.
            port: SMB port; ``None`` falls back to 445.
            hashes: optional ``LMHASH:NTHASH`` string.
        """
        self.__username = username
        self.__password = password
        # Without this fallback the default port=None would make every
        # int(self.__port) below raise TypeError.
        self.__port = 445 if port is None else port
        self.__domain = domain
        self.__lmhash = ''
        self.__nthash = ''
        if hashes is not None:
            self.__lmhash, self.__nthash = hashes.split(':')
        # Set once an unmodified login has succeeded
        self.creds_validated = False

    @staticmethod
    def __close_quietly(connection):
        """Best-effort close so cleanup never masks the exception being handled."""
        if connection is None:
            return
        try:
            connection.close()
        except Exception as exc:
            logging.debug('Ignoring error while closing SMB connection: %s', exc)

    def validate_creds(self, remote_host):
        """Try an unmodified login against ``remote_host``.

        Returns:
            True if the login succeeded, False if the host could not be
            reached (connection-level error).

        Raises:
            KeyboardInterrupt: if the server answered with an SMB session
                error (e.g. wrong credentials); used to abort the whole scan.
        """
        smbClient = None
        try:
            smbClient = SMBConnection(remote_host, remote_host, sess_port=int(self.__port)) #, preferredDialect=SMB2_DIALECT_21
            smbClient.login(self.__username, self.__password, self.__domain, self.__lmhash, self.__nthash)
        except SessionError as exc:
            if 'STATUS_LOGON_FAILURE' in str(exc):
                logging.error('Error validating credentials - make sure the supplied credentials are correct')
            else:
                logging.warning('Unexpected Exception while validating credentials against %s: %s', remote_host, exc)
            raise KeyboardInterrupt
        except Exception:
            # `except Exception` rather than a bare except, so a real Ctrl-C
            # is not swallowed here.
            logging.error('Error during connection to {}. TCP/445 refused, timeout?'.format(remote_host))
            return False
        finally:
            self.__close_quietly(smbClient)
        return True

    def check(self, remote_host):
        """Scan one host and log whether it accepted the modified authentication."""
        # Validate credentials first. Only remember the validation once it
        # really succeeded; an unreachable host proves nothing about the
        # credentials, so the next host is used to validate instead.
        if not self.creds_validated:
            if not self.validate_creds(remote_host):
                return
            self.creds_validated = True

        # Now start scanner
        try:
            smbClient = SMBConnection(remote_host, remote_host, sess_port=int(self.__port)) #, preferredDialect=SMB2_DIALECT_21
        except Exception as exc:
            logging.debug('Could not connect to %s: %s', remote_host, exc)
            return

        # Swap in the modified NTLM routines for this login only, and put the
        # originals back afterwards so no other impacket use in this process
        # is affected.
        stock_compute = ntlm.computeResponseNTLMv2
        stock_type3 = ntlm.getNTLMSSPType3
        ntlm.computeResponseNTLMv2 = mod_computeResponseNTLMv2
        ntlm.getNTLMSSPType3 = mod_getNTLMSSPType3
        try:
            smbClient.login(self.__username, self.__password, self.__domain, self.__lmhash, self.__nthash)
            logging.info('Target %s is VULNERABLE to CVE-2019-1040 (authentication was accepted)', remote_host)
        except SessionError as exc:
            if 'STATUS_INVALID_PARAMETER' in str(exc):
                logging.info('Target %s is not vulnerable to CVE-2019-1040 (authentication was rejected)', remote_host)
            else:
                logging.warning('Unexpected Exception while authenticating to %s: %s', remote_host, exc)
        finally:
            ntlm.computeResponseNTLMv2 = stock_compute
            ntlm.getNTLMSSPType3 = stock_type3
            self.__close_quietly(smbClient)
# Process command-line arguments.
def main():
    """Parse the command line and run the scanner against every target.

    Targets come from ``-target-file`` when given, otherwise from the host
    part of the positional ``target`` argument. The scan stops early when
    ``checker.check`` raises ``KeyboardInterrupt``.
    """
    # Init the example's logger theme
    handler = logging.StreamHandler(sys.stderr)
    handler.setFormatter(ImpacketFormatter())
    logging.getLogger().addHandler(handler)
    logging.getLogger().setLevel(logging.INFO)

    # Explicitly changing the stdout encoding format
    if sys.stdout.encoding is None:
        # Output is redirected to a file
        sys.stdout = codecs.getwriter('utf8')(sys.stdout)

    logging.info('CVE-2019-1040 scanner by @_dirkjan / Fox-IT - Based on impacket by SecureAuth')

    parser = argparse.ArgumentParser(description="CVE-2019-1040 scanner - Connects over SMB and attempts to authenticate "
                                                 "with invalid NTLM packets. If accepted, target is vulnerable to MIC remove attack")
    parser.add_argument('target', action='store', help='[[domain/]username[:password]@]<targetName or address>')

    group = parser.add_argument_group('connection')
    group.add_argument('-target-file',
                       action='store',
                       metavar="file",
                       help='Use the targets in the specified file instead of the one on'\
                            ' the command line (you must still specify something as target name)')
    # const covers "-port" given without a value; with nargs='?' and no const
    # argparse would store None and int(options.port) below would crash.
    group.add_argument('-port', choices=['139', '445'], nargs='?', default='445', const='445',
                       metavar="destination port", help='Destination port to connect to SMB Server')

    group = parser.add_argument_group('authentication')
    group.add_argument('-hashes', action="store", metavar = "LMHASH:NTHASH", help='NTLM hashes, format is LMHASH:NTHASH')

    if len(sys.argv)==1:
        parser.print_help()
        sys.exit(1)

    options = parser.parse_args()

    domain, username, password, remote_name = parse_creds(options.target)

    if password == '' and username == '':
        logging.error("Please supply a username/password (you can't use this scanner with anonymous authentication)")
        return

    if password == '' and username != '' and options.hashes is None:
        from getpass import getpass
        password = getpass("Password:")

    if options.target_file is not None:
        with open(options.target_file, 'r') as inf:
            # Skip blank lines (e.g. a trailing newline at the end of the
            # file) instead of scanning an empty host name.
            remote_names = [line.strip() for line in inf if line.strip()]
    else:
        remote_names = [remote_name]

    lookup = checker(username, password, domain, int(options.port), options.hashes)
    for remote_name in remote_names:
        try:
            lookup.check(remote_name)
        except KeyboardInterrupt:
            break


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment