Last active
July 5, 2021 18:06
-
-
Save rc-abodkins/3afe5062c46100717a84039b5546d285 to your computer and use it in GitHub Desktop.
This script searches binaries within VMware Carbon Black EDR
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import os | |
import configparser | |
import csv | |
import sys | |
from os.path import exists | |
import requests | |
#Console Output coloring. Makes knowing if you have any errors/ warnings easier to identify | |
err_Col = '\033[91m' | |
success_Col = '\033[92m' | |
warn_Col = '\033[93m' | |
no_col = '\033[0m' | |
birb=""" | |
_.----._ | |
,'.::.--..:._ | |
/::/_,-<o)::;_`-._ | |
::::::::`-';'`,--`-` | |
;::;'|::::,',' | |
,'::/ ;:::/, :. | |
/,':/ /::;' \ ':\\ | |
:'.:: ,-'' . `.::\\ | |
\.:;':. ` :: .: | |
(;' ;;; .::' :| | |
\,:;; \ `::.\.\\ | |
`);' '::' `: | |
\. ` `' .: _,' | |
`.: .. -. ' :. :/ _.-' _.- | |
>;._.:._.;,-=_(.-' __ `._ | |
,;' _..-(((('' .,-'' `-._ | |
_,'<.-'' _..``'.'`-'`. ` | |
_.-((((_..--'' \ \ `.`. | |
-' _.``' \ ` | |
-------------------------------------------------- | |
""" | |
binary_hunt_ascii=""" | |
____ _ _ _ _ _ _ _ _ | |
| __ )(_)_ __ __ _ _ __ _ _ / \ _ __ __ _| |_ _ ___(_)___ | | | |_ _ _ __ | |_(_)_ __ __ _ | |
| _ \| | '_ \ / _` | '__| | | | / _ \ | '_ \ / _` | | | | / __| / __| | |_| | | | | '_ \| __| | '_ \ / _` | | |
| |_) | | | | | (_| | | | |_| | / ___ \| | | | (_| | | |_| \__ \ \__ \ | _ | |_| | | | | |_| | | | | (_| | | |
|____/|_|_| |_|\__,_|_| \__, | /_/ \_\_| |_|\__,_|_|\__, |___/_|___/ |_| |_|\__,_|_| |_|\__|_|_| |_|\__, | | |
|___/ |___/ |___/ | |
""" | |
def credential_file(): | |
credentials_response_locations = [ | |
"/etc/carbonblack/credentials.response", | |
f"{os.path.dirname(__file__)}/.carbonblack/credentials.response", | |
f"{os.getcwd()}/.carbonblack/credentials.response" | |
] | |
for p in credentials_response_locations: | |
location = "" | |
if exists(path=p): | |
location = p | |
else: | |
pass | |
return location | |
def config_reader(file_location): | |
config = configparser.ConfigParser() | |
config.sections() | |
config.read(file_location) | |
return config | |
def get_binaries(base_url, auth_token, query="*", rows="100000"): | |
binary_url = f"{base_url}/api/v1/binary?q={query}&rows={rows}" | |
headers = { | |
"X-Auth-Token":f"{auth_token}" | |
} | |
binaries = requests.get(binary_url, headers=headers) | |
if binaries.status_code == 200: | |
return binaries | |
else: | |
sys.exit(err_Col + f"[!][!] We encountered a problem with the query to Carbon Black. The status code returned was {binaries.status_code}") | |
return None | |
def parse_results(results, base_url, subdomain): | |
results_dict = [] | |
for binary in results.json()["results"]: | |
try: | |
deets = dict() | |
dig_sig = '' | |
if "digsig_publisher" in binary: | |
dig_sig = binary['digsig_publisher'] | |
observed_filename_list = '' | |
for observed in binary['observed_filename']: | |
observed_filename_list += observed.replace("\\\\", "\\") + "\n" | |
deets = { | |
"shortname":subdomain, | |
"md5":binary["md5"], | |
"signature_status":binary["signed"], | |
"company_name":binary["company_name"], | |
"observed_filename":observed_filename_list, | |
"count_observed_filename":int(len(binary["observed_filename"])), | |
"original_filename":binary["original_filename"], | |
"internal_name":binary["internal_name"], | |
"file_desc":binary["file_desc"], | |
"server_added_timestamp":binary["server_added_timestamp"], | |
"digsig_publisher":dig_sig, | |
"os_type":binary["os_type"], | |
"host_count":binary["host_count"], | |
"is_executable_image":binary["is_executable_image"], | |
"url":f'{base_url}/#/binary/{binary["md5"]}' | |
} | |
results_dict.append(deets) | |
except Exception as e: | |
sys.exit(err_Col + f"[!] We encountered an issue trying to parse out your results. This is the exception {e}." + no_col) | |
return results_dict | |
def write_csv(output, results): | |
header = ['shortname', 'md5', 'signature_status', 'company_name', 'observed_filename', 'count_observed_filename', 'original_filename', 'internal_name', 'file_desc', 'server_added_timestamp', 'digsig_publisher', 'os_type', 'host_count', 'is_executable_image', 'url'] | |
with open(output, 'w', encoding='UTF8', newline='') as f: | |
writer = csv.DictWriter(f, fieldnames=header) | |
writer.writeheader() | |
writer.writerows(results) | |
def main(): | |
parser = argparse.ArgumentParser(description="Script to hunt through binaries of a single Carbon Black Response customer.") | |
parser.add_argument("--subdomain", "--d", type=str, required=False, help="Subdomain you would like to hunt through") | |
parser.add_argument("--config", "--c", type=str, required=False, help="location of .ini file with the credentials required.") | |
parser.add_argument("--output", "--o", required=False, help="Where do you want to store the CSV file of results?") | |
parser.add_argument("--query", "--q", required=False, help="Specify a specific query you'd like to search. By default it'll return everything.") | |
parser.add_argument("--rows", "--r", required=False, help="Specify the number of rows you'd like outputted. By default it'll return up to 100,000.") | |
parser.add_argument("--no-color", action='store_true') | |
parser.add_argument("--api", required=False, help="API Key for your Carbon Black Instance") | |
parser.add_argument("--url", required=False, help="URL for your Carbon Black Response server") | |
args = parser.parse_args() | |
if args.no_color == True: | |
err_Col = no_col | |
success_Col = no_col | |
warn_Col = no_col | |
else: | |
err_Col = '\033[91m' | |
success_Col = '\033[92m' | |
warn_Col = '\033[93m' | |
#display ascii art | |
print(err_Col + birb) | |
print(success_Col + binary_hunt_ascii + no_col) | |
cred_file = "" | |
api = "" | |
url = "" | |
if args.config: | |
cred_file = args.config | |
elif credential_file() != "": | |
cred_file = credential_file() | |
else: | |
print(warn_Col + "[!] We couldn't find a credential .ini file in any standard location.") | |
if args.api: | |
api = args.api | |
else: | |
api = input(warn_Col + "Please enter a valid API key: " + no_col) | |
if args.url: | |
url = args.url | |
else: | |
url = input(warn_Col + "Please enter a valid URL for your CB server (no trailing /): " + no_col) | |
#checks to see if a subdomain was provided | |
if args.subdomain and cred_file: | |
subdomain = args.subdomain | |
elif cred_file: | |
subdomain = input(warn_Col + "[!] No subdomain provided. Please enter a subdomain that is included in your credential file: " + no_col) | |
else: | |
subdomain = "" | |
#check if output location was given | |
if args.output: | |
output = args.output | |
else: | |
output = os.getcwd() + "/binary_analysis.csv" | |
print(warn_Col + "[*] No output file provided. The output file will be in the current working directory and named binary_analysis.csv" + no_col) | |
if api and url: | |
base_url = url.rstrip() | |
auth_token = api.rstrip() | |
else: | |
#check the config file to make sure it has the appropriate subdomain and format | |
config = config_reader(cred_file) | |
try: | |
base_url = config[subdomain]['url'] | |
auth_token = config[subdomain]['token'] | |
except Exception as e: | |
sys.exit(err_Col + "[!][!] The subodmain you provided is not in the credentials.response file. Please add it.") | |
if args.query and args.rows: | |
binaries = get_binaries(base_url, auth_token, query=args.query, rows=args.rows) | |
elif args.query: | |
binaries = get_binaries(base_url, auth_token, query=args.query) | |
elif args.rows: | |
binaries = get_binaries(base_url, auth_token, rows=args.rows) | |
else: | |
print(success_Col + "[*] Getting list of binaries from Carbon Black") | |
binaries = get_binaries(base_url, auth_token) | |
print(success_Col + f"[*] There were a total of {int(len(binaries.json()['results']))} results returned") | |
print(success_Col + f"[*] Parsing the results to {output}") | |
if binaries: | |
parsed_results = parse_results(binaries, base_url, subdomain) | |
write_csv(output, parsed_results) | |
else: | |
sys.exit(err_Col + "We've encountered a problem.") | |
if __name__ == "__main__": | |
sys.exit(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment