Created
March 1, 2016 15:15
-
-
Save ferd/f6f556b4f0e4658965e8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
sys.path.append("./enum/enum") | |
from enum import Enum | |
import time | |
import datetime | |
import socket | |
import Crypto.Cipher | |
import signal | |
from binascii import hexlify | |
import base64 | |
sys.path.append("./scapy-ssl_tls/") | |
import logging | |
logging.getLogger("scapy.runtime").setLevel(logging.ERROR) | |
import scapy | |
from scapy.all import * | |
from ssl_tls import * | |
import ssl_tls_crypto | |
from struct import Struct | |
from pyx509.pkcs7.asn1_models.X509_certificate import Certificate | |
from pyx509.pkcs7_models import X509Certificate, PublicKeyInfo, ExtendedKeyUsageExt | |
from pyx509.pkcs7.asn1_models.decoder_workarounds import decode | |
import select | |
def pack_funcs(fmt):
    """Return a ``(pack, unpack_from)`` pair for the struct format ``fmt``.

    The format is prefixed with ``'!'`` so values are encoded in network
    byte order. The Struct is compiled once and its two bound methods are
    handed back to the caller.
    """
    compiled = Struct('!' + fmt)
    packer = compiled.pack
    unpacker = compiled.unpack_from
    return packer, unpacker
# Pre-built network-byte-order (pack, unpack_from) helper pairs, one per
# struct layout. Each name spells out the struct format characters it covers.

# Single-value layouts.
i_pack, i_unpack = pack_funcs('i')
h_pack, h_unpack = pack_funcs('h')
q_pack, q_unpack = pack_funcs('q')
d_pack, d_unpack = pack_funcs('d')
f_pack, f_unpack = pack_funcs('f')

# Multi-value layouts.
iii_pack, iii_unpack = pack_funcs('iii')
ii_pack, ii_unpack = pack_funcs('ii')
qii_pack, qii_unpack = pack_funcs('qii')
dii_pack, dii_unpack = pack_funcs('dii')
ihihih_pack, ihihih_unpack = pack_funcs('ihihih')
ci_pack, ci_unpack = pack_funcs('ci')
bh_pack, bh_unpack = pack_funcs('bh')
cccc_pack, cccc_unpack = pack_funcs('cccc')
# Socket tuning: timeout in seconds and the maximum size of a single recv().
SOCKET_TIMEOUT = 15
SOCKET_RECV_SIZE = 80 * 1024

# Result tokens returned by sslv2_connect(); the caller checks whether the
# returned string starts with VULN.
CON_FAIL = "con fail"
NO_STARTTLS = "no starttls"
NO_TLS = "no tls"
VULN = "vuln"
def timeout(func, args=(), kwargs=None, timeout_duration=1, default=None):
    """Call ``func(*args, **kwargs)`` and give up after ``timeout_duration``.

    The time limit is enforced with SIGALRM, so this only works on Unix and
    only when called from the main thread.

    Args:
        func: callable to run.
        args: positional arguments for ``func``.
        kwargs: keyword arguments for ``func`` (``None`` means none).
        timeout_duration: limit in seconds; fractions are accepted and
            ``0`` disables the limit.
        default: value returned when the limit is hit.

    Returns:
        The return value of ``func``, or ``default`` on timeout. Any other
        exception raised by ``func`` propagates unchanged.
    """
    import signal

    class _Timeout(Exception):
        """Raised by the alarm handler to abort ``func``."""

    def _on_alarm(signum, frame):
        raise _Timeout()

    if kwargs is None:
        kwargs = {}

    # Install our handler, remembering the previous one so it can be put
    # back afterwards instead of leaving this handler installed for good.
    previous_handler = signal.signal(signal.SIGALRM, _on_alarm)
    signal.setitimer(signal.ITIMER_REAL, timeout_duration)
    try:
        return func(*args, **kwargs)
    except _Timeout:
        return default
    finally:
        # Always cancel the pending timer before restoring the handler.
        signal.setitimer(signal.ITIMER_REAL, 0)
        # signal.signal() reports None when the old handler was not
        # installed from Python; that cannot be restored, so leave it.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)
# Fixed handshake values shared by every cipher suite probe.
# CHALLENGE is sent in the client hello and later searched for in the
# decrypted server-finished message.
CHALLENGE = 'a' * 16
# Clear part of the master key sent by the export cipher suites.
CLEAR_KEY = '\0' * 11
# Sent as key_argument and used as the CBC IV by the block cipher suites.
KEY_ARGUMENT = '\0' * 8
class CipherSuite(object): | |
@classmethod | |
def get_string_description(cls): | |
raise NotImplementedError() | |
@classmethod | |
def get_constant(cls): | |
return eval("SSLv2CipherSuite." + cls.get_string_description()) | |
@classmethod | |
def get_client_master_key(cls, encrypted_pms): | |
raise NotImplementedError() | |
@classmethod | |
def verify_key(cls, connection_id, server_finished): | |
raise NotImplementedError() | |
@classmethod | |
def get_encrypted_pms(cls, public_key, secret_key): | |
pkcs1_pubkey = Crypto.Cipher.PKCS1_v1_5.new(public_key) | |
encrypted_pms = pkcs1_pubkey.encrypt(secret_key) | |
return encrypted_pms | |
class RC4Export(CipherSuite):
    """Probe for the RC4_128_EXPORT40_WITH_MD5 suite.

    The master key is the module-level CLEAR_KEY followed by the 5-byte
    SECRET_KEY, which is the only part sent encrypted.
    """

    SECRET_KEY = 'b' * 5

    @classmethod
    def get_string_description(cls):
        """Return the suite's attribute name on ``SSLv2CipherSuite``."""
        return "RC4_128_EXPORT40_WITH_MD5"

    @classmethod
    def get_client_master_key(cls, public_key):
        """Build the client-master-key message for ``public_key``."""
        suite_id = cls.get_constant()
        encrypted = cls.get_encrypted_pms(public_key, cls.SECRET_KEY)
        return SSLv2ClientMasterKey(cipher_suite=suite_id,
                                    encrypted_key=encrypted,
                                    clear_key=CLEAR_KEY)

    @classmethod
    def verify_key(cls, connection_id, server_finished):
        """Return True if the decrypted message ends with CHALLENGE."""
        master_key = CLEAR_KEY + cls.SECRET_KEY
        key_material = MD5.new(master_key + '0' + CHALLENGE + connection_id).digest()
        # The first 2 bytes are skipped before decrypting
        # (presumably the record header -- TODO confirm).
        plaintext = Crypto.Cipher.ARC4.new(key_material).decrypt(server_finished[2:])
        return plaintext.endswith(CHALLENGE)
class RC4(CipherSuite):
    """Probe for the non-export RC4_128_WITH_MD5 suite.

    A 15-byte clear key is sent alongside the full 16-byte encrypted
    secret. verify_key() assumes the server used only the first 16 bytes of
    clear key + secret as the master key. The ``__main__`` driver reports a
    successful verification with this suite as CVE-2016-0703.
    """

    SECRET_KEY = 'b' * 16
    CLEAR_KEY = 'a' * 15

    @classmethod
    def get_string_description(cls):
        """Return the suite's attribute name on ``SSLv2CipherSuite``."""
        return "RC4_128_WITH_MD5"

    @classmethod
    def get_client_master_key(cls, public_key):
        """Build the client-master-key message for ``public_key``."""
        suite_id = cls.get_constant()
        encrypted = cls.get_encrypted_pms(public_key, cls.SECRET_KEY)
        return SSLv2ClientMasterKey(cipher_suite=suite_id,
                                    encrypted_key=encrypted,
                                    clear_key=cls.CLEAR_KEY)

    @classmethod
    def verify_key(cls, connection_id, server_finished):
        """Return True if the decrypted message ends with CHALLENGE."""
        # Only the first 16 bytes of clear key + secret form the master key.
        master_key = (cls.CLEAR_KEY + cls.SECRET_KEY)[:16]
        key_material = MD5.new(master_key + '0' + CHALLENGE + connection_id).digest()
        # The first 2 bytes are skipped before decrypting
        # (presumably the record header -- TODO confirm).
        plaintext = Crypto.Cipher.ARC4.new(key_material).decrypt(server_finished[2:])
        return plaintext.endswith(CHALLENGE)
class RC2Export(CipherSuite):
    """Probe for the RC2_128_CBC_EXPORT40_WITH_MD5 suite.

    The master key is the module-level CLEAR_KEY followed by the 5-byte
    SECRET_KEY; KEY_ARGUMENT is sent as the key argument and used as the
    CBC IV.
    """

    SECRET_KEY = 'b' * 5

    @classmethod
    def get_string_description(cls):
        """Return the suite's attribute name on ``SSLv2CipherSuite``."""
        return "RC2_128_CBC_EXPORT40_WITH_MD5"

    @classmethod
    def get_client_master_key(cls, public_key):
        """Build the client-master-key message for ``public_key``."""
        suite_id = cls.get_constant()
        encrypted = cls.get_encrypted_pms(public_key, cls.SECRET_KEY)
        return SSLv2ClientMasterKey(cipher_suite=suite_id,
                                    encrypted_key=encrypted,
                                    key_argument=KEY_ARGUMENT,
                                    clear_key=CLEAR_KEY)

    @classmethod
    def verify_key(cls, connection_id, server_finished):
        """Return True if CHALLENGE appears at offset 17 of the plaintext."""
        master_key = CLEAR_KEY + cls.SECRET_KEY
        key_material = MD5.new(master_key + '0' + CHALLENGE + connection_id).digest()
        cipher = Crypto.Cipher.ARC2.new(key_material,
                                        mode=Crypto.Cipher.ARC2.MODE_CBC,
                                        IV=KEY_ARGUMENT,
                                        effective_keylen=128)
        try:
            # The first 3 bytes are skipped before decrypting
            # (presumably the record header -- TODO confirm).
            plaintext = cipher.decrypt(server_finished[3:])
        except ValueError:
            # A ciphertext the cipher rejects counts as a failed check.
            return False
        # 17 bytes precede the echoed challenge
        # (presumably MAC plus message type -- TODO confirm).
        return plaintext[17:].startswith(CHALLENGE)
class DES(CipherSuite):
    """Probe for the DES_64_CBC_WITH_MD5 suite.

    No clear key is sent: the 8-byte SECRET_KEY is the whole master key.
    KEY_ARGUMENT is sent as the key argument and used as the CBC IV.
    """

    SECRET_KEY = 'b' * 8

    @classmethod
    def get_string_description(cls):
        """Return the suite's attribute name on ``SSLv2CipherSuite``."""
        return "DES_64_CBC_WITH_MD5"

    @classmethod
    def get_client_master_key(cls, public_key):
        """Build the client-master-key message for ``public_key``."""
        suite_id = cls.get_constant()
        encrypted = cls.get_encrypted_pms(public_key, cls.SECRET_KEY)
        return SSLv2ClientMasterKey(cipher_suite=suite_id,
                                    encrypted_key=encrypted,
                                    key_argument=KEY_ARGUMENT)

    @classmethod
    def verify_key(cls, connection_id, server_finished):
        """Return True if CHALLENGE appears at offset 17 of the plaintext."""
        key_material = MD5.new(cls.SECRET_KEY + '0' + CHALLENGE + connection_id).digest()
        # Only the first 8 bytes of the digest are used as the DES key.
        cipher = Crypto.Cipher.DES.new(key_material[:8],
                                       mode=Crypto.Cipher.DES.MODE_CBC,
                                       IV=KEY_ARGUMENT)
        try:
            # The first 3 bytes are skipped before decrypting
            # (presumably the record header -- TODO confirm).
            plaintext = cipher.decrypt(server_finished[3:])
        except ValueError:
            # A ciphertext the cipher rejects counts as a failed check.
            return False
        # 17 bytes precede the echoed challenge
        # (presumably MAC plus message type -- TODO confirm).
        return plaintext[17:].startswith(CHALLENGE)
# Cipher suites tried by the __main__ driver, one connection each, in order.
cipher_suites = [
    RC2Export,
    RC4Export,
    RC4,
    DES,
]
def parse_certificate(derData):
    """Extract the RSA public key from a DER-encoded X.509 certificate.

    Args:
        derData: the DER bytes of the certificate.

    Returns:
        The key built by ``RSA.construct((modulus, exponent))``.

    Raises:
        ValueError: if the certificate's public key is not RSA. Errors from
            the ASN.1 decoder propagate unchanged.
    """
    cert = decode(derData, asn1Spec=Certificate())[0]
    key_info = X509Certificate(cert).tbsCertificate.pub_key_info
    if key_info.algType != PublicKeyInfo.RSA:
        message = 'Certificate algType is not RSA'
        print(message)
        # Carry the reason in the exception rather than raising a bare
        # Exception(); ValueError is still caught by ``except Exception``.
        raise ValueError(message)
    key_params = key_info.key
    # The modulus is raw bytes, so it is hex-encoded before being parsed.
    modulus = long(hexlify(key_params["mod"]), 16)
    exponent = long(key_params["exp"])
    return RSA.construct((modulus, exponent))
class Protocol(Enum):
    # Application protocol spoken before the SSLv2 handshake starts.

    # No preamble: the SSLv2 client hello is sent immediately.
    BARE_SSLv2 = 1

    # Mail protocols upgraded in-band (STARTTLS / STLS).
    ESMTP = 2
    IMAP = 3
    POP3 = 4

    # PostgreSQL: an SSL request packet is sent before the handshake.
    PSQL = 5
def _open_connection(ip, port, attempts=3):
    """Return a connected TCP socket, or None if every attempt fails."""
    for _ in range(attempts):
        # Use a fresh socket per attempt: retrying connect() on a socket
        # whose connect already failed is not portable.
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(SOCKET_TIMEOUT)
        try:
            sock.connect((ip, port))
        except socket.error:
            sock.close()
        else:
            return sock
    return None


def _negotiate_starttls(s, ip, protocol):
    """Run the plaintext TLS upgrade; return False if it cannot proceed."""
    starttls_response = "n/a"
    try:
        if protocol == Protocol.ESMTP:
            s.recv(SOCKET_RECV_SIZE)  # greeting banner, content unused
            s.send("EHLO testing\r\n")
            ehlo_response = s.recv(SOCKET_RECV_SIZE)
            if "starttls" not in ehlo_response.lower():
                print("%s: Case 2a; Server apparently doesn't support STARTTLS" % ip)
                return False
            s.send("STARTTLS\r\n")
            starttls_response = s.recv(SOCKET_RECV_SIZE)
        elif protocol == Protocol.IMAP:
            s.recv(SOCKET_RECV_SIZE)  # greeting banner, content unused
            s.send(". CAPABILITY\r\n")
            capability_response = s.recv(SOCKET_RECV_SIZE)
            if "starttls" not in capability_response.lower():
                print("%s: Case 2b; Server apparently doesn't support STARTTLS" % ip)
                return False
            s.send(". STARTTLS\r\n")
            starttls_response = s.recv(SOCKET_RECV_SIZE)
        elif protocol == Protocol.POP3:
            s.recv(SOCKET_RECV_SIZE)  # greeting banner, content unused
            s.send("STLS\r\n")
            starttls_response = s.recv(SOCKET_RECV_SIZE)
    except socket.error as e:
        print("Errorx: " + str(e) + " - starttls_response: '" + starttls_response + "'")
        print('%s: Case 2c; starttls negotiation failed' % ip)
        return False
    return True


def _sslv2_handshake(s, ip, protocol, cipher_suite, result_additional_data):
    """Run the probe on an open socket; the caller closes the socket."""
    if not _negotiate_starttls(s, ip, protocol):
        return NO_STARTTLS

    if protocol == Protocol.PSQL:
        # PostgreSQL SSLRequest: message length 8, request code 80877103.
        # The server answers with a single byte, "S" meaning SSL accepted.
        s.send(ii_pack(8, 80877103))
        if "S" != s.recv(1):
            print("Error: PSQL server does not support SSL")
            raise Exception("PSQL server does not support SSL")
        print("PSQL server accepts SSL")

    client_hello = SSLv2Record()/SSLv2ClientHello(cipher_suites=SSL2_CIPHER_SUITES.keys(), challenge=CHALLENGE)
    s.sendall(str(client_hello))

    readable, _, errored = select.select([s], [], [s], SOCKET_TIMEOUT)
    if s in errored or s not in readable:
        print('%s: Case 3a; Server did not response properly to client hello' % ip)
        return "3a: %s" % NO_TLS

    try:
        server_hello_raw = s.recv(SOCKET_RECV_SIZE)
    except socket.error:
        print('%s: Case 3b; Connection reset by peer when waiting for server hello' % ip)
        return "3b: %s" % NO_TLS

    # Parsing is run under a time limit; None means the limit was hit.
    server_hello = timeout(SSL, (server_hello_raw,), timeout_duration=SOCKET_TIMEOUT)
    if server_hello is None:
        print('%s: Case 3c; Timeout on parsing server hello' % ip)
        return "3c: %s" % NO_TLS

    if SSLv2ServerHello not in server_hello:
        print('%s: Case 3d; Server hello did not contain server hello' % ip)
        return "3d: %s" % NO_TLS

    parsed_server_hello = server_hello[SSLv2ServerHello]
    connection_id = parsed_server_hello.connection_id
    cert = parsed_server_hello.certificates

    try:
        public_key = parse_certificate(cert)
    except Exception:
        # Server could still be vulnerable, we just can't tell, so we assume it isn't
        print('%s: Case 4a; Could not extract public key from DER' % ip)
        return "4a: %s" % NO_TLS

    server_advertised_cipher_suites = parsed_server_hello.fields["cipher_suites"]
    cipher_suite_advertised = cipher_suite.get_constant() in server_advertised_cipher_suites

    client_master_key = cipher_suite.get_client_master_key(public_key)
    client_key_exchange = SSLv2Record()/client_master_key
    s.sendall(str(client_key_exchange))

    readable, _, errored = select.select([s], [], [s], SOCKET_TIMEOUT)
    if s in errored:
        print('%s: Case 4b; Exception on socket after sending client key exchange' % ip)
        return "4b: %s" % NO_TLS
    if s not in readable:
        print('%s: Case 5; Server did not send finished in time' % ip)
        return "5: %s" % NO_TLS

    try:
        server_finished = s.recv(SOCKET_RECV_SIZE)
    except socket.error:
        print('%s: Case 4c; Connection reset by peer when waiting for server finished' % ip)
        return "4c: %s" % NO_TLS

    if server_finished == '':
        print('%s: Case 4d; Empty server_finished' % ip)
        return "4d: %s" % NO_TLS

    if not cipher_suite.verify_key(connection_id, server_finished):
        print('%s: Case 7; Symmetric key did not successfully verify on server finished message' % ip)
        return "7: %s" % NO_TLS

    result_additional_data['cipher_suite_advertised'] = cipher_suite_advertised
    return "%s:%s" % (VULN, base64.b64encode(public_key.exportKey(format='DER')))


def sslv2_connect(ip, port, protocol, cipher_suite, result_additional_data):
    """Probe one host/port for SSLv2 support with a single cipher suite.

    Connects (up to 3 attempts), performs any protocol-specific upgrade,
    then runs an SSLv2 handshake using ``cipher_suite`` and checks whether
    the server-finished message verifies under the chosen key.

    Args:
        ip: host address to connect to.
        port: TCP port.
        protocol: a ``Protocol`` member selecting the pre-handshake dialogue.
        cipher_suite: a ``CipherSuite`` subclass to probe with.
        result_additional_data: dict filled in on success only, with key
            ``'cipher_suite_advertised'`` (whether the server hello listed
            the probed suite).

    Returns:
        ``CON_FAIL`` or ``NO_STARTTLS`` for early failures, ``"<case>: "``
        followed by ``NO_TLS`` when a later step fails, or ``VULN + ":"``
        followed by the base64 DER public key on success.

    Raises:
        Exception: if a PSQL server declines the SSL request. Socket errors
            while sending also propagate.
    """
    s = _open_connection(ip, port)
    if s is None:
        print('%s: Case 1 - port is apparently closed (after 3 tries); Connect failed' % ip)
        return CON_FAIL
    try:
        return _sslv2_handshake(s, ip, protocol, cipher_suite, result_additional_data)
    finally:
        # Close on every exit path, including early returns and exceptions.
        s.close()
if __name__ == '__main__':
    # Command line: <ip> <port> [-bare|-esmtp|-imap|-pop3|-psql]
    # Every suite in cipher_suites is probed and one verdict line is
    # printed per suite.
    if len(sys.argv) < 3:
        print('Usage: %s <ip> <port> [-bare|-esmtp|-imap|-pop3|-psql]' % sys.argv[0])
        sys.exit(1)

    ip = sys.argv[1]
    port = int(sys.argv[2])
    print('Testing %s on port %s' % (ip, port))

    protocol_flags = {
        '-esmtp': Protocol.ESMTP,
        '-imap': Protocol.IMAP,
        '-pop3': Protocol.POP3,
        '-bare': Protocol.BARE_SSLv2,
        '-psql': Protocol.PSQL,
    }
    protocol = Protocol.BARE_SSLv2
    if len(sys.argv) >= 4:
        if sys.argv[3] not in protocol_flags:
            print('You gave 3 arguments, argument 3 is not a recognized protocol. Bailing out')
            sys.exit(1)
        protocol = protocol_flags[sys.argv[3]]

    for cipher_suite in cipher_suites:
        string_description = cipher_suite.get_string_description()
        ret_additional_data = {}
        ret = sslv2_connect(ip, port, protocol, cipher_suite, ret_additional_data)

        if not ret.startswith(VULN):
            print('%s: Server is NOT vulnerable with cipher %s, Message: %s\n' % (ip, string_description, ret))
            continue

        cves = []
        # The handshake succeeded with a suite the server never advertised.
        if not ret_additional_data['cipher_suite_advertised']:
            cves.append("CVE-2015-3197")
        # Success with the non-export RC4 probe is reported as CVE-2016-0703.
        if string_description == "RC4_128_WITH_MD5":
            cves.append("CVE-2016-0703")
        cve_string = (" to " + " and ".join(cves)) if cves else ""
        print('%s: Server is vulnerable%s, with cipher %s\n' % (ip, cve_string, string_description))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
git diff: