Skip to content

Instantly share code, notes, and snippets.

@sgviking
Last active September 8, 2023 19:21
Show Gist options
  • Save sgviking/39efec498055a042681550ed02346185 to your computer and use it in GitHub Desktop.
Save sgviking/39efec498055a042681550ed02346185 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import argparse
import configparser
import json
import subprocess
import sys
import os
import requests
VERSION = '1.0.4'
def get_lw_subaccounts(account, token):
"""
https://docs.lacework.net/api/v2/docs/#tag/UserProfile
/api/v2/UserProfile
"""
url = f'https://{account}.lacework.net/api/v2/UserProfile'
headers = {'Content-Type':'application/json', 'Authorization':f'Bearer {token}', 'Account-Name': account}
results = requests.get(url, headers=headers)
try:
subaccounts = results.json().get('data', [])
except:
return []
return subaccounts
def get_azure_subscriptions(account, token, subaccount, tenant_id):
"""
https://docs.lacework.net/api/v2/docs/#tag/Configs/paths/~1api~1v2~1Configs~1AzureSubscriptions/get
/api/v2/Configs/AzureSubscriptions?tenantId={tenantId}
"""
url = f'https://{account}.lacework.net/api/v2/Configs/AzureSubscriptions?tenantId={tenant_id}'
headers = {'Content-Type':'application/json', 'Authorization':f'Bearer {token}', 'Account-Name': subaccount}
results = requests.get(url, headers=headers)
accounts = []
try:
subscriptions = results.json().get('data', [])[0].get('subscriptions', [])
except:
return []
for subscription in subscriptions:
accounts.append({'tenant_id': tenant_id, 'subscription_id': subscription})
return accounts
def get_gcp_projects(account, token, subaccount, org_id):
"""
https://docs.lacework.net/api/v2/docs/#tag/Configs/paths/~1api~1v2~1Configs~1GcpProjects/get
/api/v2/Configs/GcpProjects?orgId={orgId}
"""
url = f'https://{account}.lacework.net/api/v2/Configs/GcpProjects?orgId={org_id}'
headers = {'Content-Type':'application/json', 'Authorization':f'Bearer {token}', 'Account-Name': subaccount}
results = requests.get(url, headers=headers)
accounts = []
try:
projects = results.json().get('data', [])[0].get('projects', [])
except:
return []
for project in projects:
accounts.append({'org_id': org_id, 'project_id': project})
return accounts
def get_csp_accounts(account, token, subaccount):
"""
https://docs.lacework.net/api/v2/docs/#tag/CloudAccounts/paths/~1api~1v2~1CloudAccounts/get
/api/v2/CloudAccounts
"""
url = f'https://{account}.lacework.net/api/v2/CloudAccounts'
headers = {'Content-Type':'application/json', 'Authorization':f'Bearer {token}', 'Account-Name': subaccount}
results = requests.get(url, headers=headers)
accounts = { 'AwsCfg': [], 'GcpCfg': [], 'AzureCfg': [] }
try:
csp_accounts = results.json().get('data', [])
except:
return accounts
for csp_account in csp_accounts:
if csp_account['type'] == 'AwsCfg':
accounts['AwsCfg'].append(csp_account['data']['awsAccountId'])
elif csp_account['type'] == 'AzureCfg':
subscriptions = get_azure_subscriptions(account, token, subaccount, csp_account['data']['tenantId'])
accounts['AzureCfg'] = accounts['AzureCfg'] + subscriptions
elif csp_account['type'] == 'GcpCfg':
projects = get_gcp_projects(account, token, subaccount, csp_account['data']['id'])
accounts['GcpCfg'] = accounts['GcpCfg'] + projects
return accounts
def get_access_token(profile):
# Read in lacework cli config file and pull details for specified profile
home = os.path.expanduser('~')
config = configparser.ConfigParser()
config.read(home + "/.lacework.toml")
if not config.has_section(profile):
return None, None
# Use API key and secret to get access token / bearer token
api_key = config[profile]['api_key'].strip('"')
api_secret = config[profile]['api_secret'].strip('"')
account = config[profile]['account'].strip('"')
# api_key, api_secret, account
url = f'https://{account}.lacework.net/api/v2/access/tokens'
headers = {'Content-Type': 'application/json', 'X-LW-UAKS': api_secret}
data = {'keyId': api_key, 'expiryTime': 36000}
results = requests.post(url, headers=headers, data=json.dumps(data))
token = results.json()['token']
return account, token
def get_reports(account, subaccount, token, report, primary_id, secondary_id=None):
print(f"Getting {report} report for IDs {primary_id}, {secondary_id} in sub-account {subaccount}")
headers = {'Content-Type':'application/json', 'Authorization':f'Bearer {token}', 'Account-Name': subaccount}
url = f'https://{account}.lacework.net/api/v2/Reports?primaryQueryId={primary_id.split()[0]}&format=json&reportType={report}'
if secondary_id:
url = f'{url}&secondaryQueryId={secondary_id.split()[0]}'
results = requests.get(url, headers=headers)
try:
return results.json()['data'][0]['summary'][0]
except:
return {}
def save_reports(output, reports):
os.makedirs(output, mode = 0o755, exist_ok = True)
with open(f'{output}/reports.json', 'w') as file:
file.write(json.dumps(reports))
def summary(reports, details=False):
for subaccount in reports:
csps = {'aws': 'Accounts', 'gcp': 'Projects', 'azure': 'Subscriptions'}
print(f'{subaccount} -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-')
for csp in csps:
rcount = 0
rfailed = 0
rexceptions = 0
not_compliant = 0
compliant = 0
highsev = 0
count = 0
accounts = reports[subaccount].get(csp, None)
if not accounts:
continue
for account_id, report in reports[subaccount][csp].items():
total_policies = report.get('NUM_RECOMMENDATIONS', None)
if total_policies is None:
continue
count = count + 1
# not_compliant = not_compliant + report['NUM_NOT_COMPLIANT']
compliant = compliant + report['NUM_COMPLIANT']
highsev = highsev + report['NUM_SEVERITY_1_NON_COMPLIANCE'] + report['NUM_SEVERITY_2_NON_COMPLIANCE']
rcount = rcount + report['ASSESSED_RESOURCE_COUNT']
rfailed = rfailed + report['VIOLATED_RESOURCE_COUNT']
rexceptions = rexceptions + report['SUPPRESSED_RESOURCE_COUNT']
account_highsev = round((float(report['NUM_SEVERITY_1_NON_COMPLIANCE'] + report['NUM_SEVERITY_2_NON_COMPLIANCE']) / float(total_policies)) * 100, 1)
account_compliant = round((float(report['NUM_COMPLIANT']) / total_policies) * 100, 1)
if details:
# print(f' Compliant: {report["NUM_COMPLIANT"]} High sev non-compliant: {report["NUM_SEVERITY_1_NON_COMPLIANCE"] + report["NUM_SEVERITY_2_NON_COMPLIANCE"]} Total: {total_policies}')
print(f' {account_id}\tCompliant: {account_compliant}%\tHigh severity: {account_highsev}%')
if count < 1:
continue
total = total_policies * count
percent_compliant = round((float(compliant) / float(total)) * 100, 1)
percent_highsev = round((float(highsev) / float(total)) * 100, 1)
# print(f' Compliant: {compliant} High sev non-compliant: {highsev} Total: {total}')
print(f' {csp.upper()} {csps[csp]}: {count}\tRecommendations: {total_policies}\tResources: {rcount}\tFailed: {rfailed}\tCompliant: {percent_compliant}%\tHigh severity: {percent_highsev}%')
def parse_args():
parser = argparse.ArgumentParser(description=f'Pull Lacework compliance reports across multiple sub-accounts.\nVersion: {VERSION}')
parser.add_argument('-c', '--cache', action='store_true', help='Use the previous API pull in output/')
parser.add_argument('-d', '--details', action='store_true', help='Show detailed breakdown of each account/project/subscription.')
parser.add_argument('-p', '--profile', default='default', help='Specify profile to use from lacework CLI configuration. Defaults to \'default\'.')
parser.add_argument('-o', '--output', default='output', help='Output directory for storing API calls. Defaults to \'output\'')
return parser.parse_args()
def main():
args = parse_args()
if args.cache:
try:
with open(f'{args.output}/reports.json', 'r') as file:
reports = json.loads(file.read())
except:
print('Error opening {args.output} directory. Re-run without --cache option')
sys.exit(1)
else:
account, token = get_access_token(args.profile)
subaccounts = get_lw_subaccounts(account, token)
reports = {}
for subaccount in subaccounts[0]['accounts']:
reports[subaccount['accountName']] = {'aws': {}, 'gcp': {}, 'azure': {}}
accounts = get_csp_accounts(account, token, subaccount['accountName'])
for aws_account in accounts['AwsCfg']:
reports[subaccount['accountName']]['aws'][aws_account] = get_reports(account, subaccount['accountName'], token, 'AWS_CIS_14', aws_account)
for gcp_account in accounts['GcpCfg']:
reports[subaccount['accountName']]['gcp'][gcp_account['org_id'] + '/' + gcp_account['project_id']] = get_reports(account, subaccount['accountName'], token, 'GCP_CIS13', gcp_account['org_id'], gcp_account['project_id'])
for azure_account in accounts['AzureCfg']:
reports[subaccount['accountName']]['azure'][azure_account['tenant_id'] + '/' + azure_account['subscription_id']] = get_reports(account, subaccount['accountName'], token, 'AZURE_CIS_1_5', azure_account['tenant_id'], azure_account['subscription_id'])
save_reports(args.output, reports)
summary(reports, args.details)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment