Created
June 6, 2019 10:36
-
-
Save ixjosemi/d58d7f178b6e17a84fcd447b3d57da31 to your computer and use it in GitHub Desktop.
custom-sns-integration
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# Copyright (C) 2015-2019, Wazuh Inc. | |
# Created by Wazuh, Inc. <[email protected]>. | |
# This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2 | |
import boto3 | |
import sys | |
import socket | |
import json | |
import logging | |
if sys.version_info[0] == 2: | |
import ConfigParser as configparser | |
else: | |
import configparser | |
def send_alert_to_sns(region, arn, message): | |
# Connect to sns | |
sns = boto3.client('sns', region_name=region) | |
# Send the message to the topic | |
sns.publish( | |
TopicArn=arn, | |
Message=message | |
) | |
def get_installation_path(): | |
configuration = {} | |
with open('/etc/ossec-init.conf', 'r') as f: | |
for l in f: | |
key, val = l.strip().split("=") | |
if key == 'DIRECTORY': | |
configuration[key]= val | |
installation_path = configuration['DIRECTORY'].replace('"', '') | |
return installation_path | |
def main(): | |
# Parse args | |
try: | |
alerts_file = sys.argv[1] | |
except Exception as e: | |
logging.error("Wrong arguments: '{}".format(e)) | |
exit(1) | |
# Read alerts file | |
try: | |
with open(alerts_file, 'r') as alert_f: | |
alert_json = json.loads(alert_f.read()) | |
except Exception as e: | |
logging.error("Cannot read alerts file: '{}'".format(e)) | |
exit(1) | |
# Read sns.config file | |
try: | |
config = configparser.ConfigParser() | |
config.read("/var/ossec/etc/sns.conf") | |
arn = config.get('SNSCONFIG', 'topic_arn') | |
region = config.get('SNSCONFIG', 'region') | |
except Exception as e: | |
logging.error("Cannot read the sns config file: '{}'".format(e)) | |
exit(1) | |
# Read installation path | |
try: | |
installation_path = get_installation_path() | |
logging_filepath = installation_path + '/logs/sns.log' | |
except Exception as e: | |
logging_filepath = '/var/ossec/logs/sns.log' | |
logging.error("Cannot read the ossec-init config file: '{}'".format(e)) | |
exit(1) | |
# Logging configuration | |
try: | |
hostname = socket.gethostname() | |
except Exception as e: | |
logging.error("Cannot solve hostname: '{}'".format(e)) | |
exit(1) | |
log_format = '%(asctime)s {} %(name)s %(levelname)s: %(message)s'.format(hostname) | |
logging.basicConfig(filename=logging_filepath, format=log_format, level=logging.INFO) | |
# Extract issue fields | |
alert_level = alert_json['rule']['level'] | |
description = alert_json['rule']['description'] | |
rule_id = alert_json['rule']['id'] | |
agent_id = alert_json['agent']['id'] | |
# Simplify parameters | |
# Description: Shorted to 100 chars due to the max size of an SMS is 160 chars. | |
# ARN: Removed sensitive information. | |
description = (description[:100] + '...') if len(description) > 100 else description | |
arn_name = arn.rsplit(':', 1)[-1] | |
# Message body creation | |
message = """ | |
Agent ID: {} | |
Level: {} | |
Description: {} | |
""".format(agent_id, alert_level, description) | |
# Publish message to topic | |
try: | |
logging.info("Sending alert ({}) to SNS topic: '{}'.".format(rule_id, arn_name)) | |
send_alert_to_sns(region, arn, message) | |
except Exception as e: | |
logging.error("Cannot send message to the topic: '{}'".format(e)) | |
exit(1) | |
if __name__ == "__main__": | |
try: | |
main() | |
except Exception as e: | |
logging.error("Cannot execute main function: '{}".format(e)) | |
exit(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment