Skip to content

Instantly share code, notes, and snippets.

@austinsonger
Created January 9, 2025 16:16
Show Gist options
  • Save austinsonger/f3e507e3eead19f0694d9acfa55dd0f5 to your computer and use it in GitHub Desktop.
Save austinsonger/f3e507e3eead19f0694d9acfa55dd0f5 to your computer and use it in GitHub Desktop.
AWS Inspector Findings.
import os
from datetime import datetime
import subprocess
import json
import logging
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# User-configurable service toggles
enable_lambda = True
enable_eks = True
enable_ec2 = True
enable_rds = True
enable_ecr_repos = True
enable_ecr_images = True
# User-configurable ECR repositories and images
repositories_to_scan = [
"repository1",
"repository2"
]
images_to_scan = [
{"repository": "repository1", "imageTag": "latest"},
{"repository": "repository2", "imageTag": "v1.0.0"}
]
def run_aws_cli(command):
"""
Runs an AWS CLI command and returns the parsed JSON output.
Args:
command (str): The AWS CLI command to execute.
Returns:
dict: Parsed JSON output from the command.
"""
try:
logger.info(f"Executing command: {command}")
result = subprocess.run(command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=300)
try:
output = json.loads(result.stdout)
logger.info(f"Command output: {output}")
return output
except json.JSONDecodeError:
logger.error("Failed to parse JSON output.")
return None
except subprocess.CalledProcessError as e:
logger.error(f"Error running command: {e.stderr.decode()}", exc_info=True)
return None
except subprocess.TimeoutExpired:
logger.error(f"Command timed out: {command}", exc_info=True)
return None
def extract_findings(findings, aws_service):
"""
Extracts and structures findings with the additional AWS Service field.
Args:
findings (list): List of findings from Inspector.
aws_service (str): Name of the AWS service the findings are associated with.
Returns:
list: Structured findings with the AWS Service field added.
"""
extracted_findings = []
if not isinstance(findings, list):
logger.error("Findings is not a list. Skipping processing.")
findings = []
for f in findings:
if not isinstance(f, dict):
logger.warning("Skipping invalid finding structure.")
continue
extracted_findings.append({
"AWS Service": aws_service,
"findingArn": f.get("findingArn"),
"firstObservedAt": f.get("firstObservedAt"),
"lastObservedAt": f.get("lastObservedAt"),
"status": f.get("status"),
"type": f.get("type"),
"severity": f.get("severity"),
"title": f.get("title"),
"description": f.get("description"),
"epss": f.get("epss", {}).get("score"),
"fixAvailable": f.get("fixAvailable"),
"inspectorScoreDetails": f.get("inspectorScoreDetails"),
"codeVulnerabilityDetails": f.get("codeVulnerabilityDetails") if aws_service == "Lambda" else None,
"awsLambdaFunction": f.get("resources", [{}])[0].get("details", {}).get("awsLambdaFunction") if aws_service == "Lambda" else None,
"awsEc2Instance": f.get("resources", [{}])[0].get("details", {}).get("awsEc2Instance") if aws_service == "EC2" else None,
"awsEcrContainerImage": f.get("resources", [{}])[0].get("details", {}).get("awsEcrContainerImage") if aws_service == "EKS" or aws_service == "ECR" else None,
"cvss2": f.get("packageVulnerabilityDetails", {}).get("cvss", [{}])[0].get("cvss2"),
"cvss3": f.get("packageVulnerabilityDetails", {}).get("cvss", [{}])[0].get("cvss3"),
"atigData": f.get("atigData"),
"referenceUrls": f.get("packageVulnerabilityDetails", {}).get("referenceUrls"),
"source": f.get("packageVulnerabilityDetails", {}).get("source"),
"sourceUrl": f.get("packageVulnerabilityDetails", {}).get("sourceUrl"),
"vendorSeverity": f.get("packageVulnerabilityDetails", {}).get("vendorSeverity"),
"vendorCreatedAt": f.get("packageVulnerabilityDetails", {}).get("vendorCreatedAt"),
"vendorUpdatedAt": f.get("packageVulnerabilityDetails", {}).get("vendorUpdatedAt"),
"relatedVulnerabilities": f.get("packageVulnerabilityDetails", {}).get("relatedVulnerabilities"),
"vulnerablePackages": f.get("packageVulnerabilityDetails", {}).get("vulnerablePackages"),
"networkReachabilityDetails": f.get("networkReachabilityDetails"),
"remediation": f.get("remediation", {}).get("recommendation", {}).get("text"),
"remediationUrl": f.get("remediation", {}).get("recommendation", {}).get("Url"),
"resources": f.get("resources", []),
"createdAt": f.get("createdAt"),
"updatedAt": f.get("updatedAt"),
})
return extracted_findings
def extract_cis_findings(findings):
"""
Extracts CIS-specific findings from the provided results.
Args:
findings (list): List of CIS findings.
Returns:
list: Structured CIS findings.
"""
extracted_cis_findings = []
for finding in findings:
extracted_cis_findings.append({
"accountId": finding.get("accountId"),
"checkDescription": finding.get("checkDescription"),
"checkId": finding.get("checkId"),
"findingArn": finding.get("findingArn"),
"level": finding.get("level"),
"platform": finding.get("platform"),
"remediation": finding.get("remediation"),
"scanArn": finding.get("scanArn"),
"status": finding.get("status"),
"statusReason": finding.get("statusReason"),
"targetResourceId": finding.get("targetResourceId"),
"title": finding.get("title"),
})
return extracted_cis_findings
class Inspector:
def __init__(self, enable_lambda=True, enable_eks=True, enable_ec2=True, enable_rds=True, enable_ecr_repos=True, enable_ecr_images=True):
logger.info("Initializing Inspector")
self.enable_lambda = enable_lambda
self.enable_eks = enable_eks
self.enable_ec2 = enable_ec2
self.enable_rds = enable_rds
self.enable_ecr_repos = enable_ecr_repos
self.enable_ecr_images = enable_ecr_images
def get_lambda_findings(self):
if not self.enable_lambda:
return []
command = "aws lambda list-functions"
result = run_aws_cli(command)
functions = [func["FunctionArn"] for func in result.get("Functions", [])]
findings = []
for function_arn in functions:
command = "aws inspector2 list-findings --filter-criteria 'resourceType=LambdaFunction'"
result = run_aws_cli(command)
findings.extend(extract_findings(result.get("findings", []), "Lambda"))
return findings
def get_eks_findings(self):
if not self.enable_eks:
return []
command = "aws eks list-clusters"
result = run_aws_cli(command)
clusters = result.get("clusters", [])
findings = []
for cluster_name in clusters:
cluster_arn_command = f"aws eks describe-cluster --name {cluster_name}"
cluster_details = run_aws_cli(cluster_arn_command)
cluster_arn = cluster_details["cluster"].get("arn") if cluster_details else None
if cluster_arn:
command = "aws inspector2 list-findings --filter-criteria 'resourceType=EksCluster'"
result = run_aws_cli(command)
findings.extend(extract_findings(result.get("findings", []), "EKS"))
return findings
def get_ec2_findings(self):
if not self.enable_ec2:
return []
command = "aws ec2 describe-instances"
result = run_aws_cli(command)
instances = []
for reservation in result.get("Reservations", []):
if "Instances" in reservation:
instances.extend([instance["InstanceId"] for instance in reservation["Instances"]])
findings = []
for instance_id in instances:
command = f"aws inspector2 list-findings --filter-criteria 'resourceType=Ec2Instance'"
result = run_aws_cli(command)
findings.extend(extract_findings(result.get("findings", []), "EC2"))
return findings
def get_rds_findings(self):
if not self.enable_rds:
return []
command = "aws rds describe-db-instances"
result = run_aws_cli(command)
instances = [db["DBInstanceArn"] for db in result.get("DBInstances", [])]
findings = []
for db_instance_arn in instances:
command = f"aws inspector2 list-findings --filter-criteria 'resourceType=RdsInstance'"
result = run_aws_cli(command)
findings.extend(extract_findings(result.get("findings", []), "RDS"))
return findings
def get_ecr_repo_findings(self):
if not self.enable_ecr_repos:
return []
findings = []
for repo in repositories_to_scan:
command = f"aws inspector2 list-findings --filter-criteria 'repositoryName={repo}'"
result = run_aws_cli(command)
findings.extend(extract_findings(result.get("findings", []), "ECR"))
return findings
def get_ecr_image_findings(self):
if not self.enable_ecr_images:
return []
findings = []
for image in images_to_scan:
command = f"aws inspector2 list-findings --filter-criteria 'repositoryName={image['repository']},imageTag={image['imageTag']}'"
result = run_aws_cli(command)
findings.extend(extract_findings(result.get("findings", []), "ECR"))
return findings
def get_cis_findings(self):
command = "aws inspector2 list-findings --filter-criteria 'complianceType=CIS'"
result = run_aws_cli(command)
if result and "findings" in result:
return extract_cis_findings(result.get("findings", []))
return []
if __name__ == "__main__":
inspector = Inspector(
enable_lambda=enable_lambda,
enable_eks=enable_eks,
enable_ec2=enable_ec2,
enable_rds=enable_rds,
enable_ecr_repos=enable_ecr_repos,
enable_ecr_images=enable_ecr_images
)
all_findings = []
all_findings.extend(inspector.get_lambda_findings())
all_findings.extend(inspector.get_eks_findings())
all_findings.extend(inspector.get_ec2_findings())
all_findings.extend(inspector.get_rds_findings())
all_findings.extend(inspector.get_ecr_repo_findings())
all_findings.extend(inspector.get_ecr_image_findings())
# Save general findings to a structured path
current_date = datetime.now()
output_path = f"findings/output/{current_date.year}/{current_date.month:02}/{current_date.year}-{current_date.month:02}-{current_date.day:02}_{current_date.hour:02}{current_date.minute:02}{current_date.second:02}-findings.json"
os.makedirs(os.path.dirname(output_path), exist_ok=True)
os.chmod("findings/output", 0o750)
with open(output_path, "w") as output_file:
json.dump(all_findings, output_file, indent=2)
logger.info(f"Findings saved to {output_path}")
# Save CIS findings to a separate path
cis_findings = inspector.get_cis_findings()
cis_output_path = f"findings/cis-output/{current_date.year}/{current_date.month:02}/{current_date.year}-{current_date.month:02}-{current_date.day:02}_{current_date.hour:02}{current_date.minute:02}{current_date.second:02}-cis-findings.json"
os.makedirs(os.path.dirname(cis_output_path), exist_ok=True)
os.chmod("findings/cis-output", 0o750)
with open(cis_output_path, "w") as cis_output_file:
json.dump(cis_findings, cis_output_file, indent=2)
logger.info(f"CIS findings saved to {cis_output_path}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment