Last active
October 21, 2018 10:53
-
-
Save s-fujimoto/02a690cf6302f1299efe to your computer and use it in GitHub Desktop.
Collect vulnerability information with AWS Lambda
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#####################################################################################
# User configuration
#
# SEVERITY: one of "low", "middle" or "high". If you choose "middle", the script
# collects both middle and high severity entries.
SEVERITY = "middle"

# INTERVAL: one of "daily", "weekly" or "monthly".
INTERVAL = "daily"

# TOPIC_ARN: ARN of the SNS topic the notification is published to.
TOPIC_ARN = "arn:aws:sns:*******:************:**********"

# NOTIFY_SUBJECT: subject of the notification e-mail.
NOTIFY_SUBJECT = "[ALERT] Found Vulnerability Information"

# PRODUCTS: product names, chosen from the result of the getProductList API
# (http://jvndb.jvn.jp/myjvn?method=getProductList).
PRODUCTS = [
    "Linux Kernel",
    "OpenSSL",
    "Apache HTTP Server",
    "Apache Tomcat",
]
#####################################################################################
import datetime
import urllib
import urllib2
from xml.dom import minidom

import boto3
from dateutil import relativedelta
# MyJVN API endpoint and the values passed as its "method" query parameter.
URL_PATH = "http://jvndb.jvn.jp/myjvn"
PRODUCT_LIST_METHOD = "getProductList"
VULN_OVERVIEW_LIST_METHOD = "getVulnOverviewList"

# XML namespace URIs used to look up elements in the API responses.
# NS: item, link, title and description elements.
NS = "http://purl.org/rss/1.0/"
# NS_SEC: identifier and cvss elements.
NS_SEC = "http://jvn.jp/rss/mod_sec/"
# NS_STATUS: the Status element carrying the totalRes attribute.
NS_STATUS = "http://jvndb.jvn.jp/myjvn/Status"
# NS_PRODUCT: Product elements carrying the pname / pid attributes.
NS_PRODUCT = "http://jvndb.jvn.jp/myjvn/Results"
def lambda_handler(event, context):
    """Lambda entry point: collect vulnerabilities and publish them to SNS.

    Resolves the configured PRODUCTS to product ids, queries the
    vulnerability list of every id using SEVERITY and INTERVAL, and hands
    everything that was found to notify(). When nothing was found, only a
    message is printed.

    Args:
        event: Lambda event payload (not used).
        context: Lambda context object (not used).
    """
    product_ids = get_product_ids(PRODUCTS)
    print(product_ids)

    found = []
    for product_id in product_ids:
        found.extend(get_vulnerability_list(product_id, SEVERITY, INTERVAL) or [])

    if found:
        notify(found, TOPIC_ARN)
    else:
        print("don't exist vulnerability.")
def get_product_ids(product_names):
    """Resolve product names to product ids via the getProductList API.

    A keyword search is issued for every name; only products whose ``pname``
    attribute equals the name exactly contribute their ``pid``.

    Args:
        product_names: iterable of product name strings.

    Returns:
        set of product id strings (the ``pid`` attribute values).
    """
    product_ids = set()
    for name in product_names:
        # Names such as "Linux Kernel" contain spaces, so the keyword has to
        # be percent-encoded before it goes into the query string.
        # urllib.quote works on byte strings, hence the UTF-8 encoding of
        # unicode names.
        keyword = name.encode("utf-8") if isinstance(name, type(u"")) else name
        url = (URL_PATH + "?method=" + PRODUCT_LIST_METHOD
               + "&keyword=" + urllib.quote(keyword))

        response = urllib2.urlopen(url)
        try:
            # Pass the raw bytes to the parser. Calling .encode("utf-8") on a
            # Python 2 byte string first decodes it as ASCII and fails on any
            # non-ASCII character in the response.
            body = response.read()
        finally:
            response.close()

        dom = minidom.parseString(body)
        for product in dom.getElementsByTagNameNS(NS_PRODUCT, "Product"):
            if product.getAttribute("pname") == name:
                product_ids.add(product.getAttribute("pid"))
    return product_ids
# Step between two consecutive startItem values when paging through results.
_VULN_PAGE_SIZE = 50


def _fetch_dom(url):
    """Download url and return the response parsed as a minidom document."""
    response = urllib2.urlopen(url)
    try:
        # The raw bytes go straight to the parser; re-encoding a Python 2
        # byte string would implicitly decode it as ASCII and fail on
        # non-ASCII content.
        body = response.read()
    finally:
        response.close()
    return minidom.parseString(body)


def _element_text(node, namespace, tag):
    """Return the first child value of the only matching element, else None.

    None is returned when there is not exactly one matching element or when
    that element has no child nodes.
    """
    matches = node.getElementsByTagNameNS(namespace, tag)
    if len(matches) != 1 or not matches[0].childNodes:
        return None
    return matches[0].childNodes[0].nodeValue


def _vulnerability_from_item(item):
    """Convert one ``item`` element into a dict of the fields it carries."""
    vul = {}
    for key, namespace, tag in (
        ("id", NS_SEC, "identifier"),
        ("link", NS, "link"),
        ("title", NS, "title"),
        ("description", NS, "description"),
    ):
        text = _element_text(item, namespace, tag)
        if text is not None:
            vul[key] = text

    cvss_nodes = item.getElementsByTagNameNS(NS_SEC, "cvss")
    if len(cvss_nodes) == 1:
        vul["cvss_score"] = cvss_nodes[0].getAttribute("score")
        vul["cvss_severity"] = cvss_nodes[0].getAttribute("severity")
    return vul


def get_vulnerability_list(product, severity, interval):
    """Fetch the vulnerability overviews first published for one product.

    Queries the getVulnOverviewList API for the given product id, restricted
    by severity and by a first-published date range that ends yesterday, and
    walks through the result pages 50 entries at a time.

    Args:
        product: product id (``pid``) as returned by get_product_ids().
        severity: "low", "middle" or "high"; any other value is passed to
            the API unchanged.
        interval: "daily", "weekly" or "monthly".

    Returns:
        list of dicts; each may contain the keys "id", "link", "title",
        "description", "cvss_score" and "cvss_severity", depending on which
        elements the entry carries.

    Raises:
        ValueError: if interval is not one of the supported values.
    """
    severity_code = {"low": "l", "middle": "m", "high": "h"}.get(severity, severity)

    # NOTE(review): this uses the runtime's local clock; confirm its timezone
    # matches the one the API's publication dates are expressed in.
    now = datetime.datetime.now()
    end = now - datetime.timedelta(days=1)
    if interval == "daily":
        start = end
    elif interval == "weekly":
        start = now - datetime.timedelta(days=7)
    elif interval == "monthly":
        start = now - relativedelta.relativedelta(months=1)
    else:
        raise ValueError("unsupported interval: %r" % (interval,))

    params = [
        ("method", VULN_OVERVIEW_LIST_METHOD),
        ("productId", product),
        ("severity", severity_code),
        ("dateFirstPublishedStartY", start.year),
        ("dateFirstPublishedStartM", start.month),
        ("dateFirstPublishedStartD", start.day),
        ("dateFirstPublishedEndY", end.year),
        ("dateFirstPublishedEndM", end.month),
        ("dateFirstPublishedEndD", end.day),
        ("rangeDatePublished", "n"),
        ("rangeDatePublic", "n"),
    ]
    base_url = URL_PATH + "?" + "&".join("%s=%s" % pair for pair in params)

    # The first request is only used to learn how many entries exist.
    status = _fetch_dom(base_url).getElementsByTagNameNS(NS_STATUS, "Status")[0]
    total = int(status.getAttribute("totalRes"))

    vulnerabilities = []
    # startItem is 1-based, so the upper bound must be total + 1; otherwise a
    # result set whose last page holds a single entry loses that entry.
    for start_item in range(1, total + 1, _VULN_PAGE_SIZE):
        # Always extend the base URL so that every request carries exactly
        # one startItem parameter.
        page = _fetch_dom(base_url + "&startItem=" + str(start_item))
        for item in page.getElementsByTagNameNS(NS, "item"):
            vulnerabilities.append(_vulnerability_from_item(item))
    return vulnerabilities
def notify(vulnerabilities, topic_arn):
    """Publish a plain-text summary of the vulnerabilities to an SNS topic.

    Every vulnerability contributes one paragraph made of the fields it
    carries (title, id, CVSS score and severity, description, link),
    followed by an empty line. The publish response is printed.

    Args:
        vulnerabilities: iterable of dicts as produced by
            get_vulnerability_list().
        topic_arn: ARN of the SNS topic to publish to.
    """
    parts = []
    for vul in vulnerabilities:
        if vul.get("title"):
            parts.append("Title: " + vul["title"] + "\n")
        if vul.get("id"):
            parts.append("ID: " + vul["id"] + "\n")
        if vul.get("cvss_score"):
            parts.append(
                "CVSS Score: " + vul["cvss_score"]
                + ", Severity: " + vul["cvss_severity"] + "\n"
            )
        if vul.get("description"):
            parts.append("Detail: " + vul["description"] + "\n")
        if vul.get("link"):
            parts.append("URL: " + vul["link"] + "\n")
        # Blank line separating this entry from the next one.
        parts.append("\n")
    message = "".join(parts)

    sns = boto3.client("sns")
    response = sns.publish(
        TopicArn=topic_arn,
        Subject=NOTIFY_SUBJECT,
        Message=message,
    )
    print(response)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment