AVA - Get Alarms JSON.py
""" AVA - Get Alarms
This script retrieves all open, unsuppressed alarms raised within the last week
from every customer that has an active AlienVault USM instance.
Parameters
---
swimlane_api_token: Key Store(swimlane_api_pat) = "secret_value"
The access token for the swimlane administrator account. Loaded from the key
store.
Returns
---
payload: str
status: str
"""
import json
import re
from typing import List

import pendulum
import requests
from pydantic import BaseModel
from swimlane import Swimlane
from swimlane.core.search import EQ, NOT_EQ
class SWInputs(BaseModel):
    """Inputs the Swimlane integration hands to ``main()`` via ``context.inputs``."""

    # Swimlane administrator access token; the module docstring says it is
    # loaded from the key store entry ``swimlane_api_pat``.
    swimlane_api_token: str
class SWOutput(BaseModel):
    """One output record returned to Swimlane per retrieved alarm."""

    # JSON-encoded alarm dict, as produced by json.dumps() in main().
    payload: str
    # Value of the alarm's "status" field.
    status: str
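class AlienVaultClient(BaseModel):
    """Credentials and API helpers for one customer's USM Anywhere instance.

    Attributes:
        subdomain: Host label of the instance, i.e. the ``<subdomain>`` in
            ``https://<subdomain>.alienvault.cloud``.
        client_id: OAuth client id used for the client-credentials grant.
        client_secret: OAuth client secret paired with ``client_id``.
    """

    subdomain: str
    client_id: str
    client_secret: str

    @property
    def base_url(self) -> str:
        """Return the instance's root URL (used for display; requests go via ``_api_url``)."""
        return f"https://{self.subdomain}.alienvault.cloud"

    def _api_url(self, path: str) -> str:
        """Return the absolute URL for ``path`` on this instance.

        The subdomain comes from a Swimlane record, so it is checked before
        being placed in the host part of the URL.

        Args:
            path: Path relative to ``base_url``, starting with ``/``.

        Raises:
            ValueError: if ``subdomain`` contains anything other than
                letters, digits, ``-`` or ``.``. A value containing ``/``,
                ``@`` or ``:`` would otherwise direct the request -- and the
                basic-auth client credentials sent with it -- to a host
                outside ``alienvault.cloud``.
        """
        if not re.fullmatch(r"[A-Za-z0-9.-]+", self.subdomain):
            raise ValueError(f"invalid AlienVault subdomain: {self.subdomain!r}")
        return f"{self.base_url}{path}"

    @property
    def access_token(self) -> str:
        """Obtain a bearer token through the OAuth client-credentials grant.

        Every access performs a fresh HTTP request; nothing is cached.

        Raises:
            requests.HTTPError: on a non-2xx response from the token endpoint.
            ValueError: if ``subdomain`` is not a plain host label.
        """
        response = requests.post(
            self._api_url("/api/2.0/oauth/token"),
            auth=(self.client_id, self.client_secret),
            params={"grant_type": "client_credentials"},
            timeout=120,
        )
        response.raise_for_status()
        return response.json()["access_token"]

    def get_open_alarms(self) -> List[dict]:
        """Return open, unsuppressed alarms that occurred within the last 7 days.

        A single request with ``size`` = 200 is made. NOTE(review): alarms
        beyond that first page are not fetched -- confirm whether the API
        paginates and add paging if more than 200 can be expected.

        Raises:
            requests.HTTPError: on a non-2xx response from the alarms endpoint.
            ValueError: if ``subdomain`` is not a plain host label.
        """
        headers = {"Authorization": f"Bearer {self.access_token}"}
        # Lower bound as epoch milliseconds (seconds * 1000).
        week_ago_epoch = int(pendulum.now().subtract(days=7).timestamp() * 1000)
        params = {
            "status": "open",
            "suppressed": "false",
            "timestamp_occured_gte": week_ago_epoch,
            "size": 200,
        }
        response = requests.get(
            self._api_url("/api/2.0/alarms"),
            headers=headers,
            params=params,
            timeout=120,
        )
        response.raise_for_status()
        # Alarms sit under "_embedded" -> "alarms"; a missing key yields an
        # empty list instead of a KeyError.
        raw_alarms: List[dict] = (
            response.json().get("_embedded", {}).get("alarms", [])
        )
        return raw_alarms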
class CustomSwimlane(Swimlane):
    """Swimlane client extended with lookups against the Customer Knowledge Base app."""

    def get_av_customers_av_clients(self) -> List[AlienVaultClient]:
        """Build one ``AlienVaultClient`` per customer with a complete USM Anywhere setup.

        Selects Customer Knowledge Base records where "AlienVault" is "Yes"
        and the subdomain, client id and client secret fields are all
        non-empty.

        Returns:
            A list of clients, one per matching record, in search order.
        """
        knowledge_base = self.apps.get(name="Customer Knowledge Base")
        filters = (
            ("AlienVault", EQ, "Yes"),
            ("AV Anywhere Subdomain", NOT_EQ, ""),
            ("AV Anywhere Client ID", NOT_EQ, ""),
            ("AV Anywhere Client Secret", NOT_EQ, ""),
        )
        # limit=0 presumably removes the result cap -- confirm against the
        # swimlane SDK's search() semantics.
        records = knowledge_base.records.search(*filters, limit=0)

        clients: List[AlienVaultClient] = []
        for record in records:
            clients.append(
                AlienVaultClient(
                    subdomain=record["AV Anywhere Subdomain"],
                    client_id=record["AV Anywhere Client ID"],
                    client_secret=record["AV Anywhere Client Secret"],
                )
            )
        return clients
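def main(context) -> List[SWOutput]:
    """Collect open, unsuppressed alarms from every customer's USM instance.

    Args:
        context: The Swimlane integration context. ``context.inputs`` must
            provide ``swimlane_api_token`` and ``context.config`` the
            ``SwimlaneUrl`` entry.

    Returns:
        One ``SWOutput`` per alarm: ``payload`` holds the alarm JSON
        (tagged with the customer's ``av_subdomain``) and ``status`` the
        alarm's ``status`` field. A customer whose retrieval fails is
        reported and skipped rather than aborting the run.
    """
    inputs = SWInputs(**context.inputs)
    swimlane = CustomSwimlane(
        context.config["SwimlaneUrl"],
        access_token=inputs.swimlane_api_token,
        # NOTE(review): certificate verification is disabled for the
        # Swimlane API connection, so the administrator token is exposed
        # to any on-path host. Prefer verify_ssl=True (or a CA bundle)
        # unless the server really presents an untrusted certificate.
        verify_ssl=False,
    )

    sw_outputs: List[SWOutput] = []
    for av_client in swimlane.get_av_customers_av_clients():
        print("Processing", av_client.base_url)
        # Best-effort per customer: one unreachable or misconfigured
        # instance must not prevent the other customers' alarms from
        # being returned, hence the broad catch.
        try:
            alarms = av_client.get_open_alarms()
            for alarm in alarms:
                alarm["av_subdomain"] = av_client.subdomain
            sw_outputs.extend(
                SWOutput(payload=json.dumps(alarm), status=alarm["status"])
                for alarm in alarms
            )
        except Exception as e:
            # Name the customer and exception type so the failure can be
            # traced; a bare print(e) dropped both.
            print(
                f"Failed to fetch alarms for {av_client.subdomain}: "
                f"{type(e).__name__}: {e}"
            )
    return sw_outputs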
# `sw_context` is not defined in this file; presumably the Swimlane task
# runner injects it into the module globals before executing the script.
# When it is present, run main() and expose the result as `sw_outputs`.
if "sw_context" in globals():
    sw_outputs = main(sw_context) # noqa: F821 # pragma: no cover