Sample use case demonstrating how to apply the new ChatGPT model to security alert summaries.
import openai
import requests
import textwrap
import uuid

# pip3 install openai requests
# Set up the API credentials
es_username = "<your username>"
es_password = "<your password>"
# The detection engine signals API is served by Kibana (default port 5601), not Elasticsearch
es_url = "https://localhost:5601"
openai_key = "<your openai api key>"
openai.api_key = openai_key
def extract_fields(nested_dict, fields):
    new_dict = {}
    for key, value in nested_dict.items():
        if key in fields:
            new_dict[key] = value
        elif isinstance(value, dict):
            extracted = extract_fields(value, fields)
            if extracted:
                new_dict.update(extracted)
    return new_dict
def wrap_and_indent(text, width, indent):
    wrapped_text = textwrap.wrap(text, width=width)
    indented_text = [f"{' ' * indent}{line}" for line in wrapped_text]
    return '\n'.join(indented_text)
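# Illustrative example (made-up input): extract_fields matches keys by exact name at
# any nesting depth, so flattened dotted keys and nested leaves both work, e.g.
#   extract_fields({"kibana.alert.rule.name": "Test Rule", "host": {"event.kind": "signal"}},
#                  ["kibana.alert.rule.name", "event.kind"])
#   -> {"kibana.alert.rule.name": "Test Rule", "event.kind": "signal"}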
# Define the DSL query for alerts
body = {
    "query": {
        "match": {
            "event.action": "rule_detection"
        }
    }
}
# Send the POST request to the API endpoint
headers = {"Content-Type": "application/json", "kbn-xsrf": str(uuid.uuid4())}
response = requests.post(
    url=f"{es_url}/api/detection_engine/signals/search",
    json=body,
    headers=headers,
    auth=(es_username, es_password)
)
# Retrieve the alerts from the response
alerts = response.json().get("hits", {}).get("hits", [])
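# The response follows the usual Elasticsearch search shape (trimmed, illustrative):
#   {"hits": {"total": {...}, "hits": [{"_id": "...", "_index": "...", "_source": {<alert fields>}}]}}
# so each entry in `alerts` carries the alert document under "_source".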
# Define the fields to extract from the alert JSON
fields = [
    "event.kind",
    "signal.rule.severity",
    "kibana.alert.rule.name",
    "signal.reason",
    "signal.rule.type",
    "signal.rule.interval",
    "signal.rule.risk_score",
    "kibana.alert.rule.producer",
    "kibana.alert.rule.description"
]
# Define the prompts to use for the OpenAI model
prompts = [
    "Summarize the event:",
    "Explain this event like I'm five:",
    "Explain this event to my kids:",
    "Explain this event to my boss:",
    "Explain this event to the new graduate:",
    "Explain what happened in this event:",
    "Explain this event to the CISO:",
    "What are the next investigative steps to take based on this event?"
]
for alert in alerts:
    # Extract relevant fields from the alert JSON
    event = extract_fields(alert["_source"], fields)
    print(f"Summarizing: {event.get('kibana.alert.rule.name')}")

    # Summarize the alert with each prompt using the OpenAI chat completions API
    for prompt in prompts:
        completion = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": f"{prompt}\n{event}"}]
        )

        # Print the prompt and the wrapped summary to stdout
        print(f"\n\t{prompt}\n")
        summary = completion['choices'][0]['message']["content"]
        text = wrap_and_indent(summary, width=80, indent=8)
        print(text)

    # Only summarize the first alert in this demo
    break
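Note: openai.ChatCompletion was removed in v1.0 of the openai Python SDK. If you are on the newer SDK, a minimal sketch of the equivalent call, reusing the openai_key, prompt, and event values from the script above, looks roughly like this:

from openai import OpenAI

client = OpenAI(api_key=openai_key)
completion = client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": f"{prompt}\n{event}"}]
)
summary = completion.choices[0].message.content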
@Mikaayenson is there any documentation on how to implement this? I've edited the script to match my environment and it runs without a problem, but where would the output be?