Last active
August 7, 2024 13:58
-
-
Save lopes/51ff4951c988c5d63f96384c7aef43fe to your computer and use it in GitHub Desktop.
Cleans up lines in the reference lists in Chronicle SIEM with expired dates. #chronicle #gcp #functions #python #siem #list #management
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
Cleans up lines in the reference lists in Chronicle SIEM with expired dates.
This script can be used to clean up some or all Reference Lists in Chronicle SIEM.
It scans each list and removes (by commenting out) the expired lines that match
the pattern it is set to monitor. Lines outside this pattern are ignored. The pattern is:
<value> // expires:YYYY-MM-DD
DEPLOYMENT
It is intended to be used as a Cloud Function in the GCP platform. Create a new
function with the following parameters:
1. Environment: 2nd gen
2. Function name: chronicle-list-cleaner
3. Region: us-central1 (or another of your preference)
4. Trigger type: HTTPS (Cloud Scheduler will be set up later, but copy the URL now)
5. Set the runtime environment with the following minimum requirements:
5.1. Memory: 256 MiB
5.2. CPU: 1
5.3. Timeout: 300 seconds
5.4. Add the `CHRONICLE_KEY` environment variable with the JSON access key
6. Click Next
In the code section, select the Python runtime (tested with 3.12) and, using the
inline editor, set the entry point to `main` and paste the code below into the
`main.py` file. In `requirements.txt` paste the following:
```
functions-framework==3.*
google-auth
requests
```
I recommend running some tests against specific lists before running it against
all lists!
Still in GCP, go to Cloud Scheduler and create a new job with the following
parameters:
1. Name: chronicle-list-cleaner
2. Region: According to your environment
3. Description: Runs chronicle-list-cleaner daily to remove expired lines
4. Frequency: Select the desired frequency, like `10 5 * * *` for daily at 05:10
5. Timezone: According to your preference
6. Click to proceed and select HTTP for the target type
7. URL: Paste the URL from the Cloud Function created before
8. HTTP method: POST
9. Add an HTTP header to identify the request
9.1. Content-Type: application/json
9.2. User-Agent: Google Cloud Scheduler
10. Add the following body:
{
"command": "cleanup",
"list_name": "ALL__LISTS",
"list_type": ""
}
11. Configure the authentication method according to your environment
12. Click to proceed and finish the job creation
If everything is correct, the job will run daily at the specified time and clean
up the lists with lines set to expire.
NOTE ON LINE ORDER
The `set()` function is used to remove duplicates, so the order of the lines in
the list will be changed! If you want to keep the order, remove the `set()` and
use a different approach to remove duplicates.
NOTE ON EXPIRATION
The date is truncated to 0h each day, so it will remove items that expired on
the same day it is running. If you want to keep an item until the end of a given
day, set its expiration date to the following day, because it will then expire
at 0h of that following day.
BASED ON
- https://github.com/chronicle/api-samples-python
- https://github.com/goog-cmmartin/thatsiemguy/blob/main/mati/cloud_function/main.py
AUTHOR: Joe Lopes <lopes.id>
DATE: 2023-07-18
LICENSE: MIT
'''
from requests.exceptions import HTTPError | |
from google.auth.transport import requests | |
from google.oauth2 import service_account | |
from json import loads | |
from re import compile | |
from datetime import datetime | |
import functions_framework # cloud functions/flask requirement | |
from os import environ | |
# OAuth 2.0 scopes requested when the service-account credentials are built.
AUTHORIZATION_SCOPES = [
'https://www.googleapis.com/auth/chronicle-backstory',
'https://www.googleapis.com/auth/malachite-ingestion'
]
# Base URL that every API path used in this file is appended to.
CHRONICLE_API_BASE_URL = "https://backstory.googleapis.com"
# Matches lines shaped like `<value> // expires:YYYY-MM-DD`.
# The first character must be a word character or one of `: - ? ! @`, so a line
# that already starts with `//` does not match. The date is captured in the
# `expiration` group; only its digit layout is checked here, not whether it is
# a real calendar date.
re_expiration = compile(r'^[\w\:\-\?\!\@].*//\s*expires:\s*(?P<expiration>\d{4}-\d{2}-\d{2})')
def init_client(credentials, scopes=AUTHORIZATION_SCOPES):
    '''
    Obtains an authorized session using the provided credentials.

    Args:
        credentials: Service account key info, already parsed from JSON, in the
            form accepted by `Credentials.from_service_account_info`.
        scopes: OAuth scopes to request. Defaults to AUTHORIZATION_SCOPES.

    Returns:
        requests.AuthorizedSession: An authorized session for making API calls.
    '''
    # Pass the caller-supplied scopes through; previously the parameter was
    # accepted but the module default was always used.
    service_credentials = service_account.Credentials.from_service_account_info(
        credentials,
        scopes=scopes
    )
    return requests.AuthorizedSession(service_credentials)
def get_lists(client, params):
    '''
    Obtain all Reference Lists in Chronicle SIEM.

    Args:
        client: Session for making API calls.
        params: Query parameters for the first request (the caller in this file
            passes `page_size` and `page_token`). Not modified by this function.

    Returns:
        List of `(name, content_type)` tuples, one per reference list. Lists
        without a `contentType` field get 'CONTENT_TYPE_DEFAULT_STRING'.

    Raises:
        requests.exceptions.HTTPError: If any request returns an error status.
        KeyError: If a returned list entry has no `name`.
    '''
    url = f'{CHRONICLE_API_BASE_URL}/v2/lists'
    lists = []
    query = params
    while True:
        res = client.request('GET', url, params=query)
        res.raise_for_status()
        payload = res.json()
        # A response without a `lists` key is treated as "no lists" instead of
        # raising KeyError.
        for item in payload.get('lists', []):
            content_type = item.get('contentType', 'CONTENT_TYPE_DEFAULT_STRING')
            lists.append((item['name'], content_type))
        # NOTE(review): follows `nextPageToken` as done in the Chronicle API
        # samples; when the field is absent or empty there are no more pages.
        next_token = payload.get('nextPageToken')
        if not next_token:
            return lists
        # Build a fresh dict so the caller's params are never mutated.
        query = dict(params or {})
        query['page_token'] = next_token
def get_lines(client, list_name):
    '''
    Retrieves one reference list from the API.

    Args:
        client: Session for making API calls.
        list_name: Name of the reference list.

    Returns:
        The decoded JSON body of the API response.

    Raises:
        requests.exceptions.HTTPError: If the request returns an error status.
    '''
    response = client.request('GET', f'{CHRONICLE_API_BASE_URL}/v2/lists/{list_name}')
    response.raise_for_status()
    return response.json()
def cleanup_list(client, list_name, list_type):
    '''
    Comments out expired lines in a reference list and writes the list back.

    Duplicated lines are collapsed, blank lines are dropped, and the remaining
    lines are sorted before the update. Lines that do not match the
    `<value> // expires:YYYY-MM-DD` pattern, or whose date is not a real
    calendar date, are kept untouched.

    Args:
        client: Session for making API calls.
        list_name: Name of the reference list.
        list_type: Type of the reference list, sent back as `content_type`.

    Returns:
        Number of deprecated lines: removed duplicates plus lines commented out
        because their expiration date is in the past.

    Raises:
        requests.exceptions.HTTPError: If fetching or updating the list fails.
    '''
    ref_list = get_lines(client, list_name)
    # NOTE(review): assumes the response may omit `lines` for an empty list;
    # in that case there is nothing to clean, so no update is sent.
    lines = ref_list.get('lines', [])
    if not lines:
        print(f'Cleaned {list_name} ({list_type}): 0 items expired')
        return 0

    dedup_lines = set(lines)
    # Lines removed by de-duplication count as expired too.
    expired = len(lines) - len(dedup_lines)
    today = datetime.today()
    out_lines = []
    for line in dedup_lines:
        if not line:  # skipping blank lines
            continue
        # will ONLY work over lines in the format defined here
        match = re_expiration.search(line)
        is_expired = False
        if match is not None:
            try:
                is_expired = datetime.strptime(match.group('expiration'), '%Y-%m-%d') < today
            except ValueError:
                # Right digit layout but not a real date (e.g. 2024-13-45):
                # keep the line as is instead of aborting the whole cleanup.
                is_expired = False
        if is_expired:
            out_lines.append(f'// {line}')  # just comment, don't delete
            expired += 1
        else:
            out_lines.append(line)

    # TODO: update the description with the timestamp + number of expired lines
    # ex.: <timestamp>: +X lines, -Y lines
    url = f'{CHRONICLE_API_BASE_URL}/v2/lists'
    body = {
        'name': list_name,
        'lines': sorted(out_lines),
        'content_type': list_type
    }
    params = {'update_mask': 'list.lines'}
    res = client.request('PATCH', url, params=params, json=body)
    res.raise_for_status()
    print(f'Cleaned {list_name} ({list_type}): {expired} items expired')
    return expired
def cleanup_all_lists(client):
    '''
    Runs the cleanup over every reference list returned by `get_lists`.

    Args:
        client: Session for making API calls.

    Returns:
        A list of `(list_name, num_expired)` tuples. A list whose cleanup
        raised an HTTPError is reported with -1 and processing continues.
    '''
    results = []
    for name, content_type in get_lists(client, {'page_size':2000,'page_token':''}):
        try:
            outcome = (name, cleanup_list(client, name, content_type))
        except HTTPError:
            print(f'Error processing: {name} ({content_type})')
            outcome = (name, -1)
        results.append(outcome)
    return results
@functions_framework.http
def main(request):
    '''
    Entry point for Cloud Functions.

    Expects a JSON body with `command` set to "cleanup", a `list_name` (or
    "ALL__LISTS" to process every list) and, for a single list, `list_type`.

    Args:
        request: The caller's request; only its JSON body is used.

    Returns:
        String form of a list of `(list_name, num_expired)` tuples. A single
        list is reported in the same shape as the all-lists run.

    Raises:
        ValueError: If the body is missing, is not a JSON object, or its
            command is not "cleanup".
        RuntimeError: If the CHRONICLE_KEY environment variable is not set.
        KeyError: If `list_name` (or `list_type` for a single list) is missing.
    '''
    req_json = request.get_json(silent=True)
    # get_json(silent=True) yields None for a missing or invalid body; reject
    # it up front instead of failing on a subscript of None.
    if not isinstance(req_json, dict) or req_json.get('command') != 'cleanup':
        raise ValueError('Invalid command')

    raw_key = environ.get('CHRONICLE_KEY')
    if not raw_key:
        raise RuntimeError('CHRONICLE_KEY environment variable is not set')
    client = init_client(loads(raw_key))

    list_name = req_json['list_name']
    # Use a local instead of reusing the same quote type inside the f-string,
    # which is a syntax error before Python 3.12.
    print(f'Starting cleanup for {list_name}')
    if list_name == 'ALL__LISTS':
        status = cleanup_all_lists(client)
    else:
        expired = cleanup_list(client, list_name, req_json['list_type'])
        # cleanup_list returns an int; wrap it so len(status) below works and
        # the result has the same shape as the all-lists run.
        status = [(list_name, expired)]
    print(f'Finished: {len(status)} lists processed')
    return f'{status}'
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment