Created
May 6, 2021 10:10
-
-
Save akhil-reni/a6ba2ad88ffe0c0384b245bf5eed6e97 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime
import os
import random

import numpy as np
import pandas as pd

from mainAPI.models import (
    Asset,
    AssetHealth,
    Bug,
    CVE,
    CWE,
    Cloud,
    Code,
    Endpoint,
    Mobile,
    Network,
    Organization,
    OrganizationMember,
    Package,
    Permissions,
    Port,
    Team,
    User,
    Web,
)
class Dummy:
    """Seed the database with demo data for one organization.

    Instantiating the class runs every seeding step in order: organization,
    members, assets, bugs and asset-health history. Each step looks the
    organization up again through ``create_organization``, so that method
    must keep returning the same row.
    """

    # One name is used for both the lookup and the creation; using different
    # names made every call create a brand-new organization.
    ORGANIZATION_NAME = "Cipla Private Limited"
    ORGANIZATION_INDUSTRY = "Education"
    SECURITY_TEAM_NAME = "Acme Pentest Company"

    def __init__(self):
        self.create_organization()
        self.add_member_organization()
        self.create_bulk_assets()
        self.create_bulk_bugs()
        self.get_asset_health()

    def create_organization(self):
        """Return the demo organization, creating it only if it is missing.

        ``filter(...).last()`` is used instead of ``get_or_create`` so that
        duplicates left behind by earlier runs do not raise
        ``MultipleObjectsReturned``.
        """
        organization = Organization.objects.filter(
            name=self.ORGANIZATION_NAME).last()
        if organization is None:
            organization = Organization.objects.create(
                name=self.ORGANIZATION_NAME,
                industry=self.ORGANIZATION_INDUSTRY)
            print("created new organization")
        return organization

    @staticmethod
    def _create_user(email, first_name, last_name):
        """Create a regular user whose password 'a' is stored hashed."""
        # objects.create(password='a') would store the raw string, which is
        # not a usable password hash.
        # NOTE(review): assumes User derives from Django's AbstractBaseUser
        # (it exposes create_superuser) - confirm set_password is available.
        user = User(email=email, first_name=first_name, last_name=last_name)
        user.set_password('a')
        user.save()
        return user

    def add_member_organization(self):
        """Create the demo users, their memberships and the security team.

        One superuser is added with role 1 and three regular users with
        role 2. If a user with the security-team email exists, it is added
        with role 3 and a team permission. Returns True.

        The users are created unconditionally, so running this twice will
        attempt to create them again.
        """
        organization = self.create_organization()

        # NOTE(review): every address below is the same "[email protected]"
        # placeholder left by the page scrape - replace with distinct real
        # addresses before running.
        owner = User.objects.create_superuser(
            email="[email protected]", password='a', first_name="Kalpesh",
            last_name="shah", is_active=True)
        manager = self._create_user("[email protected]", "Ravi ", "Kiran")
        dev1 = self._create_user("[email protected]", "Akash", "dev")
        dev2 = self._create_user("[email protected]", "Manoj", "prada")

        OrganizationMember.objects.create(
            user=owner, organization=organization, role=1)
        for user in (manager, dev1, dev2):
            OrganizationMember.objects.create(
                user=user, organization=organization, role=2)

        # Add the security team and, when its user exists, link that user to
        # the team through a permission.
        team = Team.objects.create(
            name=self.SECURITY_TEAM_NAME, organization=organization)
        security_user = User.objects.filter(email="[email protected]").first()
        if security_user is not None:
            perm = Permissions.objects.create(team=team, role=2)
            member = OrganizationMember.objects.create(
                user=security_user, organization=organization, role=3)
            member.permissions.add(perm)
        return True

    def create_bulk_assets(self):
        """Insert 100 assets, each drawn at random from 15 templates.

        Returns True.
        """
        count = 100
        organization = self.create_organization()
        # NOTE(review): "272.20.22.21" is not a valid IPv4 address (272 > 255);
        # confirm the target column accepts free text.
        templates = [
            {"name": "Acme Website", "type": 1, "exposed": 1},
            {"name": "Acme Mobile", "type": 2},
            {"name": "Acme internal 200.24.114.11", "type": 3,
             "target": "200.24.114.11"},
            {"name": "Acme Portal", "type": 1},
            {"name": "Acme IOS", "type": 2},
            {"name": "Acme US Internal 220.112.14.16", "type": 3,
             "target": "220.112.14.16"},
            {"name": "Acme Blog", "type": 1, "exposed": 1},
            {"name": "Acme Hybrid", "type": 2, "exposed": 2},
            {"name": "246.118.11.17", "type": 3, "target": "246.118.11.17"},
            {"name": "Acme IOT Dark", "type": 1},
            {"name": "Acme cloud", "type": 4, "exposed": 1},
            {"name": "Acme Website", "type": 5},
            {"name": "272.20.22.21", "type": 3, "target": "272.20.22.21"},
            {"name": "Acme external", "type": 5},
            {"name": "Acme cloudbox", "type": 4},
        ]
        # Build a fresh instance per row rather than handing the same unsaved
        # object to bulk_create several times.
        assets = [
            Asset(organization=organization, **random.choice(templates))
            for _ in range(count)
        ]
        Asset.objects.bulk_create(assets)
        print("Bulk asset created succesfully!!")
        return True

    def _create_bug_content(self, asset):
        """Create the type-specific detail row for a bug on *asset*.

        Returns ``(content_object, bug_level)``; ``(None, None)`` when the
        asset type is not one of 1-5.
        """
        if asset.type == 1:
            web = Web.objects.create(
                request="web bug", response="response generated")
            for url in ("https://example.com", "https://localhost.com"):
                endpoint = Endpoint.objects.filter(url=url).first()
                if endpoint is None:
                    endpoint = Endpoint.objects.create(url=url)
                web.endpoints.add(endpoint)
            return web, 2
        if asset.type == 2:
            mobile = Mobile()
            mobile.save()
            return mobile, 3
        if asset.type == 3:
            port_num = random.randrange(1000, 8080)
            port = Port.objects.filter(
                port=port_num, organization=asset.organization,
                asset=asset).first()
            if port is None:
                port = Port.objects.create(
                    organization=asset.organization, asset=asset,
                    port=port_num)
            network, _ = Network.objects.get_or_create(port=port, cpe=[])
            return network, 4
        if asset.type == 4:
            cloud = Cloud.objects.create(
                region="us-east-2", aws_type=0, vulnerable_id="SS1",
                aws_category="dynamo")
            return cloud, 5
        if asset.type == 5:
            # Pick one of the two detail kinds; creating both and always
            # keeping the package left an unused Code row behind.
            if random.choice((True, False)):
                code = Code.objects.create(
                    vulnerable_code="23627-12123", start_line_number=23,
                    end_line_number=23, file_name="test.docx")
                return code, 1
            package = Package.objects.create(
                fixed_version="v2.0.1", installed_version="v2.0.2",
                package_name="beta v2", affected_versions="v2.0.1")
            return package, 6
        return None, None

    def create_bulk_bugs(self):
        """Insert 100 bugs spread over the organization's assets.

        Each bug gets a random asset, a detail row matching that asset's
        type, and a random reporter among the role-2 members. Returns True,
        or False (creating nothing) when there are no assets or no role-2
        members to choose from.
        """
        count = 100
        organization = self.create_organization()
        bug_title_list, desc_list = self._bug_static_data()

        # Load the candidates once instead of issuing a query per bug.
        reporters = [
            member.user
            for member in OrganizationMember.objects.filter(
                organization=organization, role=2).select_related('user')
        ]
        assets = list(Asset.objects.filter(organization=organization))
        if not assets or not reporters:
            print("No assets or reporters available, skipping bug creation")
            return False

        bug_list = []
        for _ in range(count):
            asset = random.choice(assets)
            # The detail row is built from the same asset the bug points to,
            # so the bug level always matches the asset type.
            content_object, bug_level = self._create_bug_content(asset)
            bug_list.append(Bug(
                title=random.choice(bug_title_list),
                bug_level=bug_level if bug_level else random.choice(
                    [1, 2, 3, 4, 5, 6]),
                content_object=content_object,
                description=random.choice(desc_list),
                cvss=random.choice(
                    [0.1, 0.5, 0.3, 5, 6, 4, 7, 8, 9, 9.5]),
                prioritization_score=random.randrange(3, 100),
                severity=random.choice([1, 2, 5]),
                state=random.choice([0, 1]),
                organization=organization,
                asset=asset,
                exploit_available=False,
                reported_by=random.choice(reporters),
            ))
        Bug.objects.bulk_create(bug_list)
        print("Bugs created succesfully!!")
        return True

    def _web_level_dummy_data(self):
        """Return ``(titles, descriptions)``: five sample web findings each."""
        titles = [
            "subprocess_without_shell_equals_true",
            "try_except_pass",
            "flask_debug_true",
            "blacklist",
            "Web Entry description",
        ]
        descriptions = [
            "subprocess_without_shell_equals_true",
            "Try, Except, Pass detected.",
            "The input method in Python 2 will read from standard input, evaluate and run the resulting string as python source code. This is similar, though in many ways worse, then using eval. On Python 2, use raw_input instead, input is safe in Python 3.",
            "Consider possible security implications associated with subprocess module",
            "A Flask app appears to be run with debug=True, which exposes the Werkzeug debugger and allows the execution of arbitrary code.",
        ]
        return titles, descriptions

    def _network_level_dummy_data(self):
        """Return ``(titles, descriptions)``: five sample network findings.

        The descriptions embed scanner output in a Markdown code fence, one
        item per line.
        """
        titles = [
            "SSL Certificate Cannot Be Trusted",
            "Web Application Potentially Vulnerable to Clickjacking",
            "PHP expose_php Information Disclosure",
            "Microsoft Windows Remote Desktop Protocol Server Man-in-the-Middle Weakness",
            "Network Time Protocol (NTP) Mode 6 Scanner",
        ]
        # Explicit "\n" separators: adjacent literals are joined with nothing
        # in between, which previously fused the listed hosts and URLs.
        desc_1 = (
            "The SSL certificate for this service is for a different host.\n"
            "```\n"
            "The identities known by Nessus are :\n"
            "10.10.100.80\n"
            "10.103.0.10\n"
            "rsc-srv-vmd-app\n"
            "rsc-srv-vmd-app.smartraipur.com\n"
            "The Common Name in the certificate is :\n"
            "smartraipur.com\n"
            "```"
        )
        desc_2 = (
            "The remote web server may fail to mitigate a class of web application vulnerabilities.\n"
            "```\n"
            "The following pages do not use a clickjacking mitigation response header and contain a clickable event :\n"
            "- http://cipla.in/domcfg.nsf/2a4f2da2f46ea33385256aab00725595\n"
            "- http://cipla.in/domcfg.nsf/Bottom?OpenPage\n"
            "- http://cipla.in/domcfg.nsf/LoginMappings?OpenView\n"
            "- http://cipla.in/domcfg.nsf/Outline?OpenPage&BaseTarget=NotesView\n"
            "- http://cipla.in/domcfg.nsf/Top?OpenPage\n"
            "- http://cipla.in/names.nsf\n"
            "- http://cipla.in/stconfig.nsf?Login\n"
            "```"
        )
        desc_3 = (
            "The configuration of PHP on the remote host allows disclosure of sensitive information.\n"
            "```\n"
            "Nessus was able to verify the issue using the following URL :\n"
            "http://cipla.in:502/index.php/?=PHPB8B5F2A0-3C92-11d3-A3A9-4C7B08C10000\n"
            "```"
        )
        desc_4 = "It may be possible to get access to the remote host"
        # The doubled quotes in the pasted output were empty-string
        # concatenations in Python; they are written as real quotes here.
        desc_5 = (
            "The remote NTP server responds to mode 6 queries.\n"
            "```\n"
            "Nessus elicited the following response from the remote\n"
            "host by sending an NTP mode 6 query :\n"
            "'version=\"ntpd [email protected] Fri Jul 6 20:10:51 UTC 2018 (1)\",\n"
            "processor=\"x86_64\", system=\"Linux/4.4.0-131-generic\", leap=0, stratum=4,\n"
            "precision=-23, rootdelay=167.351, rootdisp=67.037, refid=10.10.100.58,\n"
            "reftime=0xe1542dc4.0d8c77de, clock=0xe1543210.451ea07f, peer=35085,\n"
            "tc=10, mintc=3, offset=0.749459, frequency=5.208, sys_jitter=0.000000,\n"
            "clk_jitter=0.198, clk_wander=0.037'\n"
            "```"
        )
        descriptions = [desc_1, desc_2, desc_3, desc_4, desc_5]
        return titles, descriptions

    def _bug_static_data(self):
        """Return ``(titles, descriptions)`` as flat lists of strings.

        Combines ten generic titles and one generic description with the web
        and network samples, giving 20 titles and 11 descriptions.
        """
        title_1 = "Using cElementTree to parse untrusted XML data is known to be vulnerable to XML attacks. Replace " \
                  "cElementTree with the equivalent defusedxml packag "
        generic_titles = [
            title_1,
            "Probable insecure usage of temp file/directory.",
            "subprocess call - check for execution of untrusted input.",
            "XSS defender",
            "Relative Path Traversal",
            "Path Traversal",
            "Doubled Character XSS Manipulations",
            "Compiler Removal of Code to Clear Buffers",
            "Xender",
            "XSS Manipulations",
        ]
        generic_desc = "Bug path traversal import Xtree"
        w_title, w_desc = self._web_level_dummy_data()
        n_title, n_desc = self._network_level_dummy_data()
        # Unpack the sub-lists: nesting them meant random.choice could hand a
        # whole list to a bug's title or description.
        bug_title_list = [*generic_titles, *w_title, *n_title]
        desc_list = [generic_desc, *w_desc, *n_desc]
        return bug_title_list, desc_list

    @staticmethod
    def _window_start(today):
        """Return the first day of the month two months before *today*."""
        # Count months from year 0 so that January and February roll back
        # into the previous year correctly.
        months = today.year * 12 + (today.month - 1) - 2
        year, month_index = divmod(months, 12)
        return today.replace(year=year, month=month_index + 1, day=1)

    def get_asset_health(self):
        """Create one AssetHealth row per day over the last three months.

        The window runs from the first day of the month two months ago up to
        and including today. Each row gets a random asset of the organization
        and a random risk score in 1-99, and its ``created`` value is set to
        midnight of that day. Returns True, or False when the organization
        has no assets.
        """
        end = datetime.date.today()
        start = self._window_start(end)
        organization = self.create_organization()
        assets = list(Asset.objects.filter(organization=organization))
        if not assets:
            print("No assets available, skipping asset health creation")
            return False
        for offset in range((end - start).days + 1):
            day = start + datetime.timedelta(days=offset)
            health = AssetHealth.objects.create(
                organization=organization,
                asset=random.choice(assets),
                risk_score=random.randrange(1, 100))
            # Set after creation and saved again, as before.
            # NOTE(review): presumably `created` is filled automatically on
            # insert - confirm against the model.
            health.created = datetime.datetime.combine(day, datetime.time.min)
            health.save()
        return True
print("done it!!")
# Seed the demo data unless DEPLOYMENT_MODE is exactly 'SAAS'; an unset
# variable is treated as 'enterprise', so seeding runs by default.
if os.getenv('DEPLOYMENT_MODE', 'enterprise') != 'SAAS':
    Dummy()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment