Last active
August 12, 2021 21:26
-
-
Save FrankSpierings/0b5f0a2ad011447f7c2d41c0b4420588 to your computer and use it in GitHub Desktop.
Python SMB Spider (impacket)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from impacket.smbconnection import SMBConnection, SessionError | |
from impacket.smb3structs import FILE_READ_DATA | |
from time import strftime, localtime | |
import re | |
import logging | |
# Configure the root logger's handler/format first, then grab the root logger.
logging.basicConfig(
    format='%(asctime)s %(levelname)s:%(message)s',
    datefmt='%I:%M:%S',
    level=logging.DEBUG,
)
log = logging.getLogger()
# Effective verbosity of the script; switch to logging.DEBUG to trace every
# directory, file, extension and exclusion decision made by the spider.
log.setLevel(logging.INFO)
def get_fullpath(conn, share, path):
    """Return the display form ``//<remote host>/<share><path>`` of a remote path.

    The host is taken from ``conn.getRemoteHost()``; ``path`` is appended
    verbatim, so it is expected to be empty or to start with a slash.
    """
    host = conn.getRemoteHost()
    return '//%s/%s%s' % (host, share, path)
class Spider():
    """Recursively walk one SMB share and record path and content regex hits.

    Every directory and file whose path matches one of ``path_re`` is recorded,
    and every file (optionally filtered by ``extensions``) whose content
    matches one of ``content_re`` is recorded as well. Entries whose path
    matches one of ``path_re_ex`` are skipped entirely (directories are not
    descended into). All regexes are applied case-insensitively.

    Results are collected in ``self.matches`` as ``Spider.Match`` objects.
    """

    class Match():
        """One recorded hit: where it was found, by which pattern, and why."""

        def __init__(self, conn, share, path, pattern, match_type):
            """Store the hit and look up the entry's modification time.

            Raises whatever ``conn.listPath`` raises if the entry cannot be
            queried.
            """
            self.conn = conn
            self.share = share
            self.path = path
            self.pattern = pattern
            self.match_type = match_type
            # Query the entry itself so its modification time can be reported.
            self.obj = self.conn.listPath(self.share, path)[0]
            self.mtime = strftime('%Y-%m-%d %H:%M', localtime(self.obj.get_mtime_epoch()))

        def __str__(self):
            return "<path='{}', mtime='{}', pattern='{}', match='{}'>".format(
                get_fullpath(self.conn, self.share, self.path),
                self.mtime,
                self.pattern,
                self.match_type,
            )

        def __repr__(self):
            return self.__str__()

    def __init__(self, conn, share, path, depth=10, extensions=None, path_re=None, content_re=None, path_re_ex=None):
        """Prepare a spider for ``share`` on an already logged-in connection.

        Args:
            conn: connection object used for all SMB calls.
            share: name of the share to walk.
            path: start path inside the share; '.', '/' and './' mean the root.
            depth: number of directory levels to list before stopping.
            extensions: file-name suffix (or list of them) a file must end
                with before its content is searched; falsy means "any file".
            path_re: regex (or list) recorded as a "Path Match" when found.
            content_re: regex (or list) recorded as a "Content Match".
            path_re_ex: regex (or list) of paths to exclude from spidering.
        """
        # Size requested per read call (32 * 10^6 bytes).
        self.BLOCKSIZE = 32000000
        self.conn = conn
        self.extensions = self._listify(extensions)
        self.path_re = self._listify(path_re)
        self.content_re = self._listify(content_re)
        self.path_re_ex = self._listify(path_re_ex)
        if path in ('.', '/', './'):
            path = ''
        self.path = path
        self.depth = depth
        self.matches = []
        self.share = share
        self.__tid = None

    @staticmethod
    def _listify(value):
        """Wrap a truthy non-list value in a list; return anything else as is."""
        if value and not isinstance(value, list):
            return [value]
        return value

    @staticmethod
    def _first_matching_pattern(patterns, data):
        """Return the first of ``patterns`` found in ``data``, else ``None``.

        ``data`` may be ``str`` or ``bytes``. Bytes are decoded leniently
        (UTF-8, undecodable bytes replaced) because ``str`` regex patterns
        cannot be applied to ``bytes`` objects. The search is case-insensitive.
        """
        if isinstance(data, bytes):
            data = data.decode('utf-8', errors='replace')
        for pattern in patterns:
            if re.search(pattern, data, re.IGNORECASE):
                return pattern
        return None

    def __fullpath(self, path):
        """Return the ``//host/share/path`` display form of ``path``."""
        return get_fullpath(self.conn, self.share, path)

    def __smbpath(self, path):
        """Return the wildcard expression used to list the entries of ``path``."""
        if path == '':
            return '*/'
        return '{}/*'.format(path)

    def spider(self):
        """Connect to the share and walk it, filling ``self.matches``.

        A share that cannot be connected to is logged as a warning and skipped.
        """
        try:
            self.__tid = self.conn.connectTree(self.share)
        except SessionError as e:
            log.warning('Unable to spider: {} - {}'.format(self.__fullpath(self.path), str(e)))
            return
        log.info('Spidering: {}'.format(self.__fullpath(self.path)))
        self.__spider(path=self.path, depth=0)

    def __spider(self, path, depth):
        """List ``path`` and recurse into its sub-directories up to the depth limit."""
        if depth == self.depth:
            return
        log.debug('Directory: {}'.format(self.__fullpath(path)))
        try:
            entries = self.conn.listPath(self.share, self.__smbpath(path))
        except SessionError as e:
            # Access-denied is expected noise while spidering; only other
            # listing errors are worth a warning.
            if 'STATUS_ACCESS_DENIED' not in str(e):
                log.warning('Path "{}", error: {}'.format(self.__fullpath(path), str(e)))
            return
        for entry in entries:
            name = entry.get_longname()
            if name in ('.', '..'):
                continue
            entrypath = '{}/{}'.format(path, name)
            if entry.is_directory():
                # Trailing slash lets exclusion patterns such as '/system32/'
                # match the directory itself.
                if not self.__match_path_ex('{}/'.format(entrypath)):
                    self.__match_path(entrypath)
                    self.__spider(entrypath, depth=depth+1)
            else:
                if not self.__match_path_ex(entrypath):
                    self.__match_path(entrypath, True)
                    self.__match_content(entrypath)
                    log.debug('File: {}'.format(self.__fullpath(entrypath)))

    def __match_path_ex(self, path):
        """Return True when ``path`` matches one of the exclusion patterns."""
        if self.path_re_ex is not None:
            for pattern in self.path_re_ex:
                if re.search(pattern, path, re.IGNORECASE):
                    log.debug("Exlusion matched: '{}'\tPattern: '{}'".format(self.__fullpath(path), pattern))
                    return True
        return False

    def __match_path(self, path, isFile=False):
        """Record a "Path Match" for the first ``path_re`` pattern found in ``path``.

        ``isFile`` is accepted for callers but not used.
        """
        if not self.path_re:
            return
        for pattern in self.path_re:
            if re.search(pattern, path, re.IGNORECASE):
                self.matches.append(self.Match(self.conn, self.share, path, pattern, "Path Match"))
                # One hit per path is enough; do not record duplicates.
                return

    def __match_content(self, path):
        """Record a "Content Match" if the file at ``path`` contains a ``content_re`` hit.

        Files are read in ``BLOCKSIZE`` requests and each chunk is searched on
        its own, so a pattern that straddles two chunks is not detected. Any
        failure to open, read or close the file is logged at debug level and
        the file is skipped (best effort).
        """
        # Nothing to search for: do not open the file at all.
        if not self.content_re:
            return
        if self.extensions:
            lowered = path.lower()
            extension = next((ext for ext in self.extensions if lowered.endswith(ext.lower())), None)
            if extension is None:
                return
            log.debug("Extension matched: '{}'\tPattern: '{}'".format(self.__fullpath(path), extension))
        tid = self.__tid
        try:
            fid = self.conn.openFile(tid, path, desiredAccess=FILE_READ_DATA)
            try:
                pattern = self.__first_content_hit(tid, fid)
            finally:
                # Always release the handle, including on an early match or error.
                self.conn.closeFile(tid, fid)
            if pattern is not None:
                self.matches.append(self.Match(self.conn, self.share, path, pattern, "Content Match"))
        except Exception as e:
            log.debug('Could not open: {0} - {1}'.format(self.__fullpath(path), e))

    def __first_content_hit(self, tid, fid):
        """Read the open file chunk by chunk; return the first matching pattern or None."""
        offset = 0
        data = self.conn.readFile(tid, fid, offset, self.BLOCKSIZE)
        # An empty (or None) result marks the end of the file.
        while data:
            # Advance by the raw chunk length, not the decoded text length.
            offset += len(data)
            pattern = self._first_matching_pattern(self.content_re, data)
            if pattern is not None:
                return pattern
            data = self.conn.readFile(tid, fid, offset, self.BLOCKSIZE)
        return None
# Example usage:
address = '10.0.0.1'
target_ip = address
user = 'user01'
password = 'Password123!'
domain = 'lab.test'
port = 445
path = '.'
# Share to scan; use '*' to enumerate and scan every share.
share = '*'
# A path containing one of these patterns is added to the matches.
path_re = ['secret', 'password']
# A path matching one of these patterns is excluded from spidering.
path_re_ex = ['/system32/', '/.*?assembly.*?/', '/WInsxs.*?/', '/syswow.*?/']
# File contents are searched for these patterns...
content_re = ['password', 'wacHtwoord']
# ...but only in files whose name ends with one of these suffixes.
extensions = ['.txt', '.RtF', '.ini', '.config', '.cfg', '.docx']

# Connect and authenticate.
conn = SMBConnection(address, target_ip, sess_port=int(port))
conn.login(user, password, domain)

# Build the list of (share, depth) targets: every enumerated share is walked
# 2 levels deep, a single named share 10 levels deep.
if share == '*':
    # The last character of each reported share name is dropped.
    targets = [(entry['shi1_netname'][:-1], 2) for entry in conn.listShares()]
else:
    targets = [(share, 10)]

for share, depth in targets:
    spider = Spider(conn, share, path, depth, extensions, path_re, content_re, path_re_ex)
    spider.spider()
    for match in spider.matches:
        print(match)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment