Skip to content

Instantly share code, notes, and snippets.

@illuzian
Created September 23, 2020 04:46
Show Gist options
  • Save illuzian/8f67e9b1204a1cf1393be90930d8667f to your computer and use it in GitHub Desktop.
Save illuzian/8f67e9b1204a1cf1393be90930d8667f to your computer and use it in GitHub Desktop.
# Anti-malware resolver.
import re
from azure.common.credentials import ServicePrincipalCredentials
from azure.mgmt.compute import ComputeManagementClient
from azure.mgmt.subscription import SubscriptionClient
from azure.mgmt.advisor import AdvisorManagementClient
from azure.mgmt.security import SecurityCenter
import pprint
# Pretty-printer instance; not referenced elsewhere in this script.
pp = pprint.PrettyPrinter(indent=4, depth=40)

# Locations that yield_alerts() iterates over by default.
locations = ['australiaeast', 'australiasoutheast', 'australiacentral', 'australiacentral2']

# Service principal details. These are blank placeholders and must be filled
# in before the script is run.
tenant = ''
client_id = ''
secret = ''

# Credentials object used as the default for the generators below and for the
# per-subscription ComputeManagementClient in the report loop.
credentials = ServicePrincipalCredentials(
    client_id=client_id,
    secret=secret,
    tenant=tenant,
)
# ARM-template style definition of a VM extension resource for the
# MicrosoftMonitoringAgent handler. Not referenced elsewhere in this script.
# The bracketed strings are template expressions and are kept verbatim.
atp_config = {
    "type": "extensions",
    "name": "OMSExtension",
    "apiVersion": "[variables('apiVersion')]",
    "location": "[resourceGroup().location]",
    "dependsOn": [
        "[concat('Microsoft.Compute/virtualMachines/', variables('vmName'))]"
    ],
    "properties": {
        "publisher": "Microsoft.EnterpriseCloud.Monitoring",
        "type": "MicrosoftMonitoringAgent",
        "typeHandlerVersion": "1.0",
        "autoUpgradeMinorVersion": "true",
        "settings": {
            "workspaceId": ""
        },
        # Must be a mapping: a bare {""} is a Python set, which cannot be
        # serialised to JSON. The key name follows the Log Analytics agent
        # extension schema; the value is a placeholder to fill in.
        "protectedSettings": {
            "workspaceKey": ""
        },
    },
}
# Default problem text for yield_impacted_machines(): a recommendation is
# selected when its short_description.problem equals this string exactly.
problem = 'Install endpoint protection solution on virtual machines'
def yield_subs(credentials=credentials):
    """Yield each subscription listed by a SubscriptionClient.

    credentials: credential object handed to SubscriptionClient; defaults to
        the module-level ``credentials``.
    """
    client = SubscriptionClient(credentials)
    listed = client.subscriptions.list()
    for subscription in listed:
        yield subscription
def yield_alerts(subscription, credentials=credentials, locations=locations):
    """Yield Security Center alerts for a subscription, location by location.

    subscription: object exposing ``subscription_id``.
    credentials: credential object handed to SecurityCenter; defaults to the
        module-level ``credentials``.
    locations: iterable of location names; one SecurityCenter client is built
        per entry. Defaults to the module-level ``locations`` list.
    """
    for asc_location in locations:
        client = SecurityCenter(credentials, subscription.subscription_id, asc_location)
        # NOTE(review): alerts.list() is not passed the location itself, so
        # whether each pass is actually location-scoped depends on the SDK.
        # Confirm this does not yield the same alerts once per location.
        alerts = client.alerts.list()
        for item in alerts:
            yield item
def yield_recommendations(subscription, credentials=credentials):
    """Yield Advisor recommendations whose impacted field is a virtual machine.

    subscription: object exposing ``subscription_id``.
    credentials: credential object handed to AdvisorManagementClient; defaults
        to the module-level ``credentials``.
    """
    client = AdvisorManagementClient(credentials, subscription.subscription_id)
    for item in client.recommendations.list():
        # Skip recommendations that target any other resource type.
        if item.impacted_field != 'Microsoft.Compute/virtualMachines':
            continue
        yield item
def yield_impacted_machines(problem=problem):
    """Yield one dict per VM recommendation whose problem text equals ``problem``.

    Walks every subscription from yield_subs() and every recommendation from
    yield_recommendations(), both with their default credentials.

    Each yielded dict has the keys ``entity`` (the recommendation's impacted
    value), ``full_id`` (the recommendation id), ``sub_name`` and ``sub_id``.
    """
    # TODO: add impact=None, category=None, name=None
    for subscription in yield_subs():
        for rec in yield_recommendations(subscription):
            if rec.short_description.problem != problem:
                continue
            record = {
                "entity": rec.impacted_value,
                "full_id": rec.id,
                "sub_name": subscription.display_name,
                "sub_id": subscription.subscription_id,
            }
            yield record
# Parameters describing the IaaSAntimalware VM extension. Not referenced
# elsewhere in this script; 'location' is a placeholder to replace before use.
extension_parameters = {
    'location': 'CHANGEME',
    'publisher': 'Microsoft.Azure.Security',
    'virtual_machine_extension_type': 'IaaSAntimalware',
    'type_handler_version': '1.1',
    'auto_upgrade_minor_version': True,
    'settings': {
        "AntimalwareEnabled": "true",
        "RealtimeProtectionEnabled": "true",
    },
}
# Group the impacted VM names by subscription id and then by resource group:
#   {subscription_id: {resource_group: [vm_name, ...]}}
entity_by_sub = {}

# Raw string: "\/" in a plain literal is an invalid escape sequence, and "/"
# needs no escaping in a regular expression anyway.
resource_group_pattern = re.compile(r'/resourceGroups/(.*?)/', re.IGNORECASE)

for impacted in yield_impacted_machines():
    resource_group_match = resource_group_pattern.search(impacted['full_id'])
    if resource_group_match is None:
        # No resource group segment in the id: skip the entry instead of
        # filing it under an undefined name or the previous iteration's group.
        continue
    sub_id = impacted['sub_id']
    rg = resource_group_match.group(1)
    entity_by_sub.setdefault(sub_id, {}).setdefault(rg, []).append(impacted['entity'])
# Print one comma-separated line per impacted VM, preceded by a header row.
# Print CSV header
print("vm_name, vm_state, os_name, os_version, vm_agent")
for sub, rgs in entity_by_sub.items():
    # One compute client per subscription id.
    compute_client = ComputeManagementClient(credentials, sub)
    for rg, vm_list in rgs.items():
        for vm_name in vm_list:
            vm = compute_client.virtual_machines.get(rg.upper(), vm_name)
            vm_instance = compute_client.virtual_machines.instance_view(rg.upper(), vm_name)
            # Pick the power state by its status code rather than by position:
            # indexing statuses[1] raises IndexError when fewer than two
            # statuses are reported. Assumes power-state codes start with
            # "PowerState/" -- confirm against the instance view reference.
            vm_state = next(
                (
                    status.display_status
                    for status in (vm_instance.statuses or [])
                    if (status.code or '').startswith('PowerState/')
                ),
                'Unknown',
            )
            vm_agent = "Installed" if vm_instance.vm_agent is not None else "Not Installed"
            output = "{vm_name}, {vm_state}, {os_name}, {os_version}, {vm_agent}".format(
                vm_name=vm.name,
                vm_state=vm_state,
                os_name=vm_instance.os_name,
                os_version=vm_instance.os_version,
                vm_agent=vm_agent,
            )
            print(output)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment