Created
September 23, 2020 04:46
-
-
Save illuzian/8f67e9b1204a1cf1393be90930d8667f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Anti-malware resolver. | |
import re | |
from azure.common.credentials import ServicePrincipalCredentials | |
from azure.mgmt.compute import ComputeManagementClient | |
from azure.mgmt.subscription import SubscriptionClient | |
from azure.mgmt.advisor import AdvisorManagementClient | |
from azure.mgmt.security import SecurityCenter | |
import pprint | |
pp = pprint.PrettyPrinter(indent=4, depth=40) | |
locations = ['australiaeast', 'australiasoutheast', 'australiacentral', 'australiacentral2'] | |
demo = ComputeManagementClient() | |
tenant = '' | |
client_id = '' | |
secret = '' | |
credentials = ServicePrincipalCredentials( | |
client_id=client_id, | |
secret=secret, | |
tenant=tenant | |
) | |
atp_config = { | |
"type": "extensions", | |
"name": "OMSExtension", | |
"apiVersion": "[variables('apiVersion')]", | |
"location": "[resourceGroup().location]", | |
"dependsOn": [ | |
"[concat('Microsoft.Compute/virtualMachines/', variables('vmName'))]" | |
], | |
"properties": { | |
"publisher": "Microsoft.EnterpriseCloud.Monitoring", | |
"type": "MicrosoftMonitoringAgent", | |
"typeHandlerVersion": "1.0", | |
"autoUpgradeMinorVersion": "true", | |
"settings": { | |
"workspaceId": "" | |
}, | |
"protectedSettings": { | |
"" | |
} | |
}} | |
# Problem to search for using "def yield_impacted_machines" | |
problem = 'Install endpoint protection solution on virtual machines' | |
# Generator for all subscriptions in a tenant. | |
def yield_subs(credentials=credentials): | |
subscription_client = SubscriptionClient(credentials) | |
for s in subscription_client.subscriptions.list(): | |
yield s | |
# Generator for Security Center alerts for specific locations (geographic) in a sub. | |
def yield_alerts(subscription, credentials=credentials, locations=locations): | |
for location in locations: | |
security_center = SecurityCenter(credentials, subscription.subscription_id, location) | |
for alert in security_center.alerts.list(): | |
yield alert | |
# Generator for Azure Advisor recommendations for given subscription. | |
def yield_recommendations(subscription, credentials=credentials): | |
advisor_management_client = AdvisorManagementClient(credentials, subscription.subscription_id) | |
for recommendation in advisor_management_client.recommendations.list(): | |
if recommendation.impacted_field == 'Microsoft.Compute/virtualMachines': | |
yield recommendation | |
# Generator for machines impacted by text description "problem". | |
def yield_impacted_machines(problem=problem): | |
# TODO: add impact=None, category=None, name=None | |
for sub in yield_subs(): | |
for recommendation in yield_recommendations(sub): | |
if recommendation.short_description.problem == problem: | |
yield {"entity": recommendation.impacted_value, "full_id": recommendation.id, | |
"sub_name": sub.display_name, "sub_id": sub.subscription_id} | |
extension_parameters = {'location': 'CHANGEME', 'publisher': 'Microsoft.Azure.Security', | |
'virtual_machine_extension_type': 'IaaSAntimalware', | |
'type_handler_version': '1.1', 'auto_upgrade_minor_version': True, | |
'settings': | |
{"AntimalwareEnabled": "true", "RealtimeProtectionEnabled": "true", }} | |
entity_by_sub = {} | |
for impacted in yield_impacted_machines(): | |
sub_id = impacted['sub_id'] | |
resource_group_regex = re.search('\/resourceGroups\/(.*?)\/', impacted['full_id'], re.IGNORECASE) | |
if resource_group_regex: | |
rg = resource_group_regex.group(1) | |
entity = impacted["entity"] | |
if sub_id not in entity_by_sub: | |
entity_by_sub[sub_id] = {} | |
if rg not in entity_by_sub[sub_id]: | |
entity_by_sub[impacted['sub_id']][rg] = [] | |
entity_by_sub[sub_id][rg].append(impacted['entity']) | |
# Print CSV header | |
print("vm_name, vm_state, os_name, os_version, vm_agent") | |
for sub, rgs in entity_by_sub.items(): | |
compute_client = ComputeManagementClient(credentials, sub) | |
for rg, vm_list in rgs.items(): | |
for vm_name in vm_list: | |
vm = compute_client.virtual_machines.get(rg.upper(), vm_name) | |
vm_instance = compute_client.virtual_machines.instance_view(rg.upper(), vm_name) | |
# instance.os_version and instance.os_name | instance.vm_agent | |
vm_state = vm_instance.statuses[1].display_status | |
vm_name = vm.name | |
os_name = vm_instance.os_name | |
os_version = vm_instance.os_version | |
vm_agent = "Installed" if vm_instance.vm_agent is not None else "Not Installed" | |
output = "{vm_name}, {vm_state}, {os_name}, {os_version}, {vm_agent}".format( | |
vm_name=vm_name, vm_state=vm_state, os_name=os_name, os_version=os_version, vm_agent=vm_agent) | |
print(output) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment