Last active
July 29, 2022 17:29
-
-
Save superducktoes/b7b9796f13b5081554ea0caf6a4d0398 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from greynoise import GreyNoise | |
| import json | |
| import csv | |
| import argparse | |
| import os | |
| from stix2 import MemoryStore, Indicator | |
| api_client = GreyNoise(api_key="") | |
| parser = argparse.ArgumentParser() | |
| def generate_stix_output(name=None): | |
| results = api_client.query(args.query_to_run) | |
| mem = MemoryStore() | |
| for i in results["data"]: | |
| indicator = Indicator(name="{}".format(i["ip"]), | |
| pattern="[ipv4-addr:value='{}']".format(i["ip"]), | |
| pattern_type="stix", | |
| custom_properties={ | |
| "x_tags": i["tags"], | |
| "x_classification": i["classification"], | |
| "x_bot": i["bot"], | |
| "x_vpn": i["vpn"], | |
| "x_firstseen": i["first_seen"], | |
| "x_lastseen": i["last_seen"] | |
| }) | |
| mem.add(indicator) | |
| mem.save_to_file("./{}".format(args.output_file)) | |
| def write_to_json(name=None): | |
| # check to see if a name is supplied or if we are just generating a temp json file | |
| if(name == None): | |
| name = args.output_file | |
| results = api_client.query(args.query_to_run) | |
| with open(name, "w") as f: | |
| f.write(json.dumps(results, indent=4)) | |
| while("scroll" in results): | |
| results = api_client.query(args.query_to_run, scroll=results["scroll"]) | |
| with open(name, "a") as f: | |
| f.write(json.dumps(results, indent=4)) | |
| # flattens json data. takes a dict as the parameter | |
| def flatten_json(json_data): | |
| out = {} | |
| def flatten(x, name = ""): | |
| if type(x) is dict: | |
| for i in x: | |
| flatten(x[i], name + i + "_") | |
| elif type(x) is list: | |
| n = 0 | |
| for i in x: | |
| flatten(i, name + str(n) + "_") | |
| n += 1 | |
| else: | |
| out[name[:-1]] = x | |
| flatten(json_data) | |
| return out | |
| if __name__ == "__main__": | |
| parser.add_argument("-q", "--query", dest="query_to_run", default="last_seen:1d", help="Query to run", required=True) | |
| parser.add_argument("-o", "--output", dest="output_file", default="results.json", help="File to write to", required=True) | |
| parser.add_argument("-f", "--format", dest="format", default="json", help="json | csv | stix") | |
| args = parser.parse_args() | |
| # json output uses the default data returned from the GN API | |
| if args.format == "json": | |
| write_to_json() | |
| elif args.format == "csv": | |
| # since this can be a large query write everything to a file first | |
| write_to_json("temp_results.json") | |
| csv_list = [] | |
| f = open("temp_results.json") | |
| results = json.load(f) | |
| results = results["data"] # get to just the data to write to the csv | |
| # flatten the json data into a new list | |
| for i in results: | |
| flattened_results = flatten_json(i) | |
| csv_list.append(flattened_results) | |
| # find which item in the list has the most elements so we use that as the header for the csv | |
| max_length = 0 | |
| cur_header = "" | |
| for i in csv_list: | |
| cur_len = len(i.keys()) | |
| if cur_len > max_length: | |
| max_length = cur_len | |
| cur_header = i.keys() | |
| # write the header to the csv | |
| csv_file = open(args.output_file, "w") | |
| csv_writer = csv.writer(csv_file) | |
| csv_writer.writerow(cur_header) | |
| # write the values of each item in the list | |
| for i in csv_list: | |
| csv_writer.writerow(i.values()) | |
| csv_file.close() | |
| # remove the json file since the csv file has been generated and saved | |
| os.remove("temp_results.json") | |
| elif args.format == "stix": | |
| generate_stix_output() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment