Skip to content

Instantly share code, notes, and snippets.

@jborean93
Last active August 9, 2024 11:09
Show Gist options
  • Save jborean93/13dbfbd94e83ff810afc55e178371eb9 to your computer and use it in GitHub Desktop.
Save jborean93/13dbfbd94e83ff810afc55e178371eb9 to your computer and use it in GitHub Desktop.
List SMB shares using smbprotocol library
#!/usr/bin/env python3
# Copyright: (c) 2024, Jordan Borean (@jborean93) <[email protected]>
# MIT License (see LICENSE or https://opensource.org/licenses/MIT)
# PYTHON_ARGCOMPLETE_OK
# Big thanks to pysmb for help with the RPC structures
# https://github.com/miketeo/pysmb
from __future__ import annotations
import argparse
import collections.abc
import dataclasses
import enum
import sys
import smbclient
from smbprotocol.ioctl import CtlCode, IOCTLFlags, SMB2IOCTLRequest, SMB2IOCTLResponse
HAS_ARGCOMPLETE = True
try:
import argcomplete
except ImportError:
HAS_ARGCOMPLETE = False
class ShareType(enum.IntEnum):
STYPE_DISKTREE = 0x00000000
STYPE_PRINTQ = 0x00000001
STYPE_DEVICE = 0x00000002
STYPE_IPC = 0x00000003
STYPE_CLUSTER_FS = 0x02000000
STYPE_CLUSTER_SOFS = 0x04000000
STYPE_CLUSTER_DFS = 0x08000000
class ShareTypeFlags(enum.IntFlag):
NONE = 0x00000000
STYPE_TEMPORARY = 0x40000000
STYPE_SPECIAL = 0x80000000
@dataclasses.dataclass(frozen=True)
class ShareInfo:
name: str
share_type: ShareType
share_type_flags: ShareTypeFlags
comment: str
def build_dce_rpc_req(call_id: int, packet_type: int, data: bytes) -> bytes:
return b"".join(
[
# Version
b"\x05"
# Version minor
b"\x00",
# Packet type
packet_type.to_bytes(1, byteorder="little"),
# Packet Flags (First | Last fragment)
b"\x03",
# Data rep (ASCII/Little Endian/IEEE)
b"\x10\x00\x00\x00",
# Frag Length
(len(data) + 16).to_bytes(2, byteorder="little"),
# Auth Length
b"\x00\x00",
# Call Id
call_id.to_bytes(4, byteorder="little"),
data,
]
)
def build_dce_rpc_bind(call_id: int) -> bytes:
data = b"".join(
[
# Max Xmit Frag - 4280
b"\xb8\x10",
# Max Recv Frag - 4280
b"\xb8\x10",
# Assoc Group
b"\x00\x00\x00\x00",
# Num Ctx Items
b"\x02\x00\x00\x00",
# Ctx 1 - SRVSVC v3.0 - 32bit NDR
# 8a885d04-1ceb-11c9-9fe8-08002b104860
b"\x00\x00\x01\x00\xc8\x4f\x32\x4b\x70\x16\xd3\x01\x12\x78\x5a\x47",
b"\xbf\x6e\xe1\x88\x03\x00\x00\x00\x04\x5d\x88\x8a\xeb\x1c\xc9\x11",
b"\x9f\xe8\x08\x00\x2b\x10\x48\x60\x02\x00\x00\x00",
# Ctx 2 - SRVSVC v3.0 - Bind Time Feature Negotiation
# 6cb71c2c-9812-4540-0300-000000000000
b"\x01\x00\x01\x00\xc8\x4f\x32\x4b\x70\x16\xd3\x01\x12\x78\x5a\x47",
b"\xbf\x6e\xe1\x88\x03\x00\x00\x00\x2c\x1c\xb7\x6c\x12\x98\x40\x45",
b"\x03\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00",
]
)
return build_dce_rpc_req(call_id, 11, data)
def build_net_share_enum(call_id: int, server_name: str) -> bytes:
"""
NET_API_STATUS NetrShareEnum(
[in, string, unique] SRVSVC_HANDLE ServerName,
[in, out] LPSHARE_ENUM_STRUCT InfoStruct,
[in] DWORD PreferedMaximumLength,
[out] DWORD* TotalEntries,
[in, out, unique] DWORD* ResumeHandle
);
"""
server_len = len(server_name) + 1
server_len_padding = b""
if server_len % 2:
server_len_padding = b"\x00\x00"
b_server_name = (server_name + "\u0000").encode("utf-16-le")
stub_data = b"".join(
[
# ServerName - Pointer
b"\x00\x00\x02\x00", # Referent Id
server_len.to_bytes(4, byteorder="little"), # Max Value Len
b"\x00\x00\x00\x00", # Offset
server_len.to_bytes(4, byteorder="little"), # Value Len
b_server_name,
server_len_padding,
# InfoStruct
(1).to_bytes(4, byteorder="little"), # Level - SHARE_INFO_1_CONTAINER
b"\x01\x00\x00\x00\x04\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00", # Unknown still part of InfoStruct
b"\xff\xff\xff\xff", # PreferedMaximumLength
b"\x08\x00\x02\x00\x00\x00\x00\x00", # ResumeHandle
]
)
rpc_request_data = b"".join(
[
# Alloc hint
b"\x4c\x00\x00\x00",
# Context ID
b"\x00\x00",
# Opnum (NetShareEnumAll),
(15).to_bytes(2, byteorder="little"),
stub_data,
]
)
return build_dce_rpc_req(call_id, 0, rpc_request_data)
def unpack_smb_share_info(data: memoryview) -> list[ShareInfo]:
# Ignore the RPC PDU header (24 bytes) + leading data in the RPC result
# we don't care about (12 bytes).
result_len = int.from_bytes(data[36:40], byteorder="little")
array_view = data[48:]
str_view = array_view[(12 * result_len) :]
results = []
for _ in range(result_len):
share_type_raw = int.from_bytes(array_view[4:8], byteorder="little")
share_type = ShareType(share_type_raw & 0x0FFFFFFF)
share_flags = ShareTypeFlags(share_type_raw & 0xF0000000)
array_view = array_view[12:]
name_len = int.from_bytes(str_view[8:12], byteorder="little")
name = str_view[12 : 12 + (name_len - 1) * 2].tobytes().decode("utf-16-le")
if name_len % 2:
name_len += 1
str_view = str_view[12 + (name_len * 2) :]
comment_len = int.from_bytes(str_view[8:12], byteorder="little")
comment = (
str_view[12 : 12 + (comment_len - 1) * 2].tobytes().decode("utf-16-le")
)
if comment_len % 2:
comment_len += 1
str_view = str_view[12 + (comment_len * 2) :]
results.append(ShareInfo(name, share_type, share_flags, comment))
return results
def get_share_info(server: str) -> list[ShareInfo]:
with smbclient.open_file(
rf"\\{server}\IPC$\srvsvc",
mode="w+b",
buffering=0,
file_type="pipe",
) as srvsvc:
connection = srvsvc.fd.connection
fid = srvsvc.fd.file_id
sid = srvsvc.fd.tree_connect.session.session_id
tid = srvsvc.fd.tree_connect.tree_connect_id
# Bind to DCE RPC service
bind_req = build_dce_rpc_bind(1)
srvsvc.write(bind_req)
_ = srvsvc.read(1024) # bind_ack - should validate this
# Send the NetShareEnumAll as part of an IOCTL request
net_share_enum_req = build_net_share_enum(2, rf"\\{server}")
ioctl_req = SMB2IOCTLRequest()
ioctl_req["ctl_code"] = CtlCode.FSCTL_PIPE_TRANSCEIVE
ioctl_req["file_id"] = fid
ioctl_req["flags"] = IOCTLFlags.SMB2_0_IOCTL_IS_FSCTL
ioctl_req["max_output_response"] = 8196
ioctl_req["buffer"] = net_share_enum_req
req = connection.send(ioctl_req, sid=sid, tid=tid)
resp = connection.receive(req)
ioctl_resp = SMB2IOCTLResponse()
ioctl_resp.unpack(resp["data"].get_value())
# Should validate this is a success response not a failure
net_share_enum_resp = ioctl_resp["buffer"].get_value()
return unpack_smb_share_info(memoryview(net_share_enum_resp))
def parse_args(args: collections.abc.Sequence[str]) -> argparse.Namespace:
"""Parse and return args."""
parser = argparse.ArgumentParser(
description="List SMB shares for the specific server.",
)
parser.add_argument(
"server",
nargs=1,
default="",
type=str,
help="The SMB server to list the share for.",
)
if HAS_ARGCOMPLETE: # pragma: nocover
argcomplete.autocomplete(parser)
parsed_args = parser.parse_args(args)
return parsed_args
def main(args: collections.abc.Sequence[str]) -> None:
"""Run the main program."""
parsed_args = parse_args(args)
shares = get_share_info(parsed_args.server[0])
for share in shares:
print(share)
if __name__ == "__main__":
main(sys.argv[1:])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment