Created
September 23, 2025 21:44
-
-
Save skuenzli/368e4bf3e03a534cf08a4e651934f75e to your computer and use it in GitHub Desktop.
Export Findings from Security Hub in OCSF format
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
"""
Get findings from Security Hub in OCSF format.

Example usage:
    python3 export_ocsf_findings.py --account 123456789012 --status New --severity Fatal --severity Critical --severity High
    python3 export_ocsf_findings.py --days-ago 30 --severity Critical
"""
import argparse | |
import json | |
import sys | |
import boto3 | |
from botocore.exceptions import ClientError, BotoCoreError | |
def build_filters(args) -> dict:
    """Translate parsed command line options into a get_findings_v2 filters dict.

    Reads ``args.account``, ``args.status``, ``args.severity`` and
    ``args.days_ago``. Every option that is truthy contributes exactly one
    composite filter, in that order:

    * ``account``  -> EQUALS match on ``cloud.account.uid``
    * ``status``   -> EQUALS match on ``status``
    * ``severity`` -> one EQUALS match per value on ``severity``; joined with
      an ``OR`` operator when more than one value was supplied
    * ``days_ago`` -> ``DAYS`` date range on ``finding_info.created_time_dt``

    :returns an empty dict when no option is set; otherwise a dict holding
        ``CompositeFilters`` and, when there are two or more composite
        filters, ``CompositeOperator`` set to ``AND``
    """

    def equals(field_name, value):
        # One exact-match string filter entry.
        return {
            "FieldName": field_name,
            "Filter": {"Value": value, "Comparison": "EQUALS"},
        }

    clauses = []

    if args.account:
        clauses.append({"StringFilters": [equals("cloud.account.uid", args.account)]})

    if args.status:
        clauses.append({"StringFilters": [equals("status", args.status)]})

    if args.severity:
        severity_terms = [equals("severity", level) for level in args.severity]
        if len(severity_terms) > 1:
            # Several severities: a finding may match any one of them.
            clauses.append({"Operator": "OR", "StringFilters": severity_terms})
        else:
            clauses.append({"StringFilters": severity_terms})

    if args.days_ago:
        date_range = {"Unit": "DAYS", "Value": args.days_ago}
        clauses.append({
            "DateFilters": [
                {
                    "FieldName": "finding_info.created_time_dt",
                    "Filter": {"DateRange": date_range},
                }
            ]
        })

    if not clauses:
        return {}
    if len(clauses) == 1:
        return {"CompositeFilters": clauses}
    # Two or more clauses must all hold, so combine them with AND.
    return {"CompositeOperator": "AND", "CompositeFilters": clauses}
def get_ocsf_findings(filters: dict, max_items: int = 10000) -> list[dict]:
    """Retrieve findings from Security Hub using the get_findings_v2 paginator.

    Findings are fetched 100 per page and accumulated in memory. At most
    ``max_items`` findings are returned; when that limit is reached a warning
    is written to stderr because the result may be incomplete.

    On any error a message is printed to stderr and the process exits with
    ``sys.exit(-1)``; no exception propagates to the caller.

    :param filters: filters dict for get_findings_v2; when empty, no
        ``Filters`` argument is sent
    :param max_items: upper bound on the total number of findings returned
        (default 10000)
    :returns a list of OCSF finding object dictionaries
    """
    pagination_kwargs = {
        'PaginationConfig': {
            'MaxItems': max_items,  # Maximum findings to return in total
            'PageSize': 100,  # Items (findings) per page
        }
    }
    if filters:
        pagination_kwargs['Filters'] = filters

    all_findings = []
    try:
        client = boto3.client('securityhub')
        paginator = client.get_paginator('get_findings_v2')
        for page in paginator.paginate(**pagination_kwargs):
            all_findings.extend(page.get('Findings', []))
    except ClientError as e:
        error_code = e.response['Error']['Code']
        error_message = e.response['Error']['Message']
        print(f"Error: AWS API error - {error_code}: {error_message}", file=sys.stderr)
        sys.exit(-1)
    except BotoCoreError as e:
        print(f"Error: AWS SDK error - {str(e)}", file=sys.stderr)
        sys.exit(-1)
    except Exception as e:
        # Top-level boundary for this CLI: report and exit rather than dump a traceback.
        print(f"Error: Unexpected error - {str(e)}", file=sys.stderr)
        sys.exit(-1)

    if len(all_findings) >= max_items:
        # Hitting the cap usually means more findings exist than were fetched;
        # say so instead of silently returning a truncated export.
        print(
            f"Warning: reached the limit of {max_items} findings; results may be truncated",
            file=sys.stderr,
        )
    return all_findings
def main():
    """Parse the command line, fetch the matching findings and print them as JSON.

    :returns 0 after the findings have been written to stdout
    """
    cli = argparse.ArgumentParser(
        description=(
            'Exports findings from Security Hub in OCSF format. '
            'Exports all findings by default. '
            'Filter findings with the program options.'
        )
    )

    # (flag, add_argument keyword arguments), registered in this order.
    option_specs = (
        ('--account', {
            'type': str,
            'help': 'Filter by AWS account ID',
        }),
        ('--status', {
            'type': str,
            'help': 'Filter by finding status (e.g., New, Suppressed, Resolved)',
        }),
        ('--severity', {
            'type': str,
            'action': 'append',
            'help': 'Filter by severity (can specify multiple times for OR logic)',
        }),
        ('--days-ago', {
            'type': int,
            'help': 'Filter findings created within the last N days, e.g. 30',
        }),
    )
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)

    options = cli.parse_args()

    # Options -> API filters -> findings -> pretty-printed JSON on stdout.
    selected = get_ocsf_findings(build_filters(options))
    print(json.dumps(selected, indent=2))
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment