Created
January 9, 2025 16:16
-
-
Save austinsonger/f3e507e3eead19f0694d9acfa55dd0f5 to your computer and use it in GitHub Desktop.
AWS Inspector Findings.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from datetime import datetime | |
import subprocess | |
import json | |
import logging | |
# Configure logging | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
logger = logging.getLogger(__name__) | |
# User-configurable service toggles | |
enable_lambda = True | |
enable_eks = True | |
enable_ec2 = True | |
enable_rds = True | |
enable_ecr_repos = True | |
enable_ecr_images = True | |
# User-configurable ECR repositories and images | |
repositories_to_scan = [ | |
"repository1", | |
"repository2" | |
] | |
images_to_scan = [ | |
{"repository": "repository1", "imageTag": "latest"}, | |
{"repository": "repository2", "imageTag": "v1.0.0"} | |
] | |
def run_aws_cli(command): | |
""" | |
Runs an AWS CLI command and returns the parsed JSON output. | |
Args: | |
command (str): The AWS CLI command to execute. | |
Returns: | |
dict: Parsed JSON output from the command. | |
""" | |
try: | |
logger.info(f"Executing command: {command}") | |
result = subprocess.run(command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=300) | |
try: | |
output = json.loads(result.stdout) | |
logger.info(f"Command output: {output}") | |
return output | |
except json.JSONDecodeError: | |
logger.error("Failed to parse JSON output.") | |
return None | |
except subprocess.CalledProcessError as e: | |
logger.error(f"Error running command: {e.stderr.decode()}", exc_info=True) | |
return None | |
except subprocess.TimeoutExpired: | |
logger.error(f"Command timed out: {command}", exc_info=True) | |
return None | |
def extract_findings(findings, aws_service): | |
""" | |
Extracts and structures findings with the additional AWS Service field. | |
Args: | |
findings (list): List of findings from Inspector. | |
aws_service (str): Name of the AWS service the findings are associated with. | |
Returns: | |
list: Structured findings with the AWS Service field added. | |
""" | |
extracted_findings = [] | |
if not isinstance(findings, list): | |
logger.error("Findings is not a list. Skipping processing.") | |
findings = [] | |
for f in findings: | |
if not isinstance(f, dict): | |
logger.warning("Skipping invalid finding structure.") | |
continue | |
extracted_findings.append({ | |
"AWS Service": aws_service, | |
"findingArn": f.get("findingArn"), | |
"firstObservedAt": f.get("firstObservedAt"), | |
"lastObservedAt": f.get("lastObservedAt"), | |
"status": f.get("status"), | |
"type": f.get("type"), | |
"severity": f.get("severity"), | |
"title": f.get("title"), | |
"description": f.get("description"), | |
"epss": f.get("epss", {}).get("score"), | |
"fixAvailable": f.get("fixAvailable"), | |
"inspectorScoreDetails": f.get("inspectorScoreDetails"), | |
"codeVulnerabilityDetails": f.get("codeVulnerabilityDetails") if aws_service == "Lambda" else None, | |
"awsLambdaFunction": f.get("resources", [{}])[0].get("details", {}).get("awsLambdaFunction") if aws_service == "Lambda" else None, | |
"awsEc2Instance": f.get("resources", [{}])[0].get("details", {}).get("awsEc2Instance") if aws_service == "EC2" else None, | |
"awsEcrContainerImage": f.get("resources", [{}])[0].get("details", {}).get("awsEcrContainerImage") if aws_service == "EKS" or aws_service == "ECR" else None, | |
"cvss2": f.get("packageVulnerabilityDetails", {}).get("cvss", [{}])[0].get("cvss2"), | |
"cvss3": f.get("packageVulnerabilityDetails", {}).get("cvss", [{}])[0].get("cvss3"), | |
"atigData": f.get("atigData"), | |
"referenceUrls": f.get("packageVulnerabilityDetails", {}).get("referenceUrls"), | |
"source": f.get("packageVulnerabilityDetails", {}).get("source"), | |
"sourceUrl": f.get("packageVulnerabilityDetails", {}).get("sourceUrl"), | |
"vendorSeverity": f.get("packageVulnerabilityDetails", {}).get("vendorSeverity"), | |
"vendorCreatedAt": f.get("packageVulnerabilityDetails", {}).get("vendorCreatedAt"), | |
"vendorUpdatedAt": f.get("packageVulnerabilityDetails", {}).get("vendorUpdatedAt"), | |
"relatedVulnerabilities": f.get("packageVulnerabilityDetails", {}).get("relatedVulnerabilities"), | |
"vulnerablePackages": f.get("packageVulnerabilityDetails", {}).get("vulnerablePackages"), | |
"networkReachabilityDetails": f.get("networkReachabilityDetails"), | |
"remediation": f.get("remediation", {}).get("recommendation", {}).get("text"), | |
"remediationUrl": f.get("remediation", {}).get("recommendation", {}).get("Url"), | |
"resources": f.get("resources", []), | |
"createdAt": f.get("createdAt"), | |
"updatedAt": f.get("updatedAt"), | |
}) | |
return extracted_findings | |
def extract_cis_findings(findings): | |
""" | |
Extracts CIS-specific findings from the provided results. | |
Args: | |
findings (list): List of CIS findings. | |
Returns: | |
list: Structured CIS findings. | |
""" | |
extracted_cis_findings = [] | |
for finding in findings: | |
extracted_cis_findings.append({ | |
"accountId": finding.get("accountId"), | |
"checkDescription": finding.get("checkDescription"), | |
"checkId": finding.get("checkId"), | |
"findingArn": finding.get("findingArn"), | |
"level": finding.get("level"), | |
"platform": finding.get("platform"), | |
"remediation": finding.get("remediation"), | |
"scanArn": finding.get("scanArn"), | |
"status": finding.get("status"), | |
"statusReason": finding.get("statusReason"), | |
"targetResourceId": finding.get("targetResourceId"), | |
"title": finding.get("title"), | |
}) | |
return extracted_cis_findings | |
class Inspector: | |
def __init__(self, enable_lambda=True, enable_eks=True, enable_ec2=True, enable_rds=True, enable_ecr_repos=True, enable_ecr_images=True): | |
logger.info("Initializing Inspector") | |
self.enable_lambda = enable_lambda | |
self.enable_eks = enable_eks | |
self.enable_ec2 = enable_ec2 | |
self.enable_rds = enable_rds | |
self.enable_ecr_repos = enable_ecr_repos | |
self.enable_ecr_images = enable_ecr_images | |
def get_lambda_findings(self): | |
if not self.enable_lambda: | |
return [] | |
command = "aws lambda list-functions" | |
result = run_aws_cli(command) | |
functions = [func["FunctionArn"] for func in result.get("Functions", [])] | |
findings = [] | |
for function_arn in functions: | |
command = "aws inspector2 list-findings --filter-criteria 'resourceType=LambdaFunction'" | |
result = run_aws_cli(command) | |
findings.extend(extract_findings(result.get("findings", []), "Lambda")) | |
return findings | |
def get_eks_findings(self): | |
if not self.enable_eks: | |
return [] | |
command = "aws eks list-clusters" | |
result = run_aws_cli(command) | |
clusters = result.get("clusters", []) | |
findings = [] | |
for cluster_name in clusters: | |
cluster_arn_command = f"aws eks describe-cluster --name {cluster_name}" | |
cluster_details = run_aws_cli(cluster_arn_command) | |
cluster_arn = cluster_details["cluster"].get("arn") if cluster_details else None | |
if cluster_arn: | |
command = "aws inspector2 list-findings --filter-criteria 'resourceType=EksCluster'" | |
result = run_aws_cli(command) | |
findings.extend(extract_findings(result.get("findings", []), "EKS")) | |
return findings | |
def get_ec2_findings(self): | |
if not self.enable_ec2: | |
return [] | |
command = "aws ec2 describe-instances" | |
result = run_aws_cli(command) | |
instances = [] | |
for reservation in result.get("Reservations", []): | |
if "Instances" in reservation: | |
instances.extend([instance["InstanceId"] for instance in reservation["Instances"]]) | |
findings = [] | |
for instance_id in instances: | |
command = f"aws inspector2 list-findings --filter-criteria 'resourceType=Ec2Instance'" | |
result = run_aws_cli(command) | |
findings.extend(extract_findings(result.get("findings", []), "EC2")) | |
return findings | |
def get_rds_findings(self): | |
if not self.enable_rds: | |
return [] | |
command = "aws rds describe-db-instances" | |
result = run_aws_cli(command) | |
instances = [db["DBInstanceArn"] for db in result.get("DBInstances", [])] | |
findings = [] | |
for db_instance_arn in instances: | |
command = f"aws inspector2 list-findings --filter-criteria 'resourceType=RdsInstance'" | |
result = run_aws_cli(command) | |
findings.extend(extract_findings(result.get("findings", []), "RDS")) | |
return findings | |
def get_ecr_repo_findings(self): | |
if not self.enable_ecr_repos: | |
return [] | |
findings = [] | |
for repo in repositories_to_scan: | |
command = f"aws inspector2 list-findings --filter-criteria 'repositoryName={repo}'" | |
result = run_aws_cli(command) | |
findings.extend(extract_findings(result.get("findings", []), "ECR")) | |
return findings | |
def get_ecr_image_findings(self): | |
if not self.enable_ecr_images: | |
return [] | |
findings = [] | |
for image in images_to_scan: | |
command = f"aws inspector2 list-findings --filter-criteria 'repositoryName={image['repository']},imageTag={image['imageTag']}'" | |
result = run_aws_cli(command) | |
findings.extend(extract_findings(result.get("findings", []), "ECR")) | |
return findings | |
def get_cis_findings(self): | |
command = "aws inspector2 list-findings --filter-criteria 'complianceType=CIS'" | |
result = run_aws_cli(command) | |
if result and "findings" in result: | |
return extract_cis_findings(result.get("findings", [])) | |
return [] | |
if __name__ == "__main__": | |
inspector = Inspector( | |
enable_lambda=enable_lambda, | |
enable_eks=enable_eks, | |
enable_ec2=enable_ec2, | |
enable_rds=enable_rds, | |
enable_ecr_repos=enable_ecr_repos, | |
enable_ecr_images=enable_ecr_images | |
) | |
all_findings = [] | |
all_findings.extend(inspector.get_lambda_findings()) | |
all_findings.extend(inspector.get_eks_findings()) | |
all_findings.extend(inspector.get_ec2_findings()) | |
all_findings.extend(inspector.get_rds_findings()) | |
all_findings.extend(inspector.get_ecr_repo_findings()) | |
all_findings.extend(inspector.get_ecr_image_findings()) | |
# Save general findings to a structured path | |
current_date = datetime.now() | |
output_path = f"findings/output/{current_date.year}/{current_date.month:02}/{current_date.year}-{current_date.month:02}-{current_date.day:02}_{current_date.hour:02}{current_date.minute:02}{current_date.second:02}-findings.json" | |
os.makedirs(os.path.dirname(output_path), exist_ok=True) | |
os.chmod("findings/output", 0o750) | |
with open(output_path, "w") as output_file: | |
json.dump(all_findings, output_file, indent=2) | |
logger.info(f"Findings saved to {output_path}") | |
# Save CIS findings to a separate path | |
cis_findings = inspector.get_cis_findings() | |
cis_output_path = f"findings/cis-output/{current_date.year}/{current_date.month:02}/{current_date.year}-{current_date.month:02}-{current_date.day:02}_{current_date.hour:02}{current_date.minute:02}{current_date.second:02}-cis-findings.json" | |
os.makedirs(os.path.dirname(cis_output_path), exist_ok=True) | |
os.chmod("findings/cis-output", 0o750) | |
with open(cis_output_path, "w") as cis_output_file: | |
json.dump(cis_findings, cis_output_file, indent=2) | |
logger.info(f"CIS findings saved to {cis_output_path}") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment