
@skuenzli
Created September 11, 2025 02:20
Analyze Issues in Security Hub
from typing import Any, Dict, List
import json
import logging

logger = logging.getLogger(__name__)

# Note: the @tool decorator is assumed to be provided by the agent framework
# this snippet is registered with; it is not defined in this gist.
@tool
def analyze_security_findings(findings: List[Dict[str, Any]],
                              max_important_findings: int = 10) -> Dict[str, Any]:
    """
    Analyze and prioritize security findings based on severity, resource type, and impact.

    Args:
        findings: List of security findings in OCSF format
        max_important_findings: Maximum number of 'important' findings to return from the analysis (default 10)

    Returns:
        A dictionary reporting the analysis with prioritized findings, statistics, and details of the most important findings
    """
    if not findings:
        return {
            'status': 'no_findings',
            'message': 'No security findings to analyze'
        }

    max_findings_to_process = 1000
    total_findings = len(findings)
    findings_to_analyze = findings[:max_findings_to_process] if total_findings > max_findings_to_process else findings
    logger.info(f"Analyzing {len(findings_to_analyze)} of {total_findings} total findings")

    # Calculate statistics
    severity_counts = {}
    resource_type_counts = {}
    compliance_status_counts = {}

    # Process findings more efficiently
    important_findings = []
    for finding in findings_to_analyze:
        try:
            # Count by severity and identify critical findings
            severity_id = finding.get('severity_id', 0)
            severity = finding.get('severity', 'UNKNOWN')
            severity_counts[severity] = severity_counts.get(severity, 0) + 1
            if severity_id >= 4:
                important_findings.append(finding)

            # Count by resource type (limit to first resource for efficiency)
            resources = finding.get('resources', [])
            if resources:
                res_type = resources[0].get('type', 'Unknown')
                resource_type_counts[res_type] = resource_type_counts.get(res_type, 0) + 1

            # Count by compliance status
            compliance_status = finding.get('compliance', {}).get('Status', 'UNKNOWN')
            compliance_status_counts[compliance_status] = compliance_status_counts.get(compliance_status, 0) + 1
        except Exception as e:
            logger.error(f"Error processing finding: {e}")
            logger.error(f"Finding: {json.dumps(finding, indent=2)}")
            continue

    # Sort findings by severity (highest severity_id first)
    important_findings.sort(key=lambda x: x.get('severity_id', 0), reverse=True)
    top_important_findings = important_findings[:max_important_findings]  # top N most important findings

    analysis = {
        'total_findings_count': total_findings,
        'analyzed_findings_count': len(findings_to_analyze),
        'important_findings_count': len(important_findings),
        'important_findings': top_important_findings,
        'severity_distribution': severity_counts,
        'resource_type_distribution': dict(list(resource_type_counts.items())[:10]),  # Top 10 resource types
        'compliance_status_distribution': compliance_status_counts,
        'recommendations': generate_recommendations(top_important_findings)  # helper defined elsewhere in the project
    }

    if total_findings > max_findings_to_process:
        analysis['note'] = f"Analysis limited to first {max_findings_to_process} findings out of {total_findings} total"

    logger.info(f"Security findings analysis completed for {len(findings_to_analyze)} findings")
    return analysis
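
The tool calls a generate_recommendations helper that is not included in this gist. Below is a minimal sketch of what such a helper could look like; the field names (finding_info.title, remediation.desc) and the fallback wording are assumptions, not the author's implementation.

# Hypothetical sketch of the generate_recommendations helper referenced above;
# the real implementation is defined elsewhere and may differ.
def generate_recommendations(important_findings: List[Dict[str, Any]]) -> List[str]:
    """Produce short remediation hints for the highest-severity findings."""
    recommendations = []
    for finding in important_findings:
        # OCSF-style fields assumed here; adjust to the actual finding schema in use
        title = finding.get('finding_info', {}).get('title', 'Untitled finding')
        remediation = finding.get('remediation', {}).get('desc')
        if remediation:
            recommendations.append(f"{title}: {remediation}")
        else:
            recommendations.append(f"{title}: review the affected resources and apply the relevant Security Hub control guidance")
    return recommendations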
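
An illustrative invocation follows, assuming findings have already been retrieved from Security Hub in OCSF format; the sample finding is hand-written for illustration, and depending on the agent framework the decorated function may need to be invoked through the framework rather than called directly.

# Illustrative call with a minimal OCSF-style finding; real findings would come
# from a Security Hub retrieval tool in the same agent.
sample_findings = [
    {
        'severity': 'HIGH',
        'severity_id': 4,
        'resources': [{'type': 'AwsS3Bucket', 'uid': 'arn:aws:s3:::example-bucket'}],
        'compliance': {'Status': 'FAILED'},
    }
]

analysis = analyze_security_findings(sample_findings, max_important_findings=5)
print(json.dumps(analysis, indent=2, default=str))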