-
-
Save ion-storm/2787d4d648918dcaa30d0f9ed07833a3 to your computer and use it in GitHub Desktop.
A research aid for tracing security relevant events in the CLR via ETW for detecting malicious assemblies.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import etw | |
import etw.evntrace | |
import sys | |
import argparse | |
import threading | |
class RundownDotNetETW(etw.ETW): | |
def __init__(self, verbose, high_risk_only): | |
# set options | |
self.verbose = verbose | |
self.high_risk_only = high_risk_only | |
self.should_stop_capture = False | |
# define capture provider info | |
providers = [ | |
etw.ProviderInfo( | |
'DotNet Runtime Rundown', | |
etw.GUID("{A669021C-C450-4609-A035-5AF59AF4DF18}"), | |
level=etw.evntrace.TRACE_LEVEL_INFORMATION, | |
any_keywords=[ | |
"LoaderRundownKeyword", | |
"StartRundownKeyword", | |
] | |
), | |
] | |
super().__init__(providers=providers, event_callback=self.print_event) | |
def start(self): | |
# do pre-capture setup | |
self.do_capture_setup() | |
super().start() | |
def stop(self): | |
super().stop() | |
# do post-capture teardown | |
self.do_capture_teardown() | |
def do_capture_setup(self): | |
# do whatever setup for capture here | |
pass | |
def do_capture_teardown(self): | |
# do whatever for capture teardown here | |
pass | |
def print_event(self, event): | |
# Common info | |
proc_id = event[1]["EventHeader"]["ProcessId"] | |
event_id = event[0] | |
to_print = [str(proc_id)] | |
should_print = False if self.high_risk_only else True | |
# Event specific | |
if event_id == 155: # AssemblyDCStart_V1 | |
to_print.append("AssemblyDCStart_V1") | |
to_print.append(event[1]["AssemblyID"]) | |
to_print.append(event[1]["AssemblyFlags"]) | |
to_print.append(event[1]["FullyQualifiedAssemblyName"]) | |
elif event_id == 153: # ModuleDCStart_V2 | |
to_print.append("ModuleDCStart_V2") | |
to_print.append(event[1]["AssemblyID"]) | |
to_print.append(event[1]["ModuleFlags"]) | |
to_print.append(event[1]["ModuleILPath"]) | |
to_print.append(event[1]["ModuleNativePath"]) | |
to_print.append(event[1]["ManagedPdbBuildPath"]) | |
il_path = event[1]["ModuleILPath"].lower() | |
module_flags = event[1]["ModuleFlags"] | |
# Look for byte stream loaded assemblies | |
if not il_path.endswith(".exe") \ | |
and not il_path.endswith(".dll") \ | |
and "Dynamic" not in module_flags: | |
should_print = True | |
elif event_id == 151: # DomainModuleDCStart_V1 | |
to_print.append("DomainModuleDCStart_V1") | |
to_print.append(event[1]["AssemblyID"]) | |
to_print.append(event[1]["ModuleFlags"]) | |
to_print.append(event[1]["ModuleILPath"]) | |
to_print.append(event[1]["ModuleNativePath"]) | |
il_path = event[1]["ModuleILPath"].lower() | |
module_flags = event[1]["ModuleFlags"] | |
# Look for byte stream loaded assemblies | |
if not il_path.endswith(".exe") \ | |
and not il_path.endswith(".dll") \ | |
and "Dynamic" not in module_flags: | |
should_print = True | |
# Interesting in non powershell.exe processes as shows use of powershell dynamically | |
elif "system.management.automation" in il_path.lower(): | |
should_print = True | |
elif event_id == 145: # DCStartComplete_V1 | |
self.should_stop_capture = True | |
if not self.verbose: | |
return | |
elif not self.verbose: | |
return | |
# Print information | |
if self.verbose and should_print: | |
print(event) | |
elif should_print: | |
for n in range(0, len(to_print)): | |
to_print[n] = str(to_print[n]) | |
print(", ".join(to_print)) | |
sys.stdout.flush() | |
class RuntimeDotNetETW(etw.ETW): | |
high_risk_method_names = [ | |
"VirtualAlloc", | |
"VirtualAllocEx", | |
"CreateThread", | |
"CreateRemoteThread", | |
"WriteProcessMemory", | |
"FromBase64String", | |
"DownloadFile", | |
"RunPS", | |
"SetThreadContext", | |
"MiniDumpWriteDump", | |
"LoadLibrary", | |
"GetProcAddress", | |
"WaitForSingleObject" | |
] | |
high_risk_namespaces = [ | |
"System.IO.MemoryStream" | |
] | |
def __init__(self, verbose, high_risk_only, method_tracing): | |
# set options | |
self.verbose = verbose | |
self.high_risk_only = high_risk_only | |
self.method_tracing = method_tracing | |
keywords = [ | |
"LoaderKeyword" | |
] | |
if self.method_tracing: | |
keywords.extend( | |
[ | |
"JitKeyword", | |
"JitTracingKeyword", | |
"InteropKeyword" | |
] | |
) | |
print(keywords) | |
# define capture provider info | |
providers = [ | |
etw.ProviderInfo( | |
'DotNet Runtime', | |
etw.GUID("{E13C0D23-CCBC-4E12-931B-D9CC2EEE27E4}"), | |
level=etw.evntrace.TRACE_LEVEL_VERBOSE if self.method_tracing else etw.evntrace.TRACE_LEVEL_INFORMATION, | |
any_keywords=keywords | |
), | |
] | |
super().__init__(providers=providers, event_callback=self.print_event) | |
def start(self): | |
# do pre-capture setup | |
self.do_capture_setup() | |
super().start() | |
def stop(self): | |
super().stop() | |
# do post-capture teardown | |
self.do_capture_teardown() | |
def do_capture_setup(self): | |
# do whatever setup for capture here | |
pass | |
def do_capture_teardown(self): | |
# do whatever for capture teardown here | |
pass | |
def print_event(self, event): | |
# Common info | |
proc_id = event[1]["EventHeader"]["ProcessId"] | |
event_id = event[0] | |
to_print = [str(proc_id)] | |
should_print = False if self.high_risk_only else True | |
# Event specific | |
if event_id == 154: # AssemblyLoad_V1 | |
to_print.append("AssemblyLoad_V1") | |
to_print.append(event[1]["AssemblyID"]) | |
to_print.append(event[1]["AssemblyFlags"]) | |
to_print.append(event[1]["FullyQualifiedAssemblyName"]) | |
elif event_id == 152: # ModuleLoad_V2 | |
to_print.append("ModuleLoad_V2") | |
to_print.append(event[1]["AssemblyID"]) | |
to_print.append(event[1]["ModuleFlags"]) | |
to_print.append(event[1]["ModuleILPath"]) | |
to_print.append(event[1]["ModuleNativePath"]) | |
to_print.append(event[1]["ManagedPdbBuildPath"]) | |
il_path = event[1]["ModuleILPath"].lower() | |
module_flags = event[1]["ModuleFlags"] | |
# Look for byte stream loaded assemblies | |
if not il_path.endswith(".exe") \ | |
and not il_path.endswith(".dll") \ | |
and "Dynamic" not in module_flags: | |
should_print = True | |
# Interesting in non powershell.exe processes as shows use of powershell dynamically | |
elif "system.management.automation" in il_path.lower(): | |
should_print = True | |
elif event_id == 151: # DomainModuleLoad_V1 | |
to_print.append("DomainModuleLoad_V1") | |
to_print.append(event[1]["AssemblyID"]) | |
to_print.append(event[1]["ModuleFlags"]) | |
to_print.append(event[1]["ModuleILPath"]) | |
to_print.append(event[1]["ModuleNativePath"]) | |
il_path = event[1]["ModuleILPath"].lower() | |
module_flags = event[1]["ModuleFlags"] | |
# Look for byte stream loaded assemblies | |
if not il_path.endswith(".exe") \ | |
and not il_path.endswith(".dll") \ | |
and "Dynamic" not in module_flags: | |
should_print = True | |
elif event_id == 145: # MethodJittingStarted | |
to_print.append("MethodJittingStarted") | |
to_print.append(event[1]["ModuleID"]) | |
to_print.append(event[1]["MethodNamespace"]) | |
to_print.append(event[1]["MethodName"]) | |
if event[1]["MethodName"] in self.high_risk_method_names \ | |
or event[1]["MethodNamespace"] in self.high_risk_namespaces: | |
should_print = True | |
elif event_id == 185 or event_id == 186: # MethodJitInliningSucceeded and MethodJitInliningFailed | |
if event_id == 185: | |
to_print.append("MethodJitInliningSucceeded") | |
else: | |
to_print.append("MethodJitInliningFailed") | |
to_print.append(event[1]["MethodBeingCompiledNamespace"]) | |
to_print.append(event[1]["MethodBeingCompiledName"]) | |
to_print.append(event[1]["InlinerNamespace"]) | |
to_print.append(event[1]["InlinerName"]) | |
to_print.append(event[1]["InlineeNamespace"]) | |
to_print.append(event[1]["InlineeName"]) | |
if event[1]["InlineeName"] in self.high_risk_method_names \ | |
or event[1]["InlineeNamespace"] in self.high_risk_namespaces: | |
should_print = True | |
elif event_id == 88: # ILStubGenerated | |
to_print.append("ILStubGenerated") | |
to_print.append(event[1]["StubFlags"]) | |
to_print.append(event[1]["ManagedInteropMethodNamespace"]) | |
to_print.append(event[1]["ManagedInteropMethodName"]) | |
if event[1]["ManagedInteropMethodName"] in self.high_risk_method_names: | |
should_print = True | |
elif not self.verbose: | |
return | |
# Print information | |
if self.verbose and should_print: | |
print(event) | |
elif should_print: | |
for n in range(0, len(to_print)): | |
to_print[n] = str(to_print[n]) | |
print(", ".join(to_print)) | |
sys.stdout.flush() | |
def rundown_capture(args): | |
# instantiate class | |
capture = RundownDotNetETW(verbose=args.verbose, high_risk_only=args.high_risk_only) | |
# start capture | |
capture.start() | |
# check capture status | |
while not capture.should_stop_capture: | |
time.sleep(0.1) | |
# stop capture | |
capture.stop() | |
def runtime_capture(args): | |
# instantiate class | |
capture = RuntimeDotNetETW( | |
verbose=args.verbose, | |
high_risk_only=args.high_risk_only, | |
method_tracing=args.enable_method_tracing | |
) | |
# start capture | |
capture.start() | |
# wait for return | |
input() | |
# stop capture | |
capture.stop() | |
parser = argparse.ArgumentParser() | |
parser.add_argument( | |
"--verbose", | |
help="Displays full details for all event types, as opposed to limited details for specific events", | |
action="store_true" | |
) | |
parser.add_argument( | |
"--high-risk-only", | |
help="Only show high risk events (byte stream loaded assemblies, high risk method calls etc)", | |
action="store_true" | |
) | |
parser.add_argument( | |
"--disable-rundown-provider", | |
help="Do not capture current system state - only show events post-startup", | |
action="store_true" | |
) | |
parser.add_argument( | |
"--enable-method-tracing", | |
help="Enables relevant JIT and Interop events for providing method call visibility", | |
action="store_true" | |
) | |
args = parser.parse_args() | |
if not args.disable_rundown_provider: | |
threading.Thread(target=rundown_capture, args=(args,)).start() | |
runtime_capture(args) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment