Last active
August 11, 2023 14:55
-
-
Save DanaEpp/1964076c89e1e7af3daae19657687449 to your computer and use it in GitHub Desktop.
Implant Request Logger
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/env python3 | |
""" | |
request_logger.py | |
A simple HTTP server that will dump the requests being mirrored from an implant on an API server | |
WARNING: This will record all header and body content of a request to a JSON file. To reduce the risk of | |
information disclosure, care should be placed in the ACL of the output file. | |
Author: Dana Epp ([email protected]) aka SilverStr | |
""" | |
from argparse import ArgumentParser | |
from http.server import BaseHTTPRequestHandler, HTTPServer | |
import logging | |
from datetime import datetime | |
import json | |
class HTTPRequestHandler(BaseHTTPRequestHandler): | |
server_version = "IRLHTTP/1.0" | |
sys_version = "" | |
output_file = "" | |
def _set_response(self): | |
self.send_response(200) | |
self.send_header('Content-Type', 'application/json') | |
self.end_headers() | |
def _log_item(self, body=""): | |
logging.info("\n%s %s %s\n%s\n%s\n---------\n", | |
str(self.command), | |
str(self.path), | |
str(self.request_version), | |
str(self.headers), | |
str(body)) | |
def process_Command(self): | |
post_data = "" | |
try: | |
content_length = 0 | |
if( 'Content-Length' in self.headers ): | |
content_length = int(self.headers['Content-Length'],0) | |
if content_length > 0: | |
post_data = (self.rfile.read(content_length)).decode('utf-8') | |
self._log_item(post_data) | |
self._write_log_item(self._gen_log_item(post_data)) | |
self._set_response() | |
self.wfile.write(b"[]") | |
except: | |
logging.error("Failed to process request." ) | |
pass | |
do_GET = do_HEAD = do_POST = do_PUT = do_DELETE = process_Command | |
def _gen_log_item(self, body) -> str: | |
log_item = {} | |
log_item['timestamp'] = (datetime.now()).strftime("%d/%m/%Y %H:%M:%S") | |
log_item['command'] = str(self.command) | |
log_item['request_version'] = str(self.request_version) | |
log_item['path'] = str(self.path) | |
log_item['headers'] = str(self.headers) | |
log_item['body'] = str(body) | |
return json.dumps(log_item) | |
def _write_log_item(self, log_item: str): | |
with open(self.output_file, "a") as outfile: | |
outfile.write(log_item) | |
outfile.write('\n') | |
def main(server_class=HTTPServer, handler_class=HTTPRequestHandler, port=80, output_file="request_log.json"): | |
logging.basicConfig(level=logging.INFO) | |
server_address = ('', port) | |
httpd = server_class(server_address, handler_class) | |
httpd.RequestHandlerClass.output_file = output_file | |
logging.info('Starting Implant Request Logger on port %d (writing to %s)...\n', port, output_file) | |
try: | |
httpd.serve_forever() | |
except KeyboardInterrupt: | |
pass | |
httpd.server_close() | |
logging.info('Stopping Implant Request Logger...\n') | |
if __name__ == '__main__': | |
parser = ArgumentParser() | |
parser.add_argument( | |
"-p", "--port", | |
action="store", | |
default=8080, | |
help="The HTTP port to listen to.", | |
) | |
parser.add_argument( | |
"-o", "--output", | |
action="store", | |
default="request_log.json", | |
help="The output file to record requests to.", | |
) | |
args = parser.parse_args() | |
main(port=int(args.port), output_file=args.output) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment