Created
November 13, 2023 21:11
-
-
Save atheiman/8b8056ad756544ec475203aa8b6076ee to your computer and use it in GitHub Desktop.
Lambda Function to update Security Hub Findings attributes "UserDefinedFields" and "Note" to include AWS account and OrganizationalUnit metadata
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Resources: | |
SecurityHubFindingUpdateFunction: | |
Type: AWS::Lambda::Function | |
Properties: | |
Description: Applies metadata to Security Hub findings | |
Role: !Sub '${SecurityHubFindingUpdateFunctionRole.Arn}' | |
# ReservedConcurrentExecutions can be used to throttle the function if invocations get too | |
# high. However, all findings may not be updated. | |
#ReservedConcurrentExecutions: 3 | |
Environment: | |
Variables: | |
# This string is also set in event rule pattern | |
UPDATED_USER_DEFINED_FIELDS_KEY: '_AccountDataAdded' | |
Handler: index.handler | |
Timeout: 20 | |
Runtime: python3.11 | |
Tags: | |
- Key: CfnStackId | |
Value: !Ref AWS::StackId | |
Code: | |
ZipFile: | | |
import boto3 | |
import botocore | |
import json | |
import os | |
import time | |
from functools import lru_cache | |
from datetime import datetime, timezone | |
orgs = boto3.client("organizations", region_name=os.environ["AWS_REGION"]) | |
sechub = boto3.client("securityhub", region_name=os.environ["AWS_REGION"]) | |
updated_user_defined_fields_key = os.environ["UPDATED_USER_DEFINED_FIELDS_KEY"] | |
def get_orgs_resource_tags_dict(resource_id): | |
# Reform tags list of dictionaries as a single dictionary | |
tags_list = orgs.list_tags_for_resource(ResourceId=resource_id)["Tags"] | |
return {t["Key"]: t["Value"] for t in tags_list} | |
@lru_cache | |
def get_organizational_parent(child_id, ttl_hash=None): | |
del ttl_hash # ttl_hash is only used to expire cache entries in lru_cache | |
parent = orgs.list_parents(ChildId=child_id)["Parents"][0] | |
if parent["Type"] == "ROOT": | |
parent_data = orgs.list_roots()["Roots"][0] | |
else: | |
parent_data = orgs.describe_organizational_unit(OrganizationalUnitId=parent["Id"])["OrganizationalUnit"] | |
parent = {**parent, **parent_data} | |
for k in list(parent.keys()): | |
# remove unwanted attributes from api response(s) | |
if k not in ["Id", "Type", "Name"]: | |
parent.pop(k) | |
parent["Tags"] = get_orgs_resource_tags_dict(parent["Id"]) | |
return parent | |
@lru_cache | |
def get_account_data(account_id, ttl_hash=None): | |
del ttl_hash # ttl_hash is only used to expire cache entries in lru_cache | |
account = orgs.describe_account(AccountId=account_id)["Account"] | |
for k in list(account.keys()): | |
# remove unwanted attributes from api response(s) | |
if k not in ["Id", "Email", "Name"]: | |
account.pop(k) | |
account["Tags"] = get_orgs_resource_tags_dict(account["Id"]) | |
child_id = account["Id"] | |
parents = [] | |
while not parents or not parents[-1]["Type"] == "ROOT": | |
if parents: | |
child_id = parents[-1]["Id"] | |
parents.append(get_organizational_parent(child_id, ttl_hash=get_ttl_hash())) | |
parents.reverse() | |
account["OrganizationalUnit"] = parents[-1] | |
# Build OU path using OU names - /Root/path/to/OU | |
account["OrganizationalUnit"]["Path"] = "/" + "/".join([p["Name"] for p in parents]) | |
return account | |
# See https://stackoverflow.com/questions/31771286/python-in-memory-cache-with-time-to-live | |
def get_ttl_hash(seconds=3600): | |
"""Return the same value within `seconds` time period""" | |
return round(time.time() / seconds) | |
def dict_dot_notation(d, path=[]): | |
d2 = {} | |
for k, v in d.items(): | |
k_path = path + [str(k)] | |
k_formatted = ".".join(k_path) | |
if isinstance(v, dict): | |
if len(v.keys()) == 0: | |
# handle empty dict | |
d2[k_formatted] = str(v) | |
# merge in dict with recursive call | |
d2 = {**d2, **dict_dot_notation(v, path=k_path)} | |
elif isinstance(v, list) or isinstance(v, tuple): | |
# handle list / tuple as comma separated strings | |
d2[k_formatted] = ",".join([str(i) for i in v]) | |
else: | |
# force anything else to string representation | |
d2[k_formatted] = str(v) | |
return d2 | |
def handler(event, context): | |
# print(json.dumps(event)) | |
for finding in event["detail"]["findings"]: | |
finding_identifier = {"Id": finding["Id"], "ProductArn": finding["ProductArn"]} | |
print(json.dumps(finding_identifier, default=str)) | |
current_user_defined_fields = finding.get("UserDefinedFields", {}) | |
if updated_user_defined_fields_key in current_user_defined_fields: | |
print("Finding previously updated, skipping") | |
return | |
account = get_account_data(finding["AwsAccountId"], ttl_hash=get_ttl_hash()) | |
timestamp = datetime.now(timezone.utc).strftime("%y-%m-%d %H:%M:%S %Z") | |
new_user_defined_fields = dict_dot_notation({"Account": account, updated_user_defined_fields_key: timestamp}) | |
user_defined_fields = {**current_user_defined_fields, **new_user_defined_fields} | |
print("UserDefinedFields") | |
print(json.dumps(user_defined_fields, default=str)) | |
note = { | |
# data.Note.Text should NOT be longer than 512 characters. | |
"Text": json.dumps(account, default=str)[:512], | |
"UpdatedBy": context.invoked_function_arn, | |
} | |
print("Note") | |
print(json.dumps(note, default=str)) | |
sechub.batch_update_findings( | |
FindingIdentifiers=[finding_identifier], | |
Note=note, | |
UserDefinedFields=user_defined_fields, | |
) | |
# print("Combined cache info summary:") | |
# combined_cache_info = {} | |
# for f in [get_organizational_parent, get_account_data]: | |
# combined_cache_info[f.__name__] = {} | |
# for a in ["currsize", "hits", "misses"]: | |
# combined_cache_info[f.__name__][a] = getattr(f.cache_info(), a) | |
# print(json.dumps(combined_cache_info, default=str)) | |
SecurityHubFindingUpdateFunctionLogGroup: | |
Type: AWS::Logs::LogGroup | |
Properties: | |
LogGroupName: !Sub '/aws/lambda/${SecurityHubFindingUpdateFunction}' | |
RetentionInDays: 14 | |
Tags: | |
- Key: CfnStackId | |
Value: !Ref AWS::StackId | |
SecurityHubFindingUpdateFunctionRole: | |
Type: AWS::IAM::Role | |
Properties: | |
AssumeRolePolicyDocument: | |
Version: '2012-10-17' | |
Statement: | |
- Effect: Allow | |
Principal: | |
Service: lambda.amazonaws.com | |
Action: sts:AssumeRole | |
Tags: | |
- Key: CfnStackId | |
Value: !Ref AWS::StackId | |
ManagedPolicyArns: | |
- !Sub 'arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole' | |
Policies: | |
- PolicyName: Inline | |
PolicyDocument: | |
Version: '2012-10-17' | |
Statement: | |
- Effect: Allow | |
Action: | |
- organizations:Describe* | |
- organizations:List* | |
- securityhub:BatchUpdateFindings | |
Resource: '*' | |
SecurityHubFindingsEventsRule: | |
Type: AWS::Events::Rule | |
Properties: | |
Description: !Sub >- | |
Invoke Lambda function ${SecurityHubFindingUpdateFunction.Arn} for Security | |
Hub findings. All new findings and all updates to existing findings. Created | |
by ${AWS::StackId} | |
State: ENABLED | |
EventPattern: | |
source: [aws.securityhub] | |
detail-type: [Security Hub Findings - Imported] | |
detail: | |
findings: | |
UserDefinedFields: | |
# This key string is also set in lambda function env vars | |
'_AccountDataAdded': [ { "exists": false } ] | |
Targets: | |
- Id: SecurityHubFindingUpdateFunction | |
Arn: !Sub '${SecurityHubFindingUpdateFunction.Arn}' | |
SecurityHubFindingUpdateFunctionPermissionSecurityHubFindingsEventsRule: | |
Type: AWS::Lambda::Permission | |
Properties: | |
FunctionName: !Ref SecurityHubFindingUpdateFunction | |
Action: lambda:InvokeFunction | |
Principal: events.amazonaws.com | |
SourceArn: !Sub '${SecurityHubFindingsEventsRule.Arn}' |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment