Security Hub to Sentinel integration
# Please diff with https://raw.githubusercontent.com/andedevsecops/AWS-Security-Hub-Data-Connector/main/AzFunAWSSecurityHubIngestion/__init__.py
# To see what's changed
# import requests
import datetime
import dateutil.parser
import dateutil.tz
import logging
import boto3
# import gzip
import io
import csv
import time
import os
import sys
import json
import hashlib
import hmac
import base64
from threading import Thread
from io import StringIO
# I'm using temporary credentials so don't need to set AWS keys
# also - don't have an Azure account
#
# sentinel_customer_id = os.environ.get('WorkspaceID')
# sentinel_shared_key = os.environ.get('WorkspaceKey')
# aws_access_key_id = os.environ.get('AWSAccessKeyId')
# aws_secret_acces_key = os.environ.get('AWSSecretAccessKey')
# aws_region_name = os.environ.get('AWSRegionName')
aws_securityhub_filters = os.environ.get('SecurityHubFilters')
# sentinel_log_type = os.environ.get('LogAnalyticsCustomLogName')
fresh_event_timestamp = os.environ.get('FreshEventTimeStamp')

all_new_findings = {}
all_new_findings['Findings'] = []


def dummy_send(finding):
    """A silly function that adds findings to a Findings dict instead of sending them anywhere.
    We print it elsewhere."""
    all_new_findings['Findings'].append(finding)
# def main(mytimer: func.TimerRequest) -> None:
def main():
    # Triggering manually, instead of using a timer
    # if mytimer.past_due:
    #     logging.info('The timer is past due!')
    logging.info('Starting program')
    # sentinel = AzureSentinelConnector(sentinel_customer_id, sentinel_shared_key, sentinel_log_type, queue_size=10000, bulks_number=10)
    # securityHubSession = SecurityHubClient(aws_access_key_id, aws_secret_acces_key, aws_region_name)
    securityHubSession = SecurityHubClient()
    securityhub_filters_dict = {}
    if aws_securityhub_filters:
        # Parse the filters as JSON rather than eval-ing an environment variable
        # (eval on untrusted input would allow code injection)
        securityhub_filters = aws_securityhub_filters.replace("\'", "\"")
        securityhub_filters_dict = json.loads(securityhub_filters)
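    # A hypothetical SecurityHubFilters value, for illustration only, using the
    # AwsSecurityFindingFilters shape that get_findings accepts:
    #   SecurityHubFilters="{'SeverityLabel': [{'Value': 'HIGH', 'Comparison': 'EQUALS'}]}"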
    results = securityHubSession.getFindings(securityhub_filters_dict)
    if fresh_event_timestamp:
        # TODO: not sure if this is the best way to get the timestamp - parsing it would be better than coercing to int?
        fresh_events_after_this_time = securityHubSession.freshEventTimestampGenerator(int(fresh_event_timestamp))
    else:
        fresh_events_after_this_time = securityHubSession.freshEventTimestampGenerator(2000)
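    # FreshEventTimeStamp is a look-back window in minutes; e.g. a (hypothetical) value
    # of '60' keeps only findings whose LastObservedAt falls within the last hour.
    # With the variable unset, the window defaults to 2000 minutes as above.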
    fresh_events = True
    first_call = True
    failed_sent_events_number = 0
    successfull_sent_events_number = 0
    while ((first_call or 'NextToken' in results) and fresh_events):
        # Loop through all findings (100 per page) returned by the Security Hub API call.
        # Break out of the loop once we have looked back past the freshness window
        # (based on each finding's LastObservedAt timestamp).
        first_call = False
        for finding in results['Findings']:
            finding_timestamp = securityHubSession.findingTimestampGenerator(finding['LastObservedAt'])
            if (finding_timestamp > fresh_events_after_this_time):
                # sentinel.send(finding)
                dummy_send(finding)
                # Our dummy send function always succeeds, 100% of the time, all of the time!
                # failed_sent_events_number = sentinel.failed_sent_events_number
                # successfull_sent_events_number = sentinel.successfull_sent_events_number
            else:
                fresh_events = False
                break
        if (fresh_events and 'NextToken' in results):
            results = securityHubSession.getFindingsWithToken(results['NextToken'], securityhub_filters_dict)
    if failed_sent_events_number:
        logging.error('{} events have not been sent'.format(failed_sent_events_number))
    if successfull_sent_events_number:
        logging.info('Program finished. {} events have been sent. {} events have not been sent'.format(successfull_sent_events_number, failed_sent_events_number))
    if successfull_sent_events_number == 0 and failed_sent_events_number == 0:
        logging.info('No Fresh SecurityHub Events')
class SecurityHubClient:
    # def __init__(self, aws_access_key_id, aws_secret_acces_key, aws_region_name):
    def __init__(self):
        # self.aws_access_key_id = aws_access_key_id
        # self.aws_secret_acces_key = aws_secret_acces_key
        # self.aws_region_name = aws_region_name
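        # With no explicit keys passed, boto3 falls back to its default credential chain
        # (environment variables, shared credentials/config files, or an instance/role profile)
        # and picks up the region from the AWS config or AWS_DEFAULT_REGION.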
        self.securityhub = boto3.client(
            'securityhub',
            # aws_access_key_id=self.aws_access_key_id,
            # aws_secret_access_key=self.aws_secret_acces_key,
            # region_name=self.aws_region_name
        )
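    # Returns the cutoff as epoch seconds for "now minus freshEventsDuration minutes";
    # e.g. freshEventTimestampGenerator(60) gives the cutoff for one hour ago.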
    def freshEventTimestampGenerator(self, freshEventsDuration):
        tm = datetime.datetime.utcfromtimestamp(time.time())
        return time.mktime((tm - datetime.timedelta(minutes=freshEventsDuration)).timetuple())
    # Gets the epoch time of a UTC timestamp in a Security Hub finding
    def findingTimestampGenerator(self, finding_time):
        d = dateutil.parser.parse(finding_time)
        d = d.astimezone(dateutil.tz.tzutc())
        return time.mktime(d.timetuple())
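    # e.g. findingTimestampGenerator('2021-03-15T12:00:00.000Z') converts the finding's
    # LastObservedAt into seconds using the same time.mktime convention as the cutoff
    # above, so the two values can be compared directly.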
    # Gets the 100 most recent findings from Security Hub
    def getFindings(self, filters={}):
        return self.securityhub.get_findings(
            Filters=filters,
            MaxResults=100,
            SortCriteria=[{"Field": "LastObservedAt", "SortOrder": "desc"}])

    # Gets 100 findings from Security Hub using the NextToken from a previous request
    def getFindingsWithToken(self, token, filters={}):
        return self.securityhub.get_findings(
            Filters=filters,
            NextToken=token,
            MaxResults=100,
            SortCriteria=[{"Field": "LastObservedAt", "SortOrder": "desc"}]
        )
# class AzureSentinelConnector:
#     def __init__(self, customer_id, shared_key, log_type, queue_size=200, bulks_number=10, queue_size_bytes=25 * (2**20)):
#         self.customer_id = customer_id
#         self.shared_key = shared_key
#         self.log_type = log_type
#         self.queue_size = queue_size
#         self.bulks_number = bulks_number
#         self.queue_size_bytes = queue_size_bytes
#         self._queue = []
#         self._bulks_list = []
#         self.successfull_sent_events_number = 0
#         self.failed_sent_events_number = 0
#         self.failedToSend = False
#
#     def send(self, event):
#         self._queue.append(event)
#         if len(self._queue) >= self.queue_size:
#             self.flush(force=False)
#
#     def flush(self, force=True):
#         self._bulks_list.append(self._queue)
#         if force:
#             self._flush_bulks()
#         else:
#             if len(self._bulks_list) >= self.bulks_number:
#                 self._flush_bulks()
#         self._queue = []
#
#     def _flush_bulks(self):
#         jobs = []
#         for queue in self._bulks_list:
#             if queue:
#                 queue_list = self._split_big_request(queue)
#                 for q in queue_list:
#                     jobs.append(Thread(target=self._post_data, args=(self.customer_id, self.shared_key, q, self.log_type, )))
#         for job in jobs:
#             job.start()
#         for job in jobs:
#             job.join()
#         self._bulks_list = []
#
#     def __enter__(self):
#         pass
#
#     def __exit__(self, type, value, traceback):
#         self.flush()
#
#     def _build_signature(self, customer_id, shared_key, date, content_length, method, content_type, resource):
#         x_headers = 'x-ms-date:' + date
#         string_to_hash = method + "\n" + str(content_length) + "\n" + content_type + "\n" + x_headers + "\n" + resource
#         bytes_to_hash = bytes(string_to_hash, encoding="utf-8")
#         decoded_key = base64.b64decode(shared_key)
#         encoded_hash = base64.b64encode(hmac.new(decoded_key, bytes_to_hash, digestmod=hashlib.sha256).digest()).decode()
#         authorization = "SharedKey {}:{}".format(customer_id, encoded_hash)
#         return authorization
#
#     def _post_data(self, customer_id, shared_key, body, log_type):
#         events_number = len(body)
#         body = json.dumps(body)
#         method = 'POST'
#         content_type = 'application/json'
#         resource = '/api/logs'
#         rfc1123date = datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
#         content_length = len(body)
#         signature = self._build_signature(customer_id, shared_key, rfc1123date, content_length, method, content_type, resource)
#         uri = 'https://' + customer_id + '.ods.opinsights.azure.com' + resource + '?api-version=2016-04-01'
#         headers = {
#             'content-type': content_type,
#             'Authorization': signature,
#             'Log-Type': log_type,
#             'x-ms-date': rfc1123date
#         }
#         response = requests.post(uri, data=body, headers=headers)
#         if (response.status_code >= 200 and response.status_code <= 299):
#             logging.info('{} events have been successfully sent to Azure Sentinel'.format(events_number))
#             self.successfull_sent_events_number += events_number
#             self.failedToSend = False
#         else:
#             logging.error("Error during sending events to Azure Sentinel. Response code: {}".format(response.status_code))
#             self.failed_sent_events_number += events_number
#             self.failedToSend = True
#
#     def _check_size(self, queue):
#         data_bytes_len = len(json.dumps(queue).encode())
#         return data_bytes_len < self.queue_size_bytes
#
#     def _split_big_request(self, queue):
#         if self._check_size(queue):
#             return [queue]
#         else:
#             middle = int(len(queue) / 2)
#             queues_list = [queue[:middle], queue[middle:]]
#             return self._split_big_request(queues_list[0]) + self._split_big_request(queues_list[1])
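# Example invocation (assumes AWS credentials and region are available to boto3; the
# filename and filter value below are only placeholders for illustration):
#   SecurityHubFilters="{'SeverityLabel': [{'Value': 'HIGH', 'Comparison': 'EQUALS'}]}" \
#   FreshEventTimeStamp=60 python securityhub_to_sentinel.py > findings.json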
if __name__ == '__main__':
    main()
    print(json.dumps(all_new_findings))