Skip to content

Instantly share code, notes, and snippets.

@superducktoes
Last active July 29, 2022 17:29
Show Gist options
  • Select an option

  • Save superducktoes/b7b9796f13b5081554ea0caf6a4d0398 to your computer and use it in GitHub Desktop.

Select an option

Save superducktoes/b7b9796f13b5081554ea0caf6a4d0398 to your computer and use it in GitHub Desktop.
from greynoise import GreyNoise
import json
import csv
import argparse
import os
from stix2 import MemoryStore, Indicator
# GreyNoise SDK client used by every query below.
# NOTE(review): api_key is blank here — it must be filled in (or the SDK's
# config-file fallback used) before any query will succeed; confirm how the
# key is expected to be supplied.
api_client = GreyNoise(api_key="")
# CLI parser; the actual arguments are registered under the __main__ guard.
parser = argparse.ArgumentParser()
def generate_stix_output(name=None):
    """Run the GreyNoise query and save the results as STIX 2 Indicators.

    Args:
        name: Path of the output file. Defaults to ``args.output_file``.
              (Bug fix: previously this parameter was accepted but ignored
              and the output always went to ``args.output_file``.)

    Side effects:
        Writes a STIX bundle file at ``./<name>``.
    """
    if name is None:
        name = args.output_file
    # NOTE(review): unlike write_to_json this does not follow "scroll"
    # pagination, so only the first page of results is converted — confirm
    # whether that is intended.
    results = api_client.query(args.query_to_run)
    mem = MemoryStore()
    for record in results["data"]:
        # One Indicator per IP; GreyNoise enrichment fields travel along as
        # custom x_* properties.
        indicator = Indicator(
            name="{}".format(record["ip"]),
            pattern="[ipv4-addr:value='{}']".format(record["ip"]),
            pattern_type="stix",
            custom_properties={
                "x_tags": record["tags"],
                "x_classification": record["classification"],
                "x_bot": record["bot"],
                "x_vpn": record["vpn"],
                "x_firstseen": record["first_seen"],
                "x_lastseen": record["last_seen"],
            })
        mem.add(indicator)
    mem.save_to_file("./{}".format(name))
def write_to_json(name=None):
    """Run the GreyNoise query, follow scroll pagination, and write ONE
    valid JSON document to ``name``.

    Args:
        name: Output file path; defaults to ``args.output_file`` when the
              caller is just producing the final output (the csv branch
              passes a temp filename instead).

    Bug fix: the previous implementation appended each scroll page's
    ``json.dumps`` output to the file, producing several concatenated JSON
    documents — a file that ``json.load`` (used by the csv branch) rejects
    with "Extra data". All pages are now merged into the first page's
    ``data`` list before a single write.
    """
    if name is None:
        name = args.output_file
    merged = api_client.query(args.query_to_run)
    page = merged
    # NOTE(review): assumes the API signals more pages via a "scroll" key,
    # as the original loop did — confirm against the GreyNoise SDK docs.
    while "scroll" in page:
        page = api_client.query(args.query_to_run, scroll=page["scroll"])
        merged["data"].extend(page.get("data", []))
    with open(name, "w") as f:
        json.dump(merged, f, indent=4)
# Flattens nested JSON data into a single-level dict.
def flatten_json(json_data):
    """Collapse nested dicts/lists into a flat dict.

    Keys are the path of dict keys and list indices joined with "_",
    e.g. ``{"a": {"b": [10]}}`` becomes ``{"a_b_0": 10}``.
    """
    flat = {}

    def _walk(node, prefix=""):
        if type(node) is dict:
            for key, value in node.items():
                _walk(value, prefix + key + "_")
        elif type(node) is list:
            for index, item in enumerate(node):
                _walk(item, prefix + str(index) + "_")
        else:
            # drop the trailing "_" left over from building the prefix
            flat[prefix[:-1]] = node

    _walk(json_data)
    return flat
if __name__ == "__main__":
    # required=True removed: it made the declared defaults unreachable.
    # Existing invocations that pass -q/-o continue to work unchanged.
    parser.add_argument("-q", "--query", dest="query_to_run", default="last_seen:1d", help="Query to run")
    parser.add_argument("-o", "--output", dest="output_file", default="results.json", help="File to write to")
    parser.add_argument("-f", "--format", dest="format", default="json", help="json | csv | stix")
    args = parser.parse_args()

    # json output uses the default data returned from the GN API
    if args.format == "json":
        write_to_json()
    elif args.format == "csv":
        # since this can be a large query, spool everything to a temp JSON
        # file first, then convert
        write_to_json("temp_results.json")
        with open("temp_results.json") as f:  # bug fix: handle was never closed
            results = json.load(f)["data"]
        csv_list = [flatten_json(record) for record in results]

        # Bug fix: the old code used the widest row's keys as the header but
        # wrote each row's raw .values(), so any row whose key set differed
        # from the header had its cells land in the wrong columns.  Build the
        # union of all keys (first-seen order) and let DictWriter align cells
        # by name, blank-filling missing ones.
        fieldnames = []
        seen = set()
        for row in csv_list:
            for key in row:
                if key not in seen:
                    seen.add(key)
                    fieldnames.append(key)
        # newline="" per the csv module docs (avoids blank lines on Windows)
        with open(args.output_file, "w", newline="") as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=fieldnames, restval="")
            writer.writeheader()
            writer.writerows(csv_list)
        # remove the json file since the csv file has been generated and saved
        os.remove("temp_results.json")
    elif args.format == "stix":
        generate_stix_output()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment