Created
November 28, 2023 00:34
-
-
Save jborean93/4bc4f20bff0ec6a2496eb511d055a8fa to your computer and use it in GitHub Desktop.
Test WinRM with GSSAPI authentication in Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
import base64 | |
import gssapi | |
import io | |
import re | |
import requests | |
import struct | |
import sys | |
import typing as t | |
import uuid | |
import warnings | |
from requests.packages.urllib3.exceptions import InsecureRequestWarning | |
from xml.etree import ElementTree as ET | |
# Prefix -> namespace URI table. It is used both to build qualified tag names
# ("{uri}Tag") for outgoing messages and as the namespace map handed to
# ElementTree find()/findall() when reading responses.
WSMAN_NS: t.Dict[str, str] = {
    # SOAP / XML Schema
    "s": "http://www.w3.org/2003/05/soap-envelope",
    "xs": "http://www.w3.org/2001/XMLSchema",
    "xsi": "http://www.w3.org/2001/XMLSchema-instance",
    # WS-Addressing and WS-Management
    "wsa": "http://schemas.xmlsoap.org/ws/2004/08/addressing",
    "wsman": "http://schemas.dmtf.org/wbem/wsman/1/wsman.xsd",
    "wsmid": "http://schemas.dmtf.org/wbem/wsman/identify/1/wsmanidentity.xsd",
    "wsmanfault": "http://schemas.microsoft.com/wbem/wsman/1/wsmanfault",
    "cim": "http://schemas.dmtf.org/wbem/wscim/1/common",
    # Microsoft WSMan extensions
    "wsmv": "http://schemas.microsoft.com/wbem/wsman/1/wsman.xsd",
    "cfg": "http://schemas.microsoft.com/wbem/wsman/1/config",
    "sub": "http://schemas.microsoft.com/wbem/wsman/1/subscription",
    "rsp": "http://schemas.microsoft.com/wbem/wsman/1/windows/shell",
    "m": "http://schemas.microsoft.com/wbem/wsman/1/machineid",
    "cert": "http://schemas.microsoft.com/wbem/wsman/1/config/service/certmapping",
    "plugin": "http://schemas.microsoft.com/wbem/wsman/1/config/PluginConfiguration",
    # Other WS-* specifications
    "wsen": "http://schemas.xmlsoap.org/ws/2004/09/enumeration",
    "wsdl": "http://schemas.xmlsoap.org/wsdl",
    "wst": "http://schemas.xmlsoap.org/ws/2004/09/transfer",
    "wsp": "http://schemas.xmlsoap.org/ws/2004/09/policy",
    "wse": "http://schemas.xmlsoap.org/ws/2004/08/eventing",
    # Miscellaneous
    "i": "http://schemas.microsoft.com/wbem/wsman/1/cim/interactive.xsd",
    "xml": "http://www.w3.org/XML/1998/namespace",
    "pwsh": "http://schemas.microsoft.com/powershell",
}
def wrap_winrm(
    context: gssapi.raw.SecurityContext,
    data: bytes,
) -> tuple[bytes, bytes, int]:
    """Encrypt *data* with GSSAPI IOV wrapping for a WinRM message.

    Args:
        context: The established GSSAPI security context to wrap with.
        data: The plaintext bytes to encrypt.

    Returns:
        A ``(header, encrypted_data, padding_length)`` tuple.
        ``encrypted_data`` has any padding bytes produced by the mechanism
        appended to it; ``padding_length`` is the number of those bytes.
    """
    iov = gssapi.raw.IOV(
        gssapi.raw.IOVBufferType.header,
        data,
        gssapi.raw.IOVBufferType.padding,
        std_layout=False,
    )
    gssapi.raw.wrap_iov(context, iov, confidential=True, qop=None)

    header = iov[0].value or b""
    encrypted = iov[1].value or b""
    padding = iov[2].value or b""

    # The padding is part of the wrapped message. The caller advertises
    # ``len(data) + padding_length`` as the payload length, so dropping the
    # padding bytes would make the transmitted data shorter than declared
    # whenever the mechanism emits padding.
    return header, encrypted + padding, len(padding)
def unwrap_winrm(
    context: gssapi.raw.SecurityContext,
    header: bytes,
    data: bytes,
) -> bytes:
    """Decrypt a WinRM payload that was wrapped with GSSAPI IOV buffers.

    Args:
        context: The established GSSAPI security context to unwrap with.
        header: The GSSAPI header bytes that preceded the encrypted data.
        data: The encrypted bytes.

    Returns:
        The value of the second IOV buffer after unwrapping, or ``b""`` when
        that buffer is empty.
    """
    buffer_type = gssapi.raw.IOVBufferType
    buffers = gssapi.raw.IOV(
        (buffer_type.header, False, header),
        data,
        # NOTE(review): the ``True`` presumably asks the library to allocate
        # this buffer - confirm against the python-gssapi IOV docs.
        (buffer_type.data, True, None),
        std_layout=False,
    )
    gssapi.raw.unwrap_iov(context, buffers)

    decrypted = buffers[1].value
    return decrypted if decrypted else b""
class HTTPWinRMAuth(requests.auth.AuthBase):
    """requests auth handler that answers HTTP 401 with a GSSAPI token exchange.

    On a 401 response it picks the first entry of ``valid_protocols`` that
    appears in ``WWW-Authenticate`` and keeps re-sending the request with an
    ``Authorization`` header until the security context is complete.
    """

    def __init__(
        self,
        context: gssapi.raw.SecurityContext,
    ) -> None:
        self.context = context
        # Name of the auth scheme picked during the handshake; set on first 401.
        self.header = None
        self.valid_protocols = ["Negotiate", "Kerberos", "NTLM"]
        # Group 1 is the scheme name, group 2 the base64 token that follows it.
        self._regex = re.compile(
            r"(%s)\s*([^,]*),?" % "|".join(self.valid_protocols), re.I
        )

    def __call__(
        self,
        request: requests.PreparedRequest,
    ) -> requests.PreparedRequest:
        """Prepare *request*: force keep-alive and hook the response handler."""
        request.headers["Connection"] = "Keep-Alive"
        request.register_hook("response", self.response_hook)
        return request

    def response_hook(
        self,
        response: requests.Response,
        **kwargs: t.Any,
    ) -> requests.Response:
        """Run the handshake for 401 responses; pass anything else through."""
        if response.status_code != 401:
            return response
        return self.handle_401(response, **kwargs)

    def handle_401(
        self,
        response: requests.Response,
        **kwargs: t.Any,
    ) -> requests.Response:
        """Exchange GSSAPI tokens with the server and return the last response.

        The original 401 response is returned unchanged when the server
        advertises none of ``valid_protocols``.
        """
        advertised = response.headers.get("www-authenticate", "").upper()
        chosen = next(
            (name for name in self.valid_protocols if name.upper() in advertised),
            None,
        )
        if chosen is None:
            return response
        self.header = chosen

        out_token = self.context.step()
        while not self.context.complete or out_token is not None:
            # Read the body and release the connection so the retry can reuse it.
            response.content
            response.raw.release_conn()

            retry = response.request.copy()
            retry.headers["Authorization"] = (
                self.header.encode() + b" " + base64.b64encode(out_token)
            )
            response = response.connection.send(retry, **kwargs)

            found = self._regex.search(response.headers.get("www-authenticate", ""))
            in_token = found.group(2) if found else None
            if not in_token:
                break
            out_token = self.context.step(base64.b64decode(in_token))

        return response
def _clixml_errors_to_text(stderr: str) -> str:
    """Return the error text held in a CLIXML stderr blob, or *stderr* as is."""
    if not stderr.startswith("#< CLIXML"):
        return stderr

    # Strip off the '#< CLIXML\r\n' by finding the 2nd index of '<'
    root = ET.fromstring(stderr[stderr.index("<", 2) :])
    # The root tag looks like '{namespace}Objs'; keep only the namespace.
    namespace = root.tag.replace("Objs", "")[1:-1]
    errors = [
        error.text or ""
        for error in root.findall("{%s}S[@S='Error']" % namespace)
    ]
    return "".join(errors).replace("_x000D_", "\r").replace("_x000A_", "\n")


def winrm_run(
    context: gssapi.raw.SecurityContext,
    server: str,
    command: str,
    arguments: t.Optional[t.List[str]] = None,
) -> tuple[int, str, str]:
    """Run *command* on *server* through a WinRM shell.

    Args:
        context: GSSAPI security context used for authentication and for
            message encryption.
        server: Host name used to build ``http://<server>:5985/wsman``.
        command: The executable or command to run.
        arguments: Optional arguments passed to the command.

    Returns:
        ``(return_code, stdout, stderr)``. When stderr is CLIXML only the
        text of its ``Error`` entries is returned.

    Raises:
        requests.HTTPError: If the server answers with an HTTP error status.
    """
    warnings.simplefilter("ignore", category=InsecureRequestWarning)
    endpoint = "http://%s:5985/wsman" % server

    # The session owns pooled connections; close it once the command is done.
    with requests.Session() as http:
        http.auth = HTTPWinRMAuth(context)
        http.headers = {
            "User-Agent": "pyspnego_client",
        }

        # We need to ensure we have set up the context already so we can start
        # encrypting the data.
        request = requests.Request("POST", endpoint, data=None)
        response = http.send(http.prepare_request(request))
        response.raise_for_status()

        # The wsman_* helpers read these two attributes from the session.
        setattr(http, "endpoint", endpoint)
        setattr(http, "session_id", str(uuid.uuid4()).upper())

        shell_id = wsman_create(http)
        try:
            cmd_id = wsman_command(http, shell_id, command, arguments)
            rc, stdout, stderr = wsman_receive(http, shell_id, cmd_id)
            wsman_signal(
                http,
                shell_id,
                cmd_id,
                "http://schemas.microsoft.com/wbem/wsman/1/windows/shell/signal/Terminate",
            )
            return rc, stdout, _clixml_errors_to_text(stderr)
        finally:
            wsman_delete(http, shell_id)
def wsman_command(
    http: requests.Session,
    shell_id: str,
    command: str,
    arguments: t.Optional[t.List[str]] = None,
) -> str:
    """Send a Command request to an open shell and return the new command id.

    Args:
        http: Session carrying the ``endpoint`` and ``session_id`` attributes.
        shell_id: Identifier of the shell to run the command in.
        command: The command text.
        arguments: Optional arguments, one ``Arguments`` element each.

    Returns:
        The text of the ``CommandId`` element in the response.
    """
    rsp_ns = "{" + WSMAN_NS["rsp"] + "}"

    cmd_line = ET.Element(rsp_ns + "CommandLine")
    cmd_element = ET.SubElement(cmd_line, rsp_ns + "Command")
    cmd_element.text = command
    for arg in arguments or []:
        arg_element = ET.SubElement(cmd_line, rsp_ns + "Arguments")
        arg_element.text = arg

    reply = wsman_envelope(
        "http://schemas.microsoft.com/wbem/wsman/1/windows/shell/Command",
        http,
        selector_set={"ShellId": shell_id},
        option_set={"WINRS_SKIP_CMD_SHELL": False},
        body=cmd_line,
    )
    command_id = reply.find("s:Body/rsp:CommandResponse/rsp:CommandId", WSMAN_NS)
    return command_id.text
def wsman_create(
    http: requests.Session,
) -> str:
    """Create a shell with stdin/stdout/stderr streams and return its id.

    Args:
        http: Session carrying the ``endpoint`` and ``session_id`` attributes.

    Returns:
        The text of the ``ShellId`` element in the response.
    """
    rsp_ns = "{" + WSMAN_NS["rsp"] + "}"

    shell_body = ET.Element(rsp_ns + "Shell")
    for tag, streams in (
        ("InputStreams", "stdin"),
        ("OutputStreams", "stdout stderr"),
    ):
        ET.SubElement(shell_body, rsp_ns + tag).text = streams

    reply = wsman_envelope(
        "http://schemas.xmlsoap.org/ws/2004/09/transfer/Create",
        http,
        # NOTE(review): 65001 is presumably the UTF-8 code page - confirm.
        option_set={"WINRS_CODEPAGE": 65001},
        body=shell_body,
    )
    shell_id = reply.find("s:Body/rsp:Shell/rsp:ShellId", WSMAN_NS)
    return shell_id.text
def wsman_delete(
    http: requests.Session,
    shell_id: str,
) -> None:
    """Send a Delete request for the shell identified by *shell_id*.

    Args:
        http: Session carrying the ``endpoint`` and ``session_id`` attributes.
        shell_id: Identifier of the shell to delete.
    """
    delete_action = "http://schemas.xmlsoap.org/ws/2004/09/transfer/Delete"
    target = {"ShellId": shell_id}
    wsman_envelope(delete_action, http, selector_set=target)
def wsman_receive(
    http: requests.Session,
    shell_id: str,
    command_id: str,
) -> tuple[int, str, str]:
    """Poll a command's output until its state ends with ``Done``.

    Args:
        http: Session carrying the ``endpoint`` and ``session_id`` attributes.
        shell_id: Identifier of the shell the command runs in.
        command_id: Identifier of the command to read from.

    Returns:
        ``(exit_code, stdout, stderr)`` with both streams decoded as UTF-8.
    """
    rsp_ns = "{" + WSMAN_NS["rsp"] + "}"
    buffers = {name: io.BytesIO() for name in ("stdout", "stderr")}

    exit_code = None
    while exit_code is None:
        receive_body = ET.Element(rsp_ns + "Receive")
        desired = ET.SubElement(
            receive_body,
            rsp_ns + "DesiredStream",
            attrib={"CommandId": command_id},
        )
        desired.text = "stdout stderr"

        reply = wsman_envelope(
            "http://schemas.microsoft.com/wbem/wsman/1/windows/shell/Receive",
            http,
            selector_set={"ShellId": shell_id},
            option_set={"WSMAN_CMDSHELL_OPTION_KEEPALIVE": True},
            body=receive_body,
        )

        # Each Stream element carries base64 data for the stream in its Name.
        for stream in reply.findall(
            "s:Body/rsp:ReceiveResponse/rsp:Stream", WSMAN_NS
        ):
            if not stream.text:
                continue
            buffers[stream.attrib["Name"]].write(base64.b64decode(stream.text))

        state = reply.find("s:Body/rsp:ReceiveResponse/rsp:CommandState", WSMAN_NS)
        if state.attrib["State"].endswith("Done"):
            exit_code = int(state.find("rsp:ExitCode", WSMAN_NS).text)

    stdout_text = buffers["stdout"].getvalue().decode("utf-8")
    stderr_text = buffers["stderr"].getvalue().decode("utf-8")
    return exit_code, stdout_text, stderr_text
def wsman_signal(
    http: requests.Session,
    shell_id: str,
    command_id: str,
    code: str,
) -> None:
    """Send a Signal request carrying *code* to a command in a shell.

    Args:
        http: Session carrying the ``endpoint`` and ``session_id`` attributes.
        shell_id: Identifier of the shell the command belongs to.
        command_id: Identifier of the command to signal.
        code: The signal code URI placed in the ``Code`` element.
    """
    rsp_ns = "{" + WSMAN_NS["rsp"] + "}"

    signal_body = ET.Element(rsp_ns + "Signal", attrib={"CommandId": command_id})
    code_element = ET.SubElement(signal_body, rsp_ns + "Code")
    code_element.text = code

    wsman_envelope(
        "http://schemas.microsoft.com/wbem/wsman/1/windows/shell/Signal",
        http,
        selector_set={"ShellId": shell_id},
        body=signal_body,
    )
def _wsman_build_envelope(
    action: str,
    http: requests.Session,
    selector_set: t.Optional[t.Dict[str, str]],
    option_set: t.Optional[t.Dict[str, str]],
    body: t.Optional[ET.Element],
) -> bytes:
    """Serialise the SOAP envelope for *action* to UTF-8 encoded XML."""
    s = WSMAN_NS["s"]
    wsa = WSMAN_NS["wsa"]
    wsman = WSMAN_NS["wsman"]
    wsmv = WSMAN_NS["wsmv"]
    xml = WSMAN_NS["xml"]
    understand = "{%s}mustUnderstand" % s

    envelope = ET.Element("{%s}Envelope" % s)
    header = ET.SubElement(envelope, "{%s}Header" % s)

    ET.SubElement(header, "{%s}Action" % wsa, attrib={understand: "true"}).text = action
    ET.SubElement(header, "{%s}SessionId" % wsmv, attrib={understand: "false"}).text = (
        "uuid:%s" % http.session_id.upper()
    )
    ET.SubElement(header, "{%s}To" % wsa).text = http.endpoint
    ET.SubElement(
        header, "{%s}MaxEnvelopeSize" % wsman, attrib={understand: "true"}
    ).text = "153600"
    ET.SubElement(header, "{%s}MessageID" % wsa).text = (
        "uuid:%s" % str(uuid.uuid4()).upper()
    )
    ET.SubElement(header, "{%s}OperationTimeout" % wsman).text = "PT30S"

    reply_to = ET.SubElement(header, "{%s}ReplyTo" % wsa)
    ET.SubElement(
        reply_to, "{%s}Address" % wsa, attrib={understand: "true"}
    ).text = "http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous"

    ET.SubElement(
        header, "{%s}ResourceURI" % wsman, attrib={understand: "true"}
    ).text = "http://schemas.microsoft.com/wbem/wsman/1/windows/shell/cmd"

    for locale_tag in ("DataLocale", "Locale"):
        ET.SubElement(
            header,
            "{%s}%s" % (wsmv, locale_tag),
            attrib={understand: "false", "{%s}lang" % xml: "en-US"},
        )

    for values, set_name, item_name in (
        (selector_set, "SelectorSet", "Selector"),
        (option_set, "OptionSet", "Option"),
    ):
        if not values:
            continue
        set_element = ET.SubElement(header, "{%s}%s" % (wsman, set_name))
        if set_name == "OptionSet":
            set_element.set(understand, "true")
        for key, value in values.items():
            ET.SubElement(
                set_element, "{%s}%s" % (wsman, item_name), Name=key
            ).text = str(value)

    envelope_body = ET.SubElement(envelope, "{%s}Body" % s)
    if body is not None:
        envelope_body.append(body)

    return ET.tostring(envelope, encoding="utf-8", method="xml")


def _wsman_encrypt(
    context: gssapi.raw.SecurityContext,
    payload: bytes,
    boundary: str,
) -> tuple[bytes, str]:
    """Wrap *payload* as a multipart/encrypted body; return (body, content type)."""
    auth_protocol = "SPNEGO"
    protocol = "application/HTTP-%s-session-encrypted" % auth_protocol
    # Only CredSSP splits the message; everything else is sent as one chunk.
    max_size = 16384 if auth_protocol == "CredSSP" else len(payload)
    chunks = [payload[i : i + max_size] for i in range(0, len(payload), max_size)]

    encrypted_chunks = []
    for chunk in chunks:
        gss_header, enc_data, padding_length = wrap_winrm(context, chunk)
        # The advertised length describes this chunk, not the whole message.
        mime_header = "\r\n".join(
            [
                "--%s" % boundary,
                "\tContent-Type: %s" % protocol,
                "\tOriginalContent: type=application/soap+xml;charset=UTF-8;Length=%d"
                % (len(chunk) + padding_length),
                "--%s" % boundary,
                "\tContent-Type: application/octet-stream",
                "",
            ]
        )
        encrypted_chunks.append(
            mime_header.encode()
            # 4-byte little-endian length of the GSS header, then the header.
            + struct.pack("<i", len(gss_header))
            + gss_header
            + enc_data
        )

    content_sub_type = (
        "multipart/encrypted"
        if len(encrypted_chunks) == 1
        else "multipart/x-multi-encrypted"
    )
    content_type = '%s;protocol="%s";boundary="%s"' % (
        content_sub_type,
        protocol,
        boundary,
    )
    body = b"".join(encrypted_chunks) + ("--%s--\r\n" % boundary).encode()
    return body, content_type


def _wsman_decrypt(
    context: gssapi.raw.SecurityContext,
    content: bytes,
    content_type: str,
) -> bytes:
    """Unwrap every encrypted part of a multipart response and join them."""
    boundary_match = re.search(r'boundary="?([^";]+)"?', content_type)
    if boundary_match is None:
        raise ValueError(
            "No MIME boundary found in Content-Type %r" % content_type
        )
    boundary_re = re.escape(boundary_match.group(1).strip()).encode()
    delimiter = re.compile(rb"--\s*" + boundary_re + rb"\r\n")
    terminator = re.compile(rb"--\s*" + boundary_re + rb"--\r\n$")

    # Parts alternate between a MIME header block and its encrypted payload.
    parts = [part for part in delimiter.split(content) if part]
    if len(parts) % 2:
        raise ValueError("Encrypted response has an unpaired MIME part")

    decrypted = []
    for mime_header, payload in zip(parts[::2], parts[1::2]):
        expected_length = int(mime_header.strip().split(b"Length=")[1])

        # remove the end MIME block if it exists
        payload = terminator.sub(b"", payload)
        # Drop only the leading header line; the rest is binary data.
        wrapped = payload.replace(
            b"\tContent-Type: application/octet-stream\r\n", b"", 1
        )

        header_length = struct.unpack("<i", wrapped[:4])[0]
        gss_header = wrapped[4 : 4 + header_length]
        enc_data = wrapped[4 + header_length :]

        plaintext = unwrap_winrm(context, gss_header, enc_data)
        if len(plaintext) != expected_length:
            raise ValueError(
                "Decrypted length %d does not match the advertised length %d"
                % (len(plaintext), expected_length)
            )
        decrypted.append(plaintext)

    return b"".join(decrypted)


def wsman_envelope(
    action: str,
    http: requests.Session,
    selector_set: t.Optional[t.Dict[str, str]] = None,
    option_set: t.Optional[t.Dict[str, str]] = None,
    body: t.Optional[ET.Element] = None,
) -> ET.Element:
    """Send one WSMan request and return the parsed response envelope.

    The request is encrypted with the session's GSSAPI context when the
    endpoint starts with ``http://``; an encrypted multipart response is
    decrypted before it is parsed.

    Args:
        action: The WS-Addressing action URI.
        http: Session carrying ``endpoint``, ``session_id`` and an ``auth``
            object exposing ``context``.
        selector_set: Optional name/value pairs for the ``SelectorSet`` header.
        option_set: Optional name/value pairs for the ``OptionSet`` header;
            values are converted with ``str()``.
        body: Optional element appended to the SOAP ``Body``.

    Returns:
        The root element of the response XML.

    Raises:
        requests.HTTPError: If the server answers with an HTTP error status.
        ValueError: If an encrypted response is malformed or its decrypted
            length does not match the advertised length.
    """
    boundary = "Encrypted Boundary"
    payload = _wsman_build_envelope(action, http, selector_set, option_set, body)

    if http.endpoint.startswith("http://"):
        payload, content_type = _wsman_encrypt(http.auth.context, payload, boundary)
    else:
        content_type = "application/soap+xml;charset=UTF-8"

    headers = {
        "Content-Length": str(len(payload)),
        "Content-Type": content_type,
    }
    request = http.prepare_request(
        requests.Request("POST", http.endpoint, data=payload, headers=headers)
    )
    response = http.send(request)
    response.raise_for_status()

    reply = response.content
    reply_type = response.headers.get("content-type", "")
    if reply_type.startswith(
        ("multipart/encrypted;", "multipart/x-multi-encrypted;")
    ):
        reply = _wsman_decrypt(http.auth.context, reply, reply_type)

    return ET.fromstring(reply)
def main() -> None:
    """Run a command on a remote host over WinRM and mirror its result.

    Reads ``<target_host> <command> [arguments ...]`` from ``sys.argv``,
    writes the remote stdout/stderr to the local streams and exits with the
    remote return code. Exits with a usage message when the host or command
    is missing.
    """
    if len(sys.argv) < 3:
        program = sys.argv[0] if sys.argv else "winrm_gssapi"
        sys.exit("usage: %s <target_host> <command> [arguments ...]" % program)

    target_host, cmd = sys.argv[1:3]
    arguments = sys.argv[3:] or None

    # OID of the Kerberos 5 GSSAPI mechanism.
    kerberos = gssapi.OID.from_int_seq("1.2.840.113554.1.2.2")
    ctx = gssapi.SecurityContext(
        name=gssapi.Name(
            f"host@{target_host}", name_type=gssapi.NameType.hostbased_service
        ),
        creds=None,
        mech=kerberos,
        usage="initiate",
    )

    rc, stdout, stderr = winrm_run(ctx, target_host, cmd, arguments)
    sys.stdout.write(stdout)
    sys.stderr.write(stderr)
    sys.exit(rc)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment