Created
December 22, 2017 00:06
-
-
Save ezeeetm/e1b91a6c87f03f18f6c44fa5b4c4fc4c to your computer and use it in GitHub Desktop.
Automating Vulnerability Management on AWS with TripWire IP360
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import xmlrpclib | |
import ssl | |
import logging | |
import os | |
import sys | |
import time | |
logging.basicConfig(level=logging.DEBUG) | |
ip360_endpoint = 'https://your.tripwire.api_endpoint.ip_address/api2xmlrpc/' | |
ip360_username = 'username' | |
ip360_pw = 'password' | |
environments = { | |
'NONPROD': {'device_profiler':'DP.2', 'scan_profile':'ScanProfile.7', 'network':'Network.298'}, | |
'PROD': {'device_profiler':'DP.2', 'scan_profile':'ScanProfile.7', 'network':'Network.298'} | |
} | |
def get_conn(ip360_endpoint, ip360_username, ip360_pw): | |
# CWT ip_360 endpoint uses a self-signed SSL cert, which will throw an error in xmlrpclib. | |
# using 'context=ssl._create_unverified_context()' below to address this | |
# https://stackoverflow.com/questions/30461969/disable-default-certificate-verification-in-python-2-7-9 | |
server = xmlrpclib.ServerProxy(ip360_endpoint, context=ssl._create_unverified_context()) | |
session = server.login(2, 0, ip360_username, ip360_pw) | |
resp = server.call(session,'SESSION','getUserObject',{}) | |
logging.info("#######################\nip_360 UserObject: %s\n" % (resp)) | |
return server, session | |
def get_config(environments): | |
logging.info("#######################\nusing environments: %s" % (environments)) | |
device_profiler = environments[os.environ['ENVIRONMENT']]['device_profiler'] | |
scan_profile = environments[os.environ['ENVIRONMENT']]['scan_profile'] | |
network = environments[os.environ['ENVIRONMENT']]['network'] | |
logging.info("device_profiler for this scan: %s" % (device_profiler)) | |
logging.info("scan_profile for this scan: %s" % (scan_profile)) | |
logging.info("network for this scan: %s\n" % (network)) | |
return device_profiler, scan_profile, network | |
def get_device_profilers(server, session): | |
params={'query':'name LIKE \'%\''} | |
device_profilers = server.call(session,'class.DP','search',params) | |
logging.info("#######################\ndevice_profilers: %s\n" % (device_profilers)) | |
return device_profilers | |
def get_scan_profiles(server, session): | |
params={'query':'name LIKE \'%\''} | |
scan_profiles = server.call(session,'class.ScanProfile','search',params) | |
logging.info("#######################\nscan_profiles: %s\n" % (scan_profiles)) | |
return scan_profiles | |
def get_networks(server, session): | |
params={'query':'name LIKE \'%\''} | |
networks = server.call(session,'class.Network','search',params) | |
logging.info("#######################\nnetworks: %s\n" % (networks)) | |
return networks | |
def validate_config(value, lst): | |
if any(value in x for x in lst): | |
logging.info("%s validated" % (value)) | |
return | |
else: | |
logging.critical("%s is not a valid value for %s" % (value,lst)) | |
sys.exit(1) | |
def scan_network(server, session, device_profiler,scan_profile,network): | |
params = {'network':network,'scanProfile':scan_profile} | |
audit = server.call(session,device_profiler,'startScan',params) | |
audit_attribs= server.call(session,audit,'getAttributes',{}) | |
logging.info("#######################\naudit_attribs: %s\n" % (audit_attribs)) | |
while audit_attribs['status'] == 1: #1:InProgress 2:Failed 3:Cancelled 4:Finished 5:Paused 6:Auto-Paused 7:Suspended | |
logging.info("TripWire IP360 scan in progress... ") | |
time.sleep(15) | |
audit_attribs= server.call(session,audit,'getAttributes',{}) | |
return audit | |
def get_report(server, session, audit): | |
params = {'format':'CSV'} | |
report = server.call(session,audit,'getReport',params) | |
logging.info("#######################\nreport: %s\n" % (report)) | |
return report | |
# authenticate with ip360 API endpoint | |
server, session = get_conn(ip360_endpoint, ip360_username, ip360_pw) | |
# get config for this scan | |
device_profiler, scan_profile, network = get_config(environments) | |
# get device profilers, scan profiles,and networks to validate config | |
device_profilers = get_device_profilers(server, session) | |
scan_profiles = get_scan_profiles(server, session) | |
networks = get_networks(server, session) | |
# validate configs | |
logging.info("#######################") | |
validate_config(device_profiler, device_profilers) | |
validate_config(scan_profile, scan_profiles) | |
validate_config(network, networks) | |
# execute scan | |
audit = scan_network(server, session, device_profiler,scan_profile,network) | |
# generate report | |
report = get_report(server, session, audit) | |
# notify on report | |
# gracefully close connection | |
logging.info("#######################\nexecution complete,logging out of IP360 endpoint") | |
server.logout(session) | |
''' | |
#some example calls to TripWire IP360 API | |
#nonprod 172.16.4.0/22 | |
#prod 172.16.8.0/22 | |
DPgetAttribs = server.call(session,'class.DP','getAttributes',{}) | |
print(DPgetAttribs) | |
DPinstanceAttribs = server.call(session,'DP.2','getAttributes',{}) | |
print(DPinstanceAttribs) | |
DPinstanceAttribValue = server.call(session,'DP.2','getAttributes',{}) | |
print(DPinstanceAttribValue['softwareVersion']) | |
''' |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment