Created
August 16, 2022 09:39
-
-
Save alex-way/f5a545f25af0d99d51336651548c18ab to your computer and use it in GitHub Desktop.
AVA - Get Alarms JSON.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" AVA - Get Alarms | |
This script will retrieve all open, unsuppressed alarms from all customers | |
with an active AlienVault USM instance within the last week. | |
Parameters | |
--- | |
swimlane_api_token: Key Store(swimlane_api_pat) = "secret_value" | |
The access token for the swimlane administrator account. Loaded from the key | |
store. | |
Returns | |
--- | |
payload: str | |
status: str | |
""" | |
import json | |
from swimlane import Swimlane | |
from swimlane.core.search import EQ, NOT_EQ | |
import requests | |
from pydantic import BaseModel | |
from typing import List | |
import pendulum | |
class SWInputs(BaseModel): | |
swimlane_api_token: str | |
class SWOutput(BaseModel): | |
payload: str | |
status: str | |
class AlienVaultClient(BaseModel): | |
subdomain: str | |
client_id: str | |
client_secret: str | |
@property | |
def base_url(self): | |
return f"https://{self.subdomain}.alienvault.cloud" | |
@property | |
def access_token(self) -> str: | |
url = f"{self.base_url}/api/2.0/oauth/token" | |
params = {"grant_type": "client_credentials"} | |
response = requests.post( | |
url, | |
auth=(self.client_id, self.client_secret), | |
params=params, | |
timeout=120, | |
) | |
response.raise_for_status() | |
return response.json()["access_token"] | |
def get_open_alarms(self) -> List[dict]: | |
url = f"{self.base_url}/api/2.0/alarms" | |
headers = {"Authorization": f"Bearer {self.access_token}"} | |
week_ago_epoch = int( | |
pendulum.now().subtract(days=7).timestamp() * 1000 | |
) | |
params = { | |
"status": "open", | |
"suppressed": "false", | |
"timestamp_occured_gte": week_ago_epoch, | |
"size": 200, | |
} | |
response = requests.get( | |
url, headers=headers, params=params, timeout=120 | |
) | |
response.raise_for_status() | |
raw_alarms: List[dict] = ( | |
response.json().get("_embedded", {}).get("alarms", []) | |
) | |
return raw_alarms | |
class CustomSwimlane(Swimlane): | |
def get_av_customers_av_clients(self) -> List[AlienVaultClient]: | |
ckb_app = self.apps.get(name="Customer Knowledge Base") | |
customers = ckb_app.records.search( | |
("AlienVault", EQ, "Yes"), | |
("AV Anywhere Subdomain", NOT_EQ, ""), | |
("AV Anywhere Client ID", NOT_EQ, ""), | |
("AV Anywhere Client Secret", NOT_EQ, ""), | |
limit=0, | |
) | |
return [ | |
AlienVaultClient( | |
subdomain=customer["AV Anywhere Subdomain"], | |
client_id=customer["AV Anywhere Client ID"], | |
client_secret=customer["AV Anywhere Client Secret"], | |
) | |
for customer in customers | |
] | |
def main(context) -> List[SWOutput]: | |
inputs = SWInputs(**context.inputs) | |
swimlane = CustomSwimlane( | |
context.config["SwimlaneUrl"], | |
access_token=inputs.swimlane_api_token, | |
verify_ssl=False, | |
) | |
av_clients = swimlane.get_av_customers_av_clients() | |
sw_outputs: List[SWOutput] = [] | |
for av_client in av_clients: | |
print("Processing", av_client.base_url) | |
try: | |
alarms = av_client.get_open_alarms() | |
for alarm in alarms: | |
alarm.update({"av_subdomain": av_client.subdomain}) | |
sw_outputs.extend( | |
[ | |
SWOutput(payload=json.dumps(a), status=a["status"]) | |
for a in alarms | |
] | |
) | |
except Exception as e: | |
print(e) | |
return sw_outputs | |
if "sw_context" in globals(): | |
sw_outputs = main(sw_context) # noqa: F821 # pragma: no cover |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment