Last active
January 3, 2024 17:11
-
-
Save ag-michael/fad29c23dbb448bfa8f223902e824ba7 to your computer and use it in GitHub Desktop.
uploadToBloodhoundCE.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os,sys | |
import requests | |
import hmac | |
import hashlib | |
import datetime | |
import base64,time | |
import subprocess | |
# API token pair used to sign every request. Values are taken from the
# environment when present so that real credentials do not have to be
# committed to this file; the placeholders remain as the fallback.
BHE_TOKEN_ID = os.environ.get("BHE_TOKEN_ID", "<replace me>")
BHE_TOKEN_KEY = os.environ.get("BHE_TOKEN_KEY", "<replace me>")

# Directory that is scanned for .zip / .json files to upload.
# Override with the BHE_UPLOAD_PATH environment variable.
PATH = os.environ.get("BHE_UPLOAD_PATH", "/path/to/files")
def format_url(uri, base_url="http://127.0.0.1:8080"):
    """Build the absolute URL for an API path.

    Args:
        uri: API path such as ``/api/v2/file-upload/start``. At most one
            leading slash is removed before joining, so the result never
            gains an extra slash from the join itself.
        base_url: Scheme, host and port of the server. Defaults to the
            local address this script has always targeted. A trailing
            slash is tolerated.

    Returns:
        The base URL and the path joined by exactly one slash.
    """
    path = uri[1:] if uri.startswith("/") else uri
    return f"{base_url.rstrip('/')}/{path}"
def _request(method, uri, body=b''):
    """Send an HMAC-signed HTTP request and return the ``requests`` response.

    The signature is a three-stage HMAC-SHA256 chain in which each stage's
    digest becomes the key of the next one:

    1. keyed with ``BHE_TOKEN_KEY``, over the method concatenated with the URI;
    2. over the first 13 characters of the ISO-8601 timestamp
       (``YYYY-MM-DDTHH``, i.e. the date and the hour);
    3. over the request body (skipped when ``body`` is ``None``).

    Args:
        method: HTTP verb, e.g. ``"POST"``.
        uri: API path; it is signed as given and expanded with ``format_url``.
        body: Raw request payload as bytes, or ``None`` for no payload.

    Returns:
        The response object produced by ``requests.request``.
    """
    # Local time with its UTC offset; sent verbatim in the RequestDate header.
    timestamp = datetime.datetime.now().astimezone().isoformat("T")

    # hmac.new(key, msg, ...) feeds msg through update() when msg is not None,
    # so every stage can be written as a single expression.
    operation_key = hmac.new(
        BHE_TOKEN_KEY.encode(), f"{method}{uri}".encode(), hashlib.sha256
    ).digest()
    date_key = hmac.new(
        operation_key, timestamp[:13].encode(), hashlib.sha256
    ).digest()
    signature = hmac.new(date_key, body, hashlib.sha256).digest()

    # Perform the request with the signed and expected headers
    signed_headers = {
        "User-Agent": "Agent User",
        "Authorization": f"bhesignature {BHE_TOKEN_ID}",
        "RequestDate": timestamp,
        "Signature": base64.b64encode(signature),
        "Content-Type": "application/json",
    }
    return requests.request(
        method=method,
        url=format_url(uri),
        headers=signed_headers,
        data=body,
        timeout=86400,
    )
def getFile(fname):
    """Return the contents of a UTF-8 text file as UTF-8 encoded bytes.

    The file is opened in text mode, so it is decoded (invalid UTF-8 raises
    ``UnicodeDecodeError``) and then encoded again before being returned.

    Args:
        fname: Path of the file to read.

    Returns:
        The file's text re-encoded as ``bytes``.
    """
    with open(fname, mode="r", encoding="utf-8") as handle:
        text = handle.read()
    return text.encode("utf-8")
def _upload_json_file(json_file):
    """Run the start / upload / end request sequence for one JSON file.

    Args:
        json_file: Path of the JSON file to send.

    Returns:
        True when the upload and end requests both returned a status
        below 400; False otherwise (including when the job could not be
        started at all).
    """
    print(f"Uploading {json_file}")
    sys.stdout.flush()

    start_result = _request(method="POST", uri='/api/v2/file-upload/start', body=b'{}')
    print(f"Status code:{start_result.status_code}")
    print("Result:\n", start_result.text)
    if start_result.status_code >= 400:
        # An error response carries no job id to continue with.
        return False
    request_id = start_result.json()["data"]["id"]

    upload_result = _request(
        method="POST",
        uri=f'/api/v2/file-upload/{request_id}',
        body=getFile(json_file),
    )
    print(f"Status code:{upload_result.status_code}")
    print("Result:\n", upload_result.text)
    print(f"\nFile sent: {json_file}")

    # The job is always ended, even after a failed upload, so that it is
    # not left open on the server.
    end_result = _request(method="POST", uri=f'/api/v2/file-upload/{request_id}/end', body=b'')
    print(f"Status code:{end_result.status_code}")
    print("Result:\n", end_result.text)
    print(f"\nFinished uploading: {json_file}")

    return upload_result.status_code < 400 and end_result.status_code < 400


def uploadToBH(fname):
    """Upload every ``.json`` file found in ``PATH``.

    When ``fname`` is a zip archive it is first extracted into ``PATH`` with
    the external ``unzip`` tool. Every ``.json`` entry of ``PATH`` is then
    uploaded; a file is renamed to ``<name>.done`` only after its upload
    succeeded, so failed files are picked up again on the next run.

    Args:
        fname: Path of the ``.zip`` or ``.json`` file that triggered the call.
    """
    if fname.lower().endswith(".zip"):
        # Extract straight into PATH instead of changing the process working
        # directory: chdir made a relative PATH resolve incorrectly in the
        # listdir below and on every later call.
        subprocess.call(['unzip', fname, '-d', PATH])

    for item in os.listdir(PATH):
        if not item.endswith(".json"):
            continue
        json_file = f"{PATH}{os.sep}{item}"
        if _upload_json_file(json_file):
            done_file = f"{json_file}.done"
            # os.replace needs no shell, so file names containing spaces or
            # shell metacharacters are handled safely.
            os.replace(json_file, done_file)
            print(f"renamed '{json_file}' -> '{done_file}'")
        else:
            print(f"Upload failed, leaving in place: {json_file}")
if __name__ == "__main__":
    # Hand every archive or JSON file found directly inside PATH to the
    # uploader, announcing each one first.
    for entry in os.listdir(PATH):
        if not entry.lower().endswith((".zip", ".json")):
            continue
        candidate = f"{PATH}{os.sep}{entry}"
        print(f"Found:{candidate}")
        uploadToBH(candidate)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment