Skip to content

Instantly share code, notes, and snippets.

@skuenzli
Created September 23, 2025 21:44
Show Gist options
  • Save skuenzli/368e4bf3e03a534cf08a4e651934f75e to your computer and use it in GitHub Desktop.
Save skuenzli/368e4bf3e03a534cf08a4e651934f75e to your computer and use it in GitHub Desktop.
Export Findings from Security Hub in OCSF format
#!/usr/bin/env python3
"""
Get findings from Security Hub in OCSF format.
Example usage:
python3 export_ocsf_findings.py --account 123456789012 --status New --severity Fatal --severity Critical --severity High
python3 export_ocsf_findings.py --days-ago 30 --severity Critical
"""
import argparse
import json
import sys
import boto3
from botocore.exceptions import ClientError, BotoCoreError
def _string_filter(field_name: str, value: str) -> dict:
    """Return one StringFilters entry matching ``field_name`` exactly against ``value``."""
    return {
        "FieldName": field_name,
        "Filter": {
            "Value": value,
            "Comparison": "EQUALS",
        },
    }


def build_filters(args) -> dict:
    """Build the filters dict for the get_findings_v2 API based on command line arguments.

    :param args: parsed command line arguments; the attributes ``account``,
        ``status``, ``severity`` (a list of strings) and ``days_ago`` are read.
        An attribute that is falsy (``None``, empty, ``0``) adds no filter.
    :returns a dict with a ``CompositeFilters`` list holding one entry per
        supplied option, plus ``CompositeOperator: "AND"`` when more than one
        entry is present; an empty dict when no option was supplied
    """
    composite_filters = []

    # Account filter
    if args.account:
        composite_filters.append({
            "StringFilters": [_string_filter("cloud.account.uid", args.account)],
        })

    # Status filter
    if args.status:
        composite_filters.append({
            "StringFilters": [_string_filter("status", args.status)],
        })

    # Severity filter: several values are alternatives, so they are OR-ed
    # together inside a single composite entry.
    if args.severity:
        severity_entry = {
            "StringFilters": [
                _string_filter("severity", severity) for severity in args.severity
            ],
        }
        if len(args.severity) > 1:
            severity_entry["Operator"] = "OR"
        composite_filters.append(severity_entry)

    # Date filter (findings created within the last N days)
    if args.days_ago:
        composite_filters.append({
            "DateFilters": [
                {
                    "FieldName": "finding_info.created_time_dt",
                    "Filter": {
                        "DateRange": {
                            "Unit": "DAYS",
                            "Value": args.days_ago,
                        },
                    },
                },
            ],
        })

    if not composite_filters:
        return {}

    filters = {"CompositeFilters": composite_filters}
    if len(composite_filters) > 1:
        # Every supplied option must match, so the entries are AND-ed.
        filters["CompositeOperator"] = "AND"
    return filters
def get_ocsf_findings(filters: dict, max_items: "int | None" = 10000) -> list[dict]:
    """Retrieve findings from Security Hub using pagination.

    :param filters: filters structure for get_findings_v2; when empty, no
        ``Filters`` argument is sent at all
    :param max_items: upper bound on the total number of findings collected.
        Defaults to 10000; pass ``None` to keep paging until the service
        reports no further results.
    :returns a list of OCSF finding object dictionaries

    When the ``max_items`` cap cuts the result short, a warning is written to
    stderr so a truncated export is not mistaken for a complete one.
    Any AWS API error, SDK error, or other unexpected exception is reported
    on stderr and the process exits with status -1.
    """
    pagination_config = {
        'PageSize': 100,  # Items (findings) per page
    }
    if max_items is not None:
        pagination_config['MaxItems'] = max_items  # Maximum findings to return in total

    pagination_kwargs = {'PaginationConfig': pagination_config}
    if filters:
        pagination_kwargs['Filters'] = filters

    try:
        client = boto3.client('securityhub')
        paginator = client.get_paginator('get_findings_v2')
        page_iterator = paginator.paginate(**pagination_kwargs)

        # Collect all findings
        all_findings = []
        for page in page_iterator:
            all_findings.extend(page.get('Findings', []))

        # The paginator records a resume token when it stopped because of
        # MaxItems while the service still had more results to return.
        if getattr(page_iterator, 'resume_token', None) is not None:
            print(
                f"Warning: output truncated at {max_items} findings; "
                "more findings match the given filters.",
                file=sys.stderr,
            )
        return all_findings
    except ClientError as e:
        error_code = e.response['Error']['Code']
        error_message = e.response['Error']['Message']
        print(f"Error: AWS API error - {error_code}: {error_message}", file=sys.stderr)
        sys.exit(-1)
    except BotoCoreError as e:
        print(f"Error: AWS SDK error - {str(e)}", file=sys.stderr)
        sys.exit(-1)
    except Exception as e:
        # Last-resort boundary for the CLI: report and exit instead of
        # printing a traceback.
        print(f"Error: Unexpected error - {str(e)}", file=sys.stderr)
        sys.exit(-1)
def _positive_int(text: str) -> int:
    """argparse ``type`` callable that accepts only integers >= 1."""
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {text!r}") from None
    if value < 1:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {value}")
    return value


def main():
    """Main entry point for the script.

    Parses the command line options, builds the Security Hub filters from
    them, retrieves the matching findings and prints them to stdout as an
    indented JSON array.

    :returns 0 on success, for use as the process exit status
    """
    # Set up argument parser
    parser = argparse.ArgumentParser(
        description='Exports findings from Security Hub in OCSF format. Exports all findings by default.'
                    ' Filter findings with the program options.'
    )
    parser.add_argument(
        '--account',
        type=str,
        help='Filter by AWS account ID'
    )
    parser.add_argument(
        '--status',
        type=str,
        help='Filter by finding status (e.g., New, Suppressed, Resolved)'
    )
    parser.add_argument(
        '--severity',
        type=str,
        action='append',
        help='Filter by severity (can specify multiple times for OR logic)'
    )
    parser.add_argument(
        '--days-ago',
        # Reject 0 and negative values up front: a value of 0 would be treated
        # as "no date filter" by the filter builder and silently export
        # everything, and a negative window has no meaning.
        type=_positive_int,
        help='Filter findings created within the last N days, e.g. 30'
    )

    # Parse arguments
    args = parser.parse_args()

    # Build filters
    filters = build_filters(args)

    # Get findings
    findings = get_ocsf_findings(filters)

    # Output findings as JSON
    print(json.dumps(findings, indent=2))
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment